Author: rhs
Date: Fri Mar 7 18:05:18 2014
New Revision: 1575346
URL: http://svn.apache.org/r1575346
Log:
fixed transport stall on aborted connection
Modified:
qpid/proton/trunk/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java
qpid/proton/trunk/proton-c/src/engine/engine-internal.h
qpid/proton/trunk/proton-c/src/transport/transport.c
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java
qpid/proton/trunk/proton-j/src/main/resources/cengine.py
qpid/proton/trunk/tests/python/proton_tests/transport.py
Modified:
qpid/proton/trunk/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java?rev=1575346&r1=1575345&r2=1575346&view=diff
==============================================================================
--- qpid/proton/trunk/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java (original)
+++ qpid/proton/trunk/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java Fri Mar 7 18:05:18 2014
@@ -273,7 +273,7 @@ public class SampleTest {
try {
_logger.fine("Waiting...");
Future<Void> disconnectedFuture = conn.getDisconnectedFuture();
- disconnectedFuture.await(300, TimeUnit.SECONDS);
+ disconnectedFuture.await(10, TimeUnit.SECONDS);
_logger.fine("done");
assertEquals(expected, server.getMessagesReceived());
} catch (Exception e) {
Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1575346&r1=1575345&r2=1575346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Fri Mar 7 18:05:18 2014
@@ -171,6 +171,7 @@ struct pn_transport_t {
size_t input_pending;
char *input_buf;
bool tail_closed; // input stream closed by driver
+ bool head_closed;
void *context;
};
Modified: qpid/proton/trunk/proton-c/src/transport/transport.c
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/transport/transport.c?rev=1575346&r1=1575345&r2=1575346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/transport/transport.c (original)
+++ qpid/proton/trunk/proton-c/src/transport/transport.c Fri Mar 7 18:05:18 2014
@@ -165,6 +165,7 @@ static void pn_transport_initialize(void
transport->close_sent = false;
transport->close_rcvd = false;
transport->tail_closed = false;
+ transport->head_closed = false;
transport->remote_container = NULL;
transport->remote_hostname = NULL;
transport->local_max_frame = PN_DEFAULT_MAX_FRAME_SIZE;
@@ -1064,7 +1065,8 @@ static ssize_t pn_input_read_header(pn_t
char quoted[1024];
pn_quote_data(quoted, 1024, bytes, available);
return pn_error_format(transport->error, PN_ERR,
- "%s header mismatch: '%s'", protocol, quoted);
+ "%s header mismatch: '%s'%s", protocol, quoted,
+ available ? "" : " (connection aborted)");
} else {
transport->header_count += delta;
if (transport->header_count == size) {
@@ -2062,7 +2064,8 @@ int pn_transport_close_tail(pn_transport
// output
ssize_t pn_transport_pending(pn_transport_t *transport) /* <0 == done */
{
- if (!transport) return PN_ARG_ERR;
+ assert(transport);
+ if (transport->head_closed) return PN_EOS;
return transport_produce( transport );
}
@@ -2109,7 +2112,12 @@ void pn_transport_pop(pn_transport_t *tr
int pn_transport_close_head(pn_transport_t *transport)
{
- return 0;
+ transport->head_closed = true;
+ if (transport->close_sent && transport->output_pending == 0) {
+ return 0;
+ } else {
+ return pn_error_set(transport->error, PN_ERR, "connection aborted");
+ }
}
// true if the transport will not generate further output
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java?rev=1575346&r1=1575345&r2=1575346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java Fri Mar 7 18:05:18 2014
@@ -30,6 +30,7 @@ class TransportOutputAdaptor implements
private final ByteBuffer _outputBuffer;
private final ByteBuffer _head;
+ private boolean _output_done = false;
private boolean _head_closed = false;
TransportOutputAdaptor(TransportOutputWriter transportOutputWriter, int maxFrameSize)
@@ -47,10 +48,14 @@ class TransportOutputAdaptor implements
@Override
public int pending()
{
- _head_closed = _transportOutputWriter.writeInto(_outputBuffer);
+ if (_head_closed) {
+ return Transport.END_OF_STREAM;
+ }
+
+ _output_done = _transportOutputWriter.writeInto(_outputBuffer);
_head.limit(_outputBuffer.position());
- if (_head_closed && _outputBuffer.position() == 0) {
+ if (_output_done && _outputBuffer.position() == 0) {
return Transport.END_OF_STREAM;
} else {
return _outputBuffer.position();
@@ -77,6 +82,7 @@ class TransportOutputAdaptor implements
@Override
public void close_head()
{
+ _head_closed = true;
_transportOutputWriter.closed();
}
Modified: qpid/proton/trunk/proton-j/src/main/resources/cengine.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/resources/cengine.py?rev=1575346&r1=1575345&r2=1575346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/resources/cengine.py (original)
+++ qpid/proton/trunk/proton-j/src/main/resources/cengine.py Fri Mar 7 18:05:18 2014
@@ -917,6 +917,14 @@ def pn_transport_push(trans, input):
trans.error = pn_error(PN_ERR, str(e))
return PN_ERR
+def pn_transport_close_head(trans):
+ try:
+ trans.impl.close_head()
+ return 0
+ except TransportException, e:
+ trans.error = pn_error(PN_ERR, str(e))
+ return PN_ERR
+
def pn_transport_close_tail(trans):
try:
trans.impl.close_tail()
Modified: qpid/proton/trunk/tests/python/proton_tests/transport.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/transport.py?rev=1575346&r1=1575345&r2=1575346&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/transport.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/transport.py Fri Mar 7 18:05:18 2014
@@ -110,3 +110,23 @@ class TransportTest(Test):
n = self.transport.input(out)
assert n == len(out), (n, out)
assert c.session_head(0) != None
+
+ def testCloseHead(self):
+ n = self.transport.pending()
+ assert n > 0, n
+ try:
+ self.transport.close_head()
+ except TransportException, e:
+ assert "aborted" in str(e), str(e)
+ n = self.transport.pending()
+ assert n < 0, n
+
+ def testCloseTail(self):
+ n = self.transport.capacity()
+ assert n > 0, n
+ try:
+ self.transport.close_tail()
+ except TransportException, e:
+ assert "aborted" in str(e), str(e)
+ n = self.transport.capacity()
+ assert n < 0, n
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]