Author: rgodfrey
Date: Fri Oct 19 20:59:40 2012
New Revision: 1400284

URL: http://svn.apache.org/viewvc?rev=1400284&view=rev
Log:
QPID-4381 : add heartbeating to the AMQP 1.0 java client

Modified:
    
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/FrameWriter.java
    
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
    
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java

Modified: 
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/FrameWriter.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/FrameWriter.java?rev=1400284&r1=1400283&r2=1400284&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/FrameWriter.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/FrameWriter.java
 Fri Oct 19 20:59:40 2012
@@ -77,13 +77,21 @@ public class FrameWriter implements Valu
             {
                 case SIZE_0:
 
-                    _typeWriter.setValue(_frame.getFrameBody());
-
                     int payloadLength = _payload == null ? 0 : 
_payload.remaining();
 
-                    _size = _typeWriter.writeToBuffer(remaining > 8
-                                                      ? 
(ByteBuffer)buffer.duplicate().position(buffer.position()+8)
-                                                      : 
ByteBuffer.wrap(EMPTY_BYTE_ARRAY)) + 8 + payloadLength;
+                    if(_typeWriter!=null)
+                    {
+                        _typeWriter.setValue(_frame.getFrameBody());
+
+
+                        _size = _typeWriter.writeToBuffer(remaining > 8
+                                                          ? 
(ByteBuffer)buffer.duplicate().position(buffer.position()+8)
+                                                          : 
ByteBuffer.wrap(EMPTY_BYTE_ARRAY)) + 8 + payloadLength;
+                    }
+                    else
+                    {
+                        _size = 8 + payloadLength;
+                    }
                     if(remaining >= 4)
                     {
                         buffer.putInt(_size);
@@ -239,7 +247,14 @@ public class FrameWriter implements Valu
         _size = -1;
         _payload = null;
         final Object frameBody = frame.getFrameBody();
-        _typeWriter = _registry.getValueWriter(frameBody);
+        if(frameBody!=null)
+        {
+            _typeWriter = _registry.getValueWriter(frameBody);
+        }
+        else
+        {
+            _typeWriter = null;
+        }
         _payload = frame.getPayload() == null ? null : 
frame.getPayload().duplicate();
     }
 }

Modified: 
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java?rev=1400284&r1=1400283&r2=1400284&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
 Fri Oct 19 20:59:40 2012
@@ -103,6 +103,7 @@ public class ConnectionHandler
 
         private boolean _setForClose;
         private boolean _closed;
+        private long _nextHeartbeat;
 
         public FrameOutput(final ConnectionEndpoint conn)
         {
@@ -165,14 +166,34 @@ public class ConnectionHandler
         {
             synchronized(_conn.getLock())
             {
+                long time = System.currentTimeMillis();
                 try
                 {
                     AMQFrame frame = null;
                     while(!closed() && (frame = _queue.poll()) == null && wait)
                     {
-                        _conn.getLock().wait();
+                        _conn.getLock().wait(_conn.getIdleTimeout()/2);
+
+                        if(_conn.getIdleTimeout()>0)
+                        {
+                            time = System.currentTimeMillis();
+
+                            if(frame == null && time > _nextHeartbeat)
+                            {
+                                frame = new TransportFrame((short) 0,null);
+                                break;
+                            }
+                        }
                     }
 
+
+
+
+                    if(frame != null)
+                    {
+                        _nextHeartbeat = time + _conn.getIdleTimeout()/2;
+
+                    }
                     if(frame == _endOfFrameMarker)
                     {
                         _closed = true;

Modified: 
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java?rev=1400284&r1=1400283&r2=1400284&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
 Fri Oct 19 20:59:40 2012
@@ -81,6 +81,8 @@ public class ConnectionEndpoint implemen
     private boolean _closedForInput;
     private boolean _closedForOutput;
 
+    private long _idleTimeout;
+
     private AMQPDescribedTypeRegistry _describedTypeRegistry = 
AMQPDescribedTypeRegistry.newInstance()
                 .registerTransportLayer()
                 .registerMessagingLayer()
@@ -282,6 +284,11 @@ public class ConnectionEndpoint implemen
 
         _remoteContainerId = open.getContainerId();
 
+        if(open.getIdleTimeOut() != null)
+        {
+            _idleTimeout = open.getIdleTimeOut().longValue();
+        }
+
         switch(_state)
         {
             case UNOPENED:
@@ -316,6 +323,7 @@ public class ConnectionEndpoint implemen
                 sendClose(new Close());
                 break;
             case CLOSE_SENT:
+
             default:
         }
     }
@@ -650,6 +658,11 @@ public class ConnectionEndpoint implemen
         return this;
     }
 
+    public synchronized long getIdleTimeout()
+    {
+        return _idleTimeout;
+    }
+
     public synchronized void close()
     {
         switch(_state)



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to