Author: rhs
Date: Fri Mar  7 18:05:18 2014
New Revision: 1575346

URL: http://svn.apache.org/r1575346
Log:
fixed transport stall on aborted connection

Modified:
    qpid/proton/trunk/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java
    qpid/proton/trunk/proton-c/src/engine/engine-internal.h
    qpid/proton/trunk/proton-c/src/transport/transport.c
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java
    qpid/proton/trunk/proton-j/src/main/resources/cengine.py
    qpid/proton/trunk/tests/python/proton_tests/transport.py

Modified: qpid/proton/trunk/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java?rev=1575346&r1=1575345&r2=1575346&view=diff
==============================================================================
--- qpid/proton/trunk/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java (original)
+++ qpid/proton/trunk/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java Fri Mar  7 18:05:18 2014
@@ -273,7 +273,7 @@ public class SampleTest {
                try {
                        _logger.fine("Waiting...");
                        Future<Void> disconnectedFuture = conn.getDisconnectedFuture();
-                       disconnectedFuture.await(300, TimeUnit.SECONDS);
+                       disconnectedFuture.await(10, TimeUnit.SECONDS);
                        _logger.fine("done");
                        assertEquals(expected, server.getMessagesReceived());
                } catch (Exception e) {

Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1575346&r1=1575345&r2=1575346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Fri Mar  7 18:05:18 2014
@@ -171,6 +171,7 @@ struct pn_transport_t {
   size_t input_pending;
   char *input_buf;
   bool tail_closed;      // input stream closed by driver
+  bool head_closed;
 
   void *context;
 };

Modified: qpid/proton/trunk/proton-c/src/transport/transport.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/transport/transport.c?rev=1575346&r1=1575345&r2=1575346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/transport/transport.c (original)
+++ qpid/proton/trunk/proton-c/src/transport/transport.c Fri Mar  7 18:05:18 2014
@@ -165,6 +165,7 @@ static void pn_transport_initialize(void
   transport->close_sent = false;
   transport->close_rcvd = false;
   transport->tail_closed = false;
+  transport->head_closed = false;
   transport->remote_container = NULL;
   transport->remote_hostname = NULL;
   transport->local_max_frame = PN_DEFAULT_MAX_FRAME_SIZE;
@@ -1064,7 +1065,8 @@ static ssize_t pn_input_read_header(pn_t
     char quoted[1024];
     pn_quote_data(quoted, 1024, bytes, available);
     return pn_error_format(transport->error, PN_ERR,
-                           "%s header mismatch: '%s'", protocol, quoted);
+                           "%s header mismatch: '%s'%s", protocol, quoted,
+                           available ? "" : " (connection aborted)");
   } else {
     transport->header_count += delta;
     if (transport->header_count == size) {
@@ -2062,7 +2064,8 @@ int pn_transport_close_tail(pn_transport
 // output
 ssize_t pn_transport_pending(pn_transport_t *transport)      /* <0 == done */
 {
-  if (!transport) return PN_ARG_ERR;
+  assert(transport);
+  if (transport->head_closed) return PN_EOS;
   return transport_produce( transport );
 }
 
@@ -2109,7 +2112,12 @@ void pn_transport_pop(pn_transport_t *tr
 
 int pn_transport_close_head(pn_transport_t *transport)
 {
-  return 0;
+  transport->head_closed = true;
+  if (transport->close_sent && transport->output_pending == 0) {
+    return 0;
+  } else {
+    return pn_error_set(transport->error, PN_ERR, "connection aborted");
+  }
 }
 
 // true if the transport will not generate further output

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java?rev=1575346&r1=1575345&r2=1575346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java Fri Mar  7 18:05:18 2014
@@ -30,6 +30,7 @@ class TransportOutputAdaptor implements 
 
     private final ByteBuffer _outputBuffer;
     private final ByteBuffer _head;
+    private boolean _output_done = false;
     private boolean _head_closed = false;
 
     TransportOutputAdaptor(TransportOutputWriter transportOutputWriter, int maxFrameSize)
@@ -47,10 +48,14 @@ class TransportOutputAdaptor implements 
     @Override
     public int pending()
     {
-        _head_closed = _transportOutputWriter.writeInto(_outputBuffer);
+        if (_head_closed) {
+            return Transport.END_OF_STREAM;
+        }
+
+        _output_done = _transportOutputWriter.writeInto(_outputBuffer);
         _head.limit(_outputBuffer.position());
 
-        if (_head_closed && _outputBuffer.position() == 0) {
+        if (_output_done && _outputBuffer.position() == 0) {
             return Transport.END_OF_STREAM;
         } else {
             return _outputBuffer.position();
@@ -77,6 +82,7 @@ class TransportOutputAdaptor implements 
     @Override
     public void close_head()
     {
+        _head_closed = true;
         _transportOutputWriter.closed();
     }
 

Modified: qpid/proton/trunk/proton-j/src/main/resources/cengine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/resources/cengine.py?rev=1575346&r1=1575345&r2=1575346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/resources/cengine.py (original)
+++ qpid/proton/trunk/proton-j/src/main/resources/cengine.py Fri Mar  7 18:05:18 2014
@@ -917,6 +917,14 @@ def pn_transport_push(trans, input):
       trans.error = pn_error(PN_ERR, str(e))
       return PN_ERR
 
+def pn_transport_close_head(trans):
+  try:
+    trans.impl.close_head()
+    return 0
+  except TransportException, e:
+    trans.error = pn_error(PN_ERR, str(e))
+    return PN_ERR
+
 def pn_transport_close_tail(trans):
   try:
     trans.impl.close_tail()

Modified: qpid/proton/trunk/tests/python/proton_tests/transport.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/transport.py?rev=1575346&r1=1575345&r2=1575346&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/transport.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/transport.py Fri Mar  7 18:05:18 2014
@@ -110,3 +110,23 @@ class TransportTest(Test):
       n = self.transport.input(out)
       assert n == len(out), (n, out)
     assert c.session_head(0) != None
+
+  def testCloseHead(self):
+    n = self.transport.pending()
+    assert n > 0, n
+    try:
+      self.transport.close_head()
+    except TransportException, e:
+      assert "aborted" in str(e), str(e)
+    n = self.transport.pending()
+    assert n < 0, n
+
+  def testCloseTail(self):
+    n = self.transport.capacity()
+    assert n > 0, n
+    try:
+      self.transport.close_tail()
+    except TransportException, e:
+      assert "aborted" in str(e), str(e)
+    n = self.transport.capacity()
+    assert n < 0, n



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to