Author: rhs
Date: Thu Jul 17 19:53:01 2014
New Revision: 1611460
URL: http://svn.apache.org/r1611460
Log:
removed deprecated transport functions from python binding; reworked transport
tests to check for better error semantics; added duplicate event removal to
proton-c; added flow on send event to proton-c
Modified:
qpid/proton/trunk/proton-c/bindings/python/cproton.i
qpid/proton/trunk/proton-c/bindings/python/proton.py
qpid/proton/trunk/proton-c/include/proton/transport.h
qpid/proton/trunk/proton-c/src/engine/event.c
qpid/proton/trunk/proton-c/src/transport/transport.c
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
qpid/proton/trunk/proton-j/src/main/resources/cengine.py
qpid/proton/trunk/tests/python/proton_tests/common.py
qpid/proton/trunk/tests/python/proton_tests/engine.py
qpid/proton/trunk/tests/python/proton_tests/sasl.py
qpid/proton/trunk/tests/python/proton_tests/transport.py
Modified: qpid/proton/trunk/proton-c/bindings/python/cproton.i
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/cproton.i?rev=1611460&r1=1611459&r2=1611460&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/cproton.i (original)
+++ qpid/proton/trunk/proton-c/bindings/python/cproton.i Thu Jul 17 19:53:01
2014
@@ -134,24 +134,13 @@ ssize_t pn_link_send(pn_link_t *transpor
%}
%ignore pn_link_recv;
-int pn_transport_push(pn_transport_t *transport, char *STRING, size_t LENGTH);
+ssize_t pn_transport_push(pn_transport_t *transport, char *STRING, size_t
LENGTH);
%ignore pn_transport_push;
%rename(pn_transport_peek) wrap_pn_transport_peek;
%inline %{
int wrap_pn_transport_peek(pn_transport_t *transport, char *OUTPUT, size_t
*OUTPUT_SIZE) {
- return pn_transport_peek(transport, OUTPUT, *OUTPUT_SIZE);
- }
-%}
-%ignore pn_transport_peek;
-
-ssize_t pn_transport_input(pn_transport_t *transport, char *STRING, size_t
LENGTH);
-%ignore pn_transport_input;
-
-%rename(pn_transport_output) wrap_pn_transport_output;
-%inline %{
- int wrap_pn_transport_output(pn_transport_t *transport, char *OUTPUT, size_t
*OUTPUT_SIZE) {
- ssize_t sz = pn_transport_output(transport, OUTPUT, *OUTPUT_SIZE);
+ ssize_t sz = pn_transport_peek(transport, OUTPUT, *OUTPUT_SIZE);
if (sz >= 0) {
*OUTPUT_SIZE = sz;
} else {
@@ -160,7 +149,7 @@ ssize_t pn_transport_input(pn_transport_
return sz;
}
%}
-%ignore pn_transport_output;
+%ignore pn_transport_peek;
%rename(pn_delivery) wrap_pn_delivery;
%inline %{
Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1611460&r1=1611459&r2=1611460&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Thu Jul 17 19:53:01
2014
@@ -3023,7 +3023,9 @@ class Transport(object):
return self._check(c)
def push(self, bytes):
- self._check(pn_transport_push(self._trans, bytes))
+ n = self._check(pn_transport_push(self._trans, bytes))
+ if n != len(bytes):
+ raise OverflowError("unable to process all bytes")
def close_tail(self):
self._check(pn_transport_close_tail(self._trans))
@@ -3049,27 +3051,6 @@ class Transport(object):
def close_head(self):
self._check(pn_transport_close_head(self._trans))
- def output(self, size):
- p = self.pending()
- if p < 0:
- return None
- else:
- out = self.peek(min(size, p))
- self.pop(len(out))
- return out
-
- def input(self, bytes):
- if not bytes:
- self.close_tail()
- return None
- else:
- c = self.capacity()
- if (c < 0):
- return None
- trimmed = bytes[:c]
- self.push(trimmed)
- return len(trimmed)
-
# AMQP 1.0 max-frame-size
def _get_max_frame_size(self):
return pn_transport_get_max_frame(self._trans)
Modified: qpid/proton/trunk/proton-c/include/proton/transport.h
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/transport.h?rev=1611460&r1=1611459&r2=1611460&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/transport.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/transport.h Thu Jul 17 19:53:01
2014
@@ -328,16 +328,17 @@ PN_EXTERN char *pn_transport_tail(pn_tra
*
* This is equivalent to copying @c size bytes after the tail pointer
* and then calling ::pn_transport_process with an argument of @c
- * size. It is an error to call this with a @c size larger than the
- * capacity reported by ::pn_transport_capacity.
+ * size. Only some of the bytes will be copied if there is
+ * insufficient capacity available. Use ::pn_transport_capacity to
+ * determine how much capacity the transport has.
*
* @param[in] transport the transport
* @param[in] src the start of the data to push into the transport
* @param[in] size the amount of data to push into the transport
*
- * @return 0 on success, or error code if < 0
+ * @return the number of bytes pushed on success, or error code if < 0
*/
-PN_EXTERN int pn_transport_push(pn_transport_t *transport, const char *src,
size_t size);
+PN_EXTERN ssize_t pn_transport_push(pn_transport_t *transport, const char
*src, size_t size);
/**
* Process input data following the tail pointer.
@@ -404,9 +405,9 @@ PN_EXTERN const char *pn_transport_head(
* @param[in] transport the transport
* @param[out] dst the destination buffer
* @param[in] size the capacity of the destination buffer
- * @return 0 on success, or error code if < 0
+ * @return number of bytes copied on success, or error code if < 0
*/
-PN_EXTERN int pn_transport_peek(pn_transport_t *transport, char *dst, size_t
size);
+PN_EXTERN ssize_t pn_transport_peek(pn_transport_t *transport, char *dst,
size_t size);
/**
* Removes @c size bytes of output from the pending output queue
Modified: qpid/proton/trunk/proton-c/src/engine/event.c
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/event.c?rev=1611460&r1=1611459&r2=1611460&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/event.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/event.c Thu Jul 17 19:53:01 2014
@@ -108,6 +108,11 @@ pn_event_t *pn_collector_put(pn_collecto
return NULL;
}
+ pn_event_t *tail = collector->tail;
+ if (tail && tail->type == type && tail->context == context) {
+ return NULL;
+ }
+
pn_event_t *event;
if (collector->free_head) {
@@ -118,8 +123,6 @@ pn_event_t *pn_collector_put(pn_collecto
event = pn_event();
}
- pn_event_t *tail = collector->tail;
-
if (tail) {
tail->next = event;
collector->tail = event;
Modified: qpid/proton/trunk/proton-c/src/transport/transport.c
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/transport/transport.c?rev=1611460&r1=1611459&r2=1611460&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/transport/transport.c (original)
+++ qpid/proton/trunk/proton-c/src/transport/transport.c Thu Jul 17 19:53:01
2014
@@ -1425,6 +1425,8 @@ int pn_process_tpwork_sender(pn_transpor
link->queued--;
link->session->outgoing_deliveries--;
}
+
+ pn_collector_put(transport->connection->collector, PN_LINK_FLOW, link);
}
}
@@ -1985,7 +1987,7 @@ char *pn_transport_tail(pn_transport_t *
return NULL;
}
-int pn_transport_push(pn_transport_t *transport, const char *src, size_t size)
+ssize_t pn_transport_push(pn_transport_t *transport, const char *src, size_t
size)
{
assert(transport);
@@ -1993,14 +1995,19 @@ int pn_transport_push(pn_transport_t *tr
if (capacity < 0) {
return capacity;
} else if (size > (size_t) capacity) {
- return PN_OVERFLOW;
+ size = capacity;
}
char *dst = pn_transport_tail(transport);
assert(dst);
memmove(dst, src, size);
- return pn_transport_process(transport, size);
+ int n = pn_transport_process(transport, size);
+ if (n < 0) {
+ return n;
+ } else {
+ return size;
+ }
}
int pn_transport_process(pn_transport_t *transport, size_t size)
@@ -2045,7 +2052,7 @@ const char *pn_transport_head(pn_transpo
return NULL;
}
-int pn_transport_peek(pn_transport_t *transport, char *dst, size_t size)
+ssize_t pn_transport_peek(pn_transport_t *transport, char *dst, size_t size)
{
assert(transport);
@@ -2053,7 +2060,7 @@ int pn_transport_peek(pn_transport_t *tr
if (pending < 0) {
return pending;
} else if (size > (size_t) pending) {
- return PN_UNDERFLOW;
+ size = pending;
}
if (pending > 0) {
@@ -2062,7 +2069,7 @@ int pn_transport_peek(pn_transport_t *tr
memmove(dst, src, size);
}
- return 0;
+ return size;
}
void pn_transport_pop(pn_transport_t *transport, size_t size)
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1611460&r1=1611459&r2=1611460&view=diff
==============================================================================
---
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
(original)
+++
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
Thu Jul 17 19:53:01 2014
@@ -108,6 +108,7 @@ public class TransportImpl extends Endpo
private FrameHandler _frameHandler = this;
private boolean _head_closed = false;
+ private TransportException _tail_error = null;
/**
* @deprecated This constructor's visibility will be reduced to the
default scope in a future release.
@@ -728,15 +729,22 @@ public class TransportImpl extends Endpo
private void processOpen()
{
- if(_connectionEndpoint != null && _connectionEndpoint.getLocalState()
!= EndpointState.UNINITIALIZED && !_isOpenSent)
- {
+ if ((_tail_error != null ||
+ (_connectionEndpoint != null &&
+ _connectionEndpoint.getLocalState() !=
EndpointState.UNINITIALIZED)) &&
+ !_isOpenSent) {
Open open = new Open();
- String cid = _connectionEndpoint.getLocalContainerId();
- open.setContainerId(cid == null ? "" : cid);
- open.setHostname(_connectionEndpoint.getHostname());
-
open.setDesiredCapabilities(_connectionEndpoint.getDesiredCapabilities());
-
open.setOfferedCapabilities(_connectionEndpoint.getOfferedCapabilities());
- open.setProperties(_connectionEndpoint.getProperties());
+ if (_connectionEndpoint != null) {
+ String cid = _connectionEndpoint.getLocalContainerId();
+ open.setContainerId(cid == null ? "" : cid);
+ open.setHostname(_connectionEndpoint.getHostname());
+
open.setDesiredCapabilities(_connectionEndpoint.getDesiredCapabilities());
+
open.setOfferedCapabilities(_connectionEndpoint.getOfferedCapabilities());
+ open.setProperties(_connectionEndpoint.getProperties());
+ } else {
+ open.setContainerId("");
+ }
+
if (_maxFrameSize > 0) {
open.setMaxFrameSize(UnsignedInteger.valueOf(_maxFrameSize));
}
@@ -747,7 +755,6 @@ public class TransportImpl extends Endpo
_isOpenSent = true;
writeFrame(0, open, null, null);
-
}
}
@@ -872,6 +879,9 @@ public class TransportImpl extends Endpo
private boolean hasSendableMessages(SessionImpl session)
{
+ if (_connectionEndpoint == null) {
+ return false;
+ }
if(!_closeReceived && (session == null ||
!session.getTransportSession().endReceived()))
{
@@ -896,14 +906,24 @@ public class TransportImpl extends Endpo
private void processClose()
{
- if(_connectionEndpoint != null && _connectionEndpoint.getLocalState()
== EndpointState.CLOSED && !_isCloseSent)
- {
+ if ((_tail_error != null ||
+ (_connectionEndpoint != null &&
+ _connectionEndpoint.getLocalState() == EndpointState.CLOSED)) &&
+ !_isCloseSent) {
if(!hasSendableMessages(null))
{
Close close = new Close();
- ErrorCondition localError = _connectionEndpoint.getCondition();
- if( localError.getCondition() !=null )
+ ErrorCondition localError;
+
+ if (_connectionEndpoint == null) {
+ localError = new
ErrorCondition(ConnectionError.FRAMING_ERROR,
+ _tail_error.toString());
+ } else {
+ localError = _connectionEndpoint.getCondition();
+ }
+
+ if(localError.getCondition() != null)
{
close.setError(localError);
}
@@ -911,7 +931,10 @@ public class TransportImpl extends Endpo
_isCloseSent = true;
writeFrame(0, close, null, null);
- _connectionEndpoint.clearModified();
+
+ if (_connectionEndpoint != null) {
+ _connectionEndpoint.clearModified();
+ }
}
}
}
@@ -1197,11 +1220,11 @@ public class TransportImpl extends Endpo
public void closed(TransportException error)
{
if (!_closeReceived || error != null) {
- Close close = new Close();
- String msg = error == null ? "connection aborted" :
error.toString();
- close.setError(new ErrorCondition(ConnectionError.FRAMING_ERROR,
msg));
- _isCloseSent = true;
- writeFrame(0, close, null, null);
+ if (error == null) {
+ _tail_error = new TransportException("connection aborted");
+ } else {
+ _tail_error = error;
+ }
_head_closed = true;
}
}
Modified: qpid/proton/trunk/proton-j/src/main/resources/cengine.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/resources/cengine.py?rev=1611460&r1=1611459&r2=1611460&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/resources/cengine.py (original)
+++ qpid/proton/trunk/proton-j/src/main/resources/cengine.py Thu Jul 17
19:53:01 2014
@@ -98,7 +98,11 @@ class pn_condition:
self.description = None
self.info.clear()
else:
- self.name = impl.getCondition().toString()
+ cond = impl.getCondition()
+ if cond is None:
+ self.name = None
+ else:
+ self.name = cond.toString()
self.description = impl.getDescription()
obj2dat(impl.getInfo(), self.info)
@@ -909,6 +913,7 @@ def pn_transport_peek(trans, size):
if size:
bb = trans.impl.head()
bb.get(ba)
+ bb.position(0)
return 0, ba.tostring()
def pn_transport_pop(trans, size):
@@ -922,16 +927,16 @@ def pn_transport_push(trans, input):
if cap < 0:
return cap
elif len(input) > cap:
- return PN_OVERFLOW
- else:
- bb = trans.impl.tail()
- bb.put(array(input, 'b'))
- try:
- trans.impl.process()
- return 0
- except TransportException, e:
- trans.error = pn_error(PN_ERR, str(e))
- return PN_ERR
+ input = input[:cap]
+
+ bb = trans.impl.tail()
+ bb.put(array(input, 'b'))
+ try:
+ trans.impl.process()
+ return len(input)
+ except TransportException, e:
+ trans.error = pn_error(PN_ERR, str(e))
+ return PN_ERR
def pn_transport_close_head(trans):
try:
Modified: qpid/proton/trunk/tests/python/proton_tests/common.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/common.py?rev=1611460&r1=1611459&r2=1611460&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/common.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/common.py Thu Jul 17 19:53:01
2014
@@ -46,42 +46,36 @@ def free_tcp_ports(count=1):
s.close()
return ports
+def pump_uni(src, dst, buffer_size=1024):
+ p = src.pending()
+ c = dst.capacity()
+
+ if p < 0 and c < 0:
+ return False
+
+ if p < 0:
+ dst.close_tail()
+ elif p == 0 or c == 0:
+ return False
+ else:
+ if c < 0:
+ src.close_head()
+ else:
+ bytes = src.peek(min(c, buffer_size))
+ dst.push(bytes)
+ src.pop(len(bytes))
+
+ return True
def pump(transport1, transport2, buffer_size=1024):
""" Transfer all pending bytes between two Proton engines
- by repeatedly calling input and output.
+ by repeatedly calling peek/pop and push.
Asserts that each engine accepts some bytes every time
(unless it's already closed).
"""
-
- out1_leftover_by_t2 = ""
- out2_leftover_by_t1 = ""
- i = 0
-
- while True:
- out1 = out1_leftover_by_t2 + (transport1.output(buffer_size) or "")
- out2 = out2_leftover_by_t1 + (transport2.output(buffer_size) or "")
-
- if out1:
- number_t2_consumed = transport2.input(out1)
- if number_t2_consumed is None:
- # special None return value means input is closed so discard the
leftovers
- out1_leftover_by_t2 = ""
- else:
- assert number_t2_consumed > 0, (number_t2_consumed, len(out1),
out1[:100])
- out1_leftover_by_t2 = out1[number_t2_consumed:]
-
- if out2:
- number_t1_consumed = transport1.input(out2)
- if number_t1_consumed is None:
- # special None return value means input is closed so discard the
leftovers
- out2_leftover_by_t1 = ""
- else:
- assert number_t1_consumed > 0, (number_t1_consumed, len(out1),
out1[:100])
- out2_leftover_by_t1 = out2[number_t1_consumed:]
-
- if not out1 and not out2: break
- i = i + 1
+ while (pump_uni(transport1, transport2, buffer_size) or
+ pump_uni(transport2, transport1, buffer_size)):
+ pass
def isSSLPresent():
""" True if a suitable SSL library is available.
Modified: qpid/proton/trunk/tests/python/proton_tests/engine.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/engine.py?rev=1611460&r1=1611459&r2=1611460&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/engine.py Thu Jul 17 19:53:01
2014
@@ -2206,8 +2206,7 @@ class EventTest(Test):
rcv.flow(10)
self.pump()
self.expect(Event.CONNECTION_INIT, Event.SESSION_INIT,
- Event.LINK_INIT, Event.LINK_OPEN, Event.TRANSPORT,
- Event.TRANSPORT)
+ Event.LINK_INIT, Event.LINK_OPEN, Event.TRANSPORT)
snd.delivery("delivery")
snd.send("Hello World!")
snd.advance()
@@ -2218,11 +2217,11 @@ class EventTest(Test):
self.expect(Event.LINK_REMOTE_OPEN, Event.DELIVERY)
rcv.session.connection._transport.unbind()
rcv.session.connection.free()
- self.expect_oneof((Event.TRANSPORT, Event.TRANSPORT, Event.TRANSPORT,
Event.LINK_CLOSE,
- Event.SESSION_CLOSE, Event.CONNECTION_CLOSE,
Event.LINK_FINAL,
+ self.expect_oneof((Event.TRANSPORT, Event.LINK_CLOSE, Event.SESSION_CLOSE,
+ Event.CONNECTION_CLOSE, Event.LINK_FINAL,
Event.SESSION_FINAL, Event.CONNECTION_FINAL),
- (Event.TRANSPORT, Event.TRANSPORT, Event.TRANSPORT,
Event.LINK_CLOSE,
- Event.LINK_FINAL, Event.SESSION_CLOSE,
Event.SESSION_FINAL,
+ (Event.TRANSPORT, Event.LINK_CLOSE, Event.LINK_FINAL,
+ Event.SESSION_CLOSE, Event.SESSION_FINAL,
Event.CONNECTION_CLOSE, Event.CONNECTION_FINAL))
def testDeliveryEventsDisp(self):
@@ -2231,12 +2230,9 @@ class EventTest(Test):
dlv = snd.delivery("delivery")
snd.send("Hello World!")
assert snd.advance()
- self.expect(Event.LINK_OPEN,
- Event.TRANSPORT,
- Event.TRANSPORT,
- Event.TRANSPORT)
+ self.expect(Event.LINK_OPEN, Event.TRANSPORT)
self.pump()
- self.expect()
+ self.expect(Event.LINK_FLOW)
rdlv = rcv.current
assert rdlv != None
assert rdlv.tag == "delivery"
Modified: qpid/proton/trunk/tests/python/proton_tests/sasl.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/sasl.py?rev=1611460&r1=1611459&r2=1611460&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/sasl.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/sasl.py Thu Jul 17 19:53:01 2014
@@ -45,16 +45,16 @@ class SaslTest(Test):
self.s2.server()
self.s2.done(SASL.OK)
- out1 = self.t1.output(1024)
- out2 = self.t2.output(1024)
+ out1 = self.t1.peek(1024)
+ self.t1.pop(len(out1))
+ out2 = self.t2.peek(1024)
+ self.t2.pop(len(out2))
- n = self.t2.input(out1)
- assert n == len(out1), (n, out1)
+ self.t2.push(out1)
assert self.s1.outcome is None
- n = self.t1.input(out2)
- assert n == len(out2), (n, out2)
+ self.t1.push(out2)
assert self.s2.outcome == SASL.OK
@@ -67,8 +67,9 @@ class SaslTest(Test):
self.s2.done(SASL.OK)
# send the server's OK to the client
- out2 = self.t2.output(1024)
- self.t1.input(out2)
+ out2 = self.t2.peek(1024)
+ self.t2.pop(len(out2))
+ self.t1.push(out2)
# do some work to generate AMQP data
c1 = Connection()
@@ -84,15 +85,17 @@ class SaslTest(Test):
out1_sasl_and_amqp = ""
t1_still_producing = True
while t1_still_producing:
- out1 = self.t1.output(1024)
+ out1 = self.t1.peek(1024)
+ self.t1.pop(len(out1))
out1_sasl_and_amqp += out1
t1_still_producing = out1
t2_still_consuming = True
while t2_still_consuming:
- num_consumed = self.t2.input(out1_sasl_and_amqp)
- out1_sasl_and_amqp = out1_sasl_and_amqp[num_consumed:]
- t2_still_consuming = num_consumed > 0 and len(out1_sasl_and_amqp) > 0
+ num = min(self.t2.capacity(), len(out1_sasl_and_amqp))
+ self.t2.push(out1_sasl_and_amqp[:num])
+ out1_sasl_and_amqp = out1_sasl_and_amqp[num:]
+ t2_still_consuming = num > 0 and len(out1_sasl_and_amqp) > 0
assert len(out1_sasl_and_amqp) == 0, (len(out1_sasl_and_amqp),
out1_sasl_and_amqp)
@@ -129,9 +132,9 @@ class SaslTest(Test):
self.s1.mechanisms("ANONYMOUS")
self.s1.client()
- out1 = self.t1.output(1024)
- n = self.t2.input(out1)
- assert n == len(out1)
+ out1 = self.t1.peek(1024)
+ self.t1.pop(len(out1))
+ self.t2.push(out1)
self.s2.mechanisms("ANONYMOUS")
self.s2.server()
@@ -140,11 +143,11 @@ class SaslTest(Test):
c2.open()
self.t2.bind(c2)
- out2 = self.t2.output(1024)
- n = self.t1.input(out2)
- assert n == len(out2)
+ out2 = self.t2.peek(1024)
+ self.t2.pop(len(out2))
+ self.t1.push(out2)
- out1 = self.t1.output(1024)
+ out1 = self.t1.peek(1024)
assert len(out1) > 0
def testFracturedSASL(self):
@@ -156,17 +159,23 @@ class SaslTest(Test):
# self.t1.trace(Transport.TRACE_FRM)
- out = self.t1.output(1024)
- self.t1.input("AMQP\x03\x01\x00\x00")
- out = self.t1.output(1024)
- self.t1.input("\x00\x00\x00")
- out = self.t1.output(1024)
-
self.t1.input("A\x02\x01\x00\x00\x00S@\xc04\x01\xe01\x06\xa3\x06GSSAPI\x05PLAIN\x0aDIGEST-MD5\x08AMQPLAIN\x08CRAM-MD5\x04NTLM")
- out = self.t1.output(1024)
- self.t1.input("\x00\x00\x00\x10\x02\x01\x00\x00\x00SD\xc0\x03\x01P\x00")
- out = self.t1.output(1024)
+ out = self.t1.peek(1024)
+ self.t1.pop(len(out))
+ self.t1.push("AMQP\x03\x01\x00\x00")
+ out = self.t1.peek(1024)
+ self.t1.pop(len(out))
+ self.t1.push("\x00\x00\x00")
+ out = self.t1.peek(1024)
+ self.t1.pop(len(out))
+
self.t1.push("A\x02\x01\x00\x00\x00S@\xc04\x01\xe01\x06\xa3\x06GSSAPI\x05PLAIN\x0aDIGEST-MD5\x08AMQPLAIN\x08CRAM-MD5\x04NTLM")
+ out = self.t1.peek(1024)
+ self.t1.pop(len(out))
+ self.t1.push("\x00\x00\x00\x10\x02\x01\x00\x00\x00SD\xc0\x03\x01P\x00")
+ out = self.t1.peek(1024)
+ self.t1.pop(len(out))
while out:
- out = self.t1.output(1024)
+ out = self.t1.peek(1024)
+ self.t1.pop(len(out))
assert self.s1.outcome == SASL.OK, self.s1.outcome
Modified: qpid/proton/trunk/tests/python/proton_tests/transport.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/transport.py?rev=1611460&r1=1611459&r2=1611460&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/transport.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/transport.py Thu Jul 17
19:53:01 2014
@@ -27,37 +27,53 @@ class TransportTest(Test):
def setup(self):
self.transport = Transport()
+ self.peer = Transport()
+ self.conn = Connection()
+ self.peer.bind(self.conn)
def teardown(self):
self.transport = None
+ self.peer = None
+ self.conn = None
+
+ def drain(self):
+ while True:
+ p = self.transport.pending()
+ if p < 0:
+ return
+ elif p > 0:
+ bytes = self.transport.peek(p)
+ self.peer.push(bytes)
+ self.transport.pop(len(bytes))
+ else:
+ assert False
+
+ def assert_error(self, name):
+ assert self.conn.remote_container is None, self.conn.remote_container
+ self.drain()
+ # verify that we received an open frame
+ assert self.conn.remote_container is not None, self.conn.remote_container
+ # verify that we received a close frame
+ assert self.conn.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_CLOSED,
self.conn.state
+ # verify that a framing error was reported
+ assert self.conn.remote_condition.name == name, self.conn.remote_condition
def testEOS(self):
- try:
- n = self.transport.input("")
- assert False, n
- except TransportException:
- pass
+ self.transport.push("") # should be a noop
+ self.transport.close_tail() # should result in framing error
+ self.assert_error(u'amqp:connection:framing-error')
def testPartial(self):
- n = self.transport.input("AMQ")
- assert n == 3, n
- try:
- n = self.transport.input("")
- assert False, n
- except TransportException:
- pass
+ self.transport.push("AMQ") # partial header
+ self.transport.close_tail() # should result in framing error
+ self.assert_error(u'amqp:connection:framing-error')
def testGarbage(self, garbage="GARBAGE_"):
- try:
- n = self.transport.input(garbage)
- assert False, n
- except TransportException, e:
- assert "AMQP header mismatch" in str(e), str(e)
- try:
- n = self.transport.input("")
- assert False, n
- except TransportException, e:
- pass
+ self.transport.push(garbage)
+ self.assert_error(u'amqp:connection:framing-error')
+ assert self.transport.pending() < 0
+ self.transport.close_tail()
+ assert self.transport.pending() < 0
def testSmallGarbage(self):
self.testGarbage("XXX")
@@ -66,16 +82,12 @@ class TransportTest(Test):
self.testGarbage("GARBAGE_XXX")
def testHeader(self):
- n = self.transport.input("AMQP\x00\x01\x00\x00")
- assert n == 8, n
- try:
- n = self.transport.input("")
- assert False, n
- except TransportException, e:
- assert "connection aborted" in str(e)
+ self.transport.push("AMQP\x00\x01\x00\x00")
+ self.transport.close_tail()
+ self.assert_error(u'amqp:connection:framing-error')
- def testOutput(self):
- out = self.transport.output(1024)
+ def testPeek(self):
+ out = self.transport.peek(1024)
assert out is not None
def testBindAfterOpen(self):
@@ -87,16 +99,10 @@ class TransportTest(Test):
conn.hostname = "test-hostname"
trn = Transport()
trn.bind(conn)
- out = trn.output(1024)
+ out = trn.peek(1024)
assert "test-container" in out, repr(out)
assert "test-hostname" in out, repr(out)
- n = self.transport.input(out)
- assert n > 0, n
- out = out[n:]
-
- if out:
- n = self.transport.input(out)
- assert n == 0
+ self.transport.push(out)
c = Connection()
assert c.remote_container == None
@@ -105,10 +111,6 @@ class TransportTest(Test):
self.transport.bind(c)
assert c.remote_container == "test-container"
assert c.remote_hostname == "test-hostname"
- if out:
- assert c.session_head(0) == None
- n = self.transport.input(out)
- assert n == len(out), (n, out)
assert c.session_head(0) != None
def testCloseHead(self):
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]