Author: rgodfrey
Date: Fri Oct 19 20:59:40 2012
New Revision: 1400284
URL: http://svn.apache.org/viewvc?rev=1400284&view=rev
Log:
QPID-4381 : add heartbeating to the AMQP 1.0 java client
Modified:
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/FrameWriter.java
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
Modified:
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/FrameWriter.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/FrameWriter.java?rev=1400284&r1=1400283&r2=1400284&view=diff
==============================================================================
---
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/FrameWriter.java
(original)
+++
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/FrameWriter.java
Fri Oct 19 20:59:40 2012
@@ -77,13 +77,21 @@ public class FrameWriter implements Valu
{
case SIZE_0:
- _typeWriter.setValue(_frame.getFrameBody());
-
int payloadLength = _payload == null ? 0 :
_payload.remaining();
- _size = _typeWriter.writeToBuffer(remaining > 8
- ?
(ByteBuffer)buffer.duplicate().position(buffer.position()+8)
- :
ByteBuffer.wrap(EMPTY_BYTE_ARRAY)) + 8 + payloadLength;
+ if(_typeWriter!=null)
+ {
+ _typeWriter.setValue(_frame.getFrameBody());
+
+
+ _size = _typeWriter.writeToBuffer(remaining > 8
+ ?
(ByteBuffer)buffer.duplicate().position(buffer.position()+8)
+ :
ByteBuffer.wrap(EMPTY_BYTE_ARRAY)) + 8 + payloadLength;
+ }
+ else
+ {
+ _size = 8 + payloadLength;
+ }
if(remaining >= 4)
{
buffer.putInt(_size);
@@ -239,7 +247,14 @@ public class FrameWriter implements Valu
_size = -1;
_payload = null;
final Object frameBody = frame.getFrameBody();
- _typeWriter = _registry.getValueWriter(frameBody);
+ if(frameBody!=null)
+ {
+ _typeWriter = _registry.getValueWriter(frameBody);
+ }
+ else
+ {
+ _typeWriter = null;
+ }
_payload = frame.getPayload() == null ? null :
frame.getPayload().duplicate();
}
}
Modified:
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java?rev=1400284&r1=1400283&r2=1400284&view=diff
==============================================================================
---
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
(original)
+++
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
Fri Oct 19 20:59:40 2012
@@ -103,6 +103,7 @@ public class ConnectionHandler
private boolean _setForClose;
private boolean _closed;
+ private long _nextHeartbeat;
public FrameOutput(final ConnectionEndpoint conn)
{
@@ -165,14 +166,34 @@ public class ConnectionHandler
{
synchronized(_conn.getLock())
{
+ long time = System.currentTimeMillis();
try
{
AMQFrame frame = null;
while(!closed() && (frame = _queue.poll()) == null && wait)
{
- _conn.getLock().wait();
+ _conn.getLock().wait(_conn.getIdleTimeout()/2);
+
+ if(_conn.getIdleTimeout()>0)
+ {
+ time = System.currentTimeMillis();
+
+ if(frame == null && time > _nextHeartbeat)
+ {
+ frame = new TransportFrame((short) 0,null);
+ break;
+ }
+ }
}
+
+
+
+ if(frame != null)
+ {
+ _nextHeartbeat = time + _conn.getIdleTimeout()/2;
+
+ }
if(frame == _endOfFrameMarker)
{
_closed = true;
Modified:
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java?rev=1400284&r1=1400283&r2=1400284&view=diff
==============================================================================
---
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
(original)
+++
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
Fri Oct 19 20:59:40 2012
@@ -81,6 +81,8 @@ public class ConnectionEndpoint implemen
private boolean _closedForInput;
private boolean _closedForOutput;
+ private long _idleTimeout;
+
private AMQPDescribedTypeRegistry _describedTypeRegistry =
AMQPDescribedTypeRegistry.newInstance()
.registerTransportLayer()
.registerMessagingLayer()
@@ -282,6 +284,11 @@ public class ConnectionEndpoint implemen
_remoteContainerId = open.getContainerId();
+ if(open.getIdleTimeOut() != null)
+ {
+ _idleTimeout = open.getIdleTimeOut().longValue();
+ }
+
switch(_state)
{
case UNOPENED:
@@ -316,6 +323,7 @@ public class ConnectionEndpoint implemen
sendClose(new Close());
break;
case CLOSE_SENT:
+
default:
}
}
@@ -650,6 +658,11 @@ public class ConnectionEndpoint implemen
return this;
}
+ public synchronized long getIdleTimeout()
+ {
+ return _idleTimeout;
+ }
+
public synchronized void close()
{
switch(_state)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]