Author: rhs
Date: Thu Jul 17 19:53:01 2014
New Revision: 1611460

URL: http://svn.apache.org/r1611460
Log:
removed deprecated transport functions from python binding; reworked transport 
tests to check for better error semantics; added duplicate event removal to 
proton-c; added flow on send event to proton-c

Modified:
    qpid/proton/trunk/proton-c/bindings/python/cproton.i
    qpid/proton/trunk/proton-c/bindings/python/proton.py
    qpid/proton/trunk/proton-c/include/proton/transport.h
    qpid/proton/trunk/proton-c/src/engine/event.c
    qpid/proton/trunk/proton-c/src/transport/transport.c
    
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
    qpid/proton/trunk/proton-j/src/main/resources/cengine.py
    qpid/proton/trunk/tests/python/proton_tests/common.py
    qpid/proton/trunk/tests/python/proton_tests/engine.py
    qpid/proton/trunk/tests/python/proton_tests/sasl.py
    qpid/proton/trunk/tests/python/proton_tests/transport.py

Modified: qpid/proton/trunk/proton-c/bindings/python/cproton.i
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/cproton.i?rev=1611460&r1=1611459&r2=1611460&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/cproton.i (original)
+++ qpid/proton/trunk/proton-c/bindings/python/cproton.i Thu Jul 17 19:53:01 
2014
@@ -134,24 +134,13 @@ ssize_t pn_link_send(pn_link_t *transpor
 %}
 %ignore pn_link_recv;
 
-int pn_transport_push(pn_transport_t *transport, char *STRING, size_t LENGTH);
+ssize_t pn_transport_push(pn_transport_t *transport, char *STRING, size_t 
LENGTH);
 %ignore pn_transport_push;
 
 %rename(pn_transport_peek) wrap_pn_transport_peek;
 %inline %{
   int wrap_pn_transport_peek(pn_transport_t *transport, char *OUTPUT, size_t 
*OUTPUT_SIZE) {
-    return pn_transport_peek(transport, OUTPUT, *OUTPUT_SIZE);
-  }
-%}
-%ignore pn_transport_peek;
-
-ssize_t pn_transport_input(pn_transport_t *transport, char *STRING, size_t 
LENGTH);
-%ignore pn_transport_input;
-
-%rename(pn_transport_output) wrap_pn_transport_output;
-%inline %{
-  int wrap_pn_transport_output(pn_transport_t *transport, char *OUTPUT, size_t 
*OUTPUT_SIZE) {
-    ssize_t sz = pn_transport_output(transport, OUTPUT, *OUTPUT_SIZE);
+    ssize_t sz = pn_transport_peek(transport, OUTPUT, *OUTPUT_SIZE);
     if (sz >= 0) {
       *OUTPUT_SIZE = sz;
     } else {
@@ -160,7 +149,7 @@ ssize_t pn_transport_input(pn_transport_
     return sz;
   }
 %}
-%ignore pn_transport_output;
+%ignore pn_transport_peek;
 
 %rename(pn_delivery) wrap_pn_delivery;
 %inline %{

Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1611460&r1=1611459&r2=1611460&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Thu Jul 17 19:53:01 
2014
@@ -3023,7 +3023,9 @@ class Transport(object):
       return self._check(c)
 
   def push(self, bytes):
-    self._check(pn_transport_push(self._trans, bytes))
+    n = self._check(pn_transport_push(self._trans, bytes))
+    if n != len(bytes):
+      raise OverflowError("unable to process all bytes")
 
   def close_tail(self):
     self._check(pn_transport_close_tail(self._trans))
@@ -3049,27 +3051,6 @@ class Transport(object):
   def close_head(self):
     self._check(pn_transport_close_head(self._trans))
 
-  def output(self, size):
-    p = self.pending()
-    if p < 0:
-      return None
-    else:
-      out = self.peek(min(size, p))
-      self.pop(len(out))
-      return out
-
-  def input(self, bytes):
-    if not bytes:
-      self.close_tail()
-      return None
-    else:
-      c = self.capacity()
-      if (c < 0):
-        return None
-      trimmed = bytes[:c]
-      self.push(trimmed)
-      return len(trimmed)
-
   # AMQP 1.0 max-frame-size
   def _get_max_frame_size(self):
     return pn_transport_get_max_frame(self._trans)

Modified: qpid/proton/trunk/proton-c/include/proton/transport.h
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/transport.h?rev=1611460&r1=1611459&r2=1611460&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/transport.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/transport.h Thu Jul 17 19:53:01 
2014
@@ -328,16 +328,17 @@ PN_EXTERN char *pn_transport_tail(pn_tra
  *
  * This is equivalent to copying @c size bytes afther the tail pointer
  * and then calling ::pn_transport_process with an argument of @c
- * size. It is an error to call this with a @c size larger than the
- * capacity reported by ::pn_transport_capacity.
+ * size. Only some of the bytes will be copied if there is
+ * insufficienty capacity available. Use ::pn_transport_capacity to
+ * determine how much capacity the transport has.
  *
  * @param[in] transport the transport
  * @param[in] src the start of the data to push into the transport
  * @param[in] size the amount of data to push into the transport
  *
- * @return 0 on success, or error code if < 0
+ * @return the number of bytes pushed on success, or error code if < 0
  */
-PN_EXTERN int pn_transport_push(pn_transport_t *transport, const char *src, 
size_t size);
+PN_EXTERN ssize_t pn_transport_push(pn_transport_t *transport, const char 
*src, size_t size);
 
 /**
  * Process input data following the tail pointer.
@@ -404,9 +405,9 @@ PN_EXTERN const char *pn_transport_head(
  * @param[in] transport the transport
  * @param[out] dst the destination buffer
  * @param[in] size the capacity of the destination buffer
- * @return 0 on success, or error code if < 0
+ * @return number of bytes copied on success, or error code if < 0
  */
-PN_EXTERN int pn_transport_peek(pn_transport_t *transport, char *dst, size_t 
size);
+PN_EXTERN ssize_t pn_transport_peek(pn_transport_t *transport, char *dst, 
size_t size);
 
 /**
  * Removes @c size bytes of output from the pending output queue

Modified: qpid/proton/trunk/proton-c/src/engine/event.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/event.c?rev=1611460&r1=1611459&r2=1611460&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/event.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/event.c Thu Jul 17 19:53:01 2014
@@ -108,6 +108,11 @@ pn_event_t *pn_collector_put(pn_collecto
     return NULL;
   }
 
+  pn_event_t *tail = collector->tail;
+  if (tail && tail->type == type && tail->context == context) {
+    return NULL;
+  }
+
   pn_event_t *event;
 
   if (collector->free_head) {
@@ -118,8 +123,6 @@ pn_event_t *pn_collector_put(pn_collecto
     event = pn_event();
   }
 
-  pn_event_t *tail = collector->tail;
-
   if (tail) {
     tail->next = event;
     collector->tail = event;

Modified: qpid/proton/trunk/proton-c/src/transport/transport.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/transport/transport.c?rev=1611460&r1=1611459&r2=1611460&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/transport/transport.c (original)
+++ qpid/proton/trunk/proton-c/src/transport/transport.c Thu Jul 17 19:53:01 
2014
@@ -1425,6 +1425,8 @@ int pn_process_tpwork_sender(pn_transpor
         link->queued--;
         link->session->outgoing_deliveries--;
       }
+
+      pn_collector_put(transport->connection->collector, PN_LINK_FLOW, link);
     }
   }
 
@@ -1985,7 +1987,7 @@ char *pn_transport_tail(pn_transport_t *
   return NULL;
 }
 
-int pn_transport_push(pn_transport_t *transport, const char *src, size_t size)
+ssize_t pn_transport_push(pn_transport_t *transport, const char *src, size_t 
size)
 {
   assert(transport);
 
@@ -1993,14 +1995,19 @@ int pn_transport_push(pn_transport_t *tr
   if (capacity < 0) {
     return capacity;
   } else if (size > (size_t) capacity) {
-    return PN_OVERFLOW;
+    size = capacity;
   }
 
   char *dst = pn_transport_tail(transport);
   assert(dst);
   memmove(dst, src, size);
 
-  return pn_transport_process(transport, size);
+  int n = pn_transport_process(transport, size);
+  if (n < 0) {
+    return n;
+  } else {
+    return size;
+  }
 }
 
 int pn_transport_process(pn_transport_t *transport, size_t size)
@@ -2045,7 +2052,7 @@ const char *pn_transport_head(pn_transpo
   return NULL;
 }
 
-int pn_transport_peek(pn_transport_t *transport, char *dst, size_t size)
+ssize_t pn_transport_peek(pn_transport_t *transport, char *dst, size_t size)
 {
   assert(transport);
 
@@ -2053,7 +2060,7 @@ int pn_transport_peek(pn_transport_t *tr
   if (pending < 0) {
     return pending;
   } else if (size > (size_t) pending) {
-    return PN_UNDERFLOW;
+    size = pending;
   }
 
   if (pending > 0) {
@@ -2062,7 +2069,7 @@ int pn_transport_peek(pn_transport_t *tr
     memmove(dst, src, size);
   }
 
-  return 0;
+  return size;
 }
 
 void pn_transport_pop(pn_transport_t *transport, size_t size)

Modified: 
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1611460&r1=1611459&r2=1611460&view=diff
==============================================================================
--- 
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
 (original)
+++ 
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
 Thu Jul 17 19:53:01 2014
@@ -108,6 +108,7 @@ public class TransportImpl extends Endpo
 
     private FrameHandler _frameHandler = this;
     private boolean _head_closed = false;
+    private TransportException _tail_error = null;
 
     /**
      * @deprecated This constructor's visibility will be reduced to the 
default scope in a future release.
@@ -728,15 +729,22 @@ public class TransportImpl extends Endpo
 
     private void processOpen()
     {
-        if(_connectionEndpoint != null && _connectionEndpoint.getLocalState() 
!= EndpointState.UNINITIALIZED && !_isOpenSent)
-        {
+        if ((_tail_error != null ||
+             (_connectionEndpoint != null &&
+              _connectionEndpoint.getLocalState() != 
EndpointState.UNINITIALIZED)) &&
+            !_isOpenSent) {
             Open open = new Open();
-            String cid = _connectionEndpoint.getLocalContainerId();
-            open.setContainerId(cid == null ? "" : cid);
-            open.setHostname(_connectionEndpoint.getHostname());
-            
open.setDesiredCapabilities(_connectionEndpoint.getDesiredCapabilities());
-            
open.setOfferedCapabilities(_connectionEndpoint.getOfferedCapabilities());
-            open.setProperties(_connectionEndpoint.getProperties());
+            if (_connectionEndpoint != null) {
+                String cid = _connectionEndpoint.getLocalContainerId();
+                open.setContainerId(cid == null ? "" : cid);
+                open.setHostname(_connectionEndpoint.getHostname());
+                
open.setDesiredCapabilities(_connectionEndpoint.getDesiredCapabilities());
+                
open.setOfferedCapabilities(_connectionEndpoint.getOfferedCapabilities());
+                open.setProperties(_connectionEndpoint.getProperties());
+            } else {
+                open.setContainerId("");
+            }
+
             if (_maxFrameSize > 0) {
                 open.setMaxFrameSize(UnsignedInteger.valueOf(_maxFrameSize));
             }
@@ -747,7 +755,6 @@ public class TransportImpl extends Endpo
             _isOpenSent = true;
 
             writeFrame(0, open, null, null);
-
         }
     }
 
@@ -872,6 +879,9 @@ public class TransportImpl extends Endpo
 
     private boolean hasSendableMessages(SessionImpl session)
     {
+        if (_connectionEndpoint == null) {
+            return false;
+        }
 
         if(!_closeReceived && (session == null || 
!session.getTransportSession().endReceived()))
         {
@@ -896,14 +906,24 @@ public class TransportImpl extends Endpo
 
     private void processClose()
     {
-        if(_connectionEndpoint != null && _connectionEndpoint.getLocalState() 
== EndpointState.CLOSED && !_isCloseSent)
-        {
+        if ((_tail_error != null ||
+             (_connectionEndpoint != null &&
+              _connectionEndpoint.getLocalState() == EndpointState.CLOSED)) &&
+            !_isCloseSent) {
             if(!hasSendableMessages(null))
             {
                 Close close = new Close();
 
-                ErrorCondition localError = _connectionEndpoint.getCondition();
-                if( localError.getCondition() !=null )
+                ErrorCondition localError;
+
+                if (_connectionEndpoint == null) {
+                    localError = new 
ErrorCondition(ConnectionError.FRAMING_ERROR,
+                                                    _tail_error.toString());
+                } else {
+                    localError =  _connectionEndpoint.getCondition();
+                }
+
+                if(localError.getCondition() != null)
                 {
                     close.setError(localError);
                 }
@@ -911,7 +931,10 @@ public class TransportImpl extends Endpo
                 _isCloseSent = true;
 
                 writeFrame(0, close, null, null);
-                _connectionEndpoint.clearModified();
+
+                if (_connectionEndpoint != null) {
+                    _connectionEndpoint.clearModified();
+                }
             }
         }
     }
@@ -1197,11 +1220,11 @@ public class TransportImpl extends Endpo
     public void closed(TransportException error)
     {
         if (!_closeReceived || error != null) {
-            Close close = new Close();
-            String msg = error == null ? "connection aborted" : 
error.toString();
-            close.setError(new ErrorCondition(ConnectionError.FRAMING_ERROR, 
msg));
-            _isCloseSent = true;
-            writeFrame(0, close, null, null);
+            if (error == null) {
+                _tail_error = new TransportException("connection aborted");
+            } else {
+                _tail_error = error;
+            }
             _head_closed = true;
         }
     }

Modified: qpid/proton/trunk/proton-j/src/main/resources/cengine.py
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/resources/cengine.py?rev=1611460&r1=1611459&r2=1611460&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/resources/cengine.py (original)
+++ qpid/proton/trunk/proton-j/src/main/resources/cengine.py Thu Jul 17 
19:53:01 2014
@@ -98,7 +98,11 @@ class pn_condition:
       self.description = None
       self.info.clear()
     else:
-      self.name = impl.getCondition().toString()
+      cond = impl.getCondition()
+      if cond is None:
+        self.name = None
+      else:
+        self.name = cond.toString()
       self.description = impl.getDescription()
       obj2dat(impl.getInfo(), self.info)
 
@@ -909,6 +913,7 @@ def pn_transport_peek(trans, size):
   if size:
     bb = trans.impl.head()
     bb.get(ba)
+    bb.position(0)
   return 0, ba.tostring()
 
 def pn_transport_pop(trans, size):
@@ -922,16 +927,16 @@ def pn_transport_push(trans, input):
   if cap < 0:
     return cap
   elif len(input) > cap:
-    return PN_OVERFLOW
-  else:
-    bb = trans.impl.tail()
-    bb.put(array(input, 'b'))
-    try:
-      trans.impl.process()
-      return 0
-    except TransportException, e:
-      trans.error = pn_error(PN_ERR, str(e))
-      return PN_ERR
+    input = input[:cap]
+
+  bb = trans.impl.tail()
+  bb.put(array(input, 'b'))
+  try:
+    trans.impl.process()
+    return len(input)
+  except TransportException, e:
+    trans.error = pn_error(PN_ERR, str(e))
+    return PN_ERR
 
 def pn_transport_close_head(trans):
   try:

Modified: qpid/proton/trunk/tests/python/proton_tests/common.py
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/common.py?rev=1611460&r1=1611459&r2=1611460&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/common.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/common.py Thu Jul 17 19:53:01 
2014
@@ -46,42 +46,36 @@ def free_tcp_ports(count=1):
     s.close()
   return ports
 
+def pump_uni(src, dst, buffer_size=1024):
+  p = src.pending()
+  c = dst.capacity()
+
+  if p < 0 and c < 0:
+    return False
+
+  if p < 0:
+    dst.close_tail()
+  elif p == 0 or c == 0:
+    return False
+  else:
+    if c < 0:
+      src.close_head()
+    else:
+      bytes = src.peek(min(c, buffer_size))
+      dst.push(bytes)
+      src.pop(len(bytes))
+
+  return True
 
 def pump(transport1, transport2, buffer_size=1024):
   """ Transfer all pending bytes between two Proton engines
-      by repeatedly calling input and output.
+      by repeatedly calling peek/pop and push.
       Asserts that each engine accepts some bytes every time
       (unless it's already closed).
   """
-
-  out1_leftover_by_t2 = ""
-  out2_leftover_by_t1 = ""
-  i = 0
-
-  while True:
-    out1 = out1_leftover_by_t2 + (transport1.output(buffer_size) or "")
-    out2 = out2_leftover_by_t1 + (transport2.output(buffer_size) or "")
-
-    if out1:
-      number_t2_consumed = transport2.input(out1)
-      if number_t2_consumed is None:
-        # special None return value means input is closed so discard the 
leftovers
-        out1_leftover_by_t2 = ""
-      else:
-        assert number_t2_consumed > 0, (number_t2_consumed, len(out1), 
out1[:100])
-        out1_leftover_by_t2 = out1[number_t2_consumed:]
-
-    if out2:
-      number_t1_consumed = transport1.input(out2)
-      if number_t1_consumed is None:
-        # special None return value means input is closed so discard the 
leftovers
-        out2_leftover_by_t1 = ""
-      else:
-        assert number_t1_consumed > 0, (number_t1_consumed, len(out1), 
out1[:100])
-        out2_leftover_by_t1 = out2[number_t1_consumed:]
-
-    if not out1 and not out2: break
-    i = i + 1
+  while (pump_uni(transport1, transport2, buffer_size) or
+         pump_uni(transport2, transport1, buffer_size)):
+    pass
 
 def isSSLPresent():
     """ True if a suitable SSL library is available.

Modified: qpid/proton/trunk/tests/python/proton_tests/engine.py
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/engine.py?rev=1611460&r1=1611459&r2=1611460&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/engine.py Thu Jul 17 19:53:01 
2014
@@ -2206,8 +2206,7 @@ class EventTest(Test):
     rcv.flow(10)
     self.pump()
     self.expect(Event.CONNECTION_INIT, Event.SESSION_INIT,
-                Event.LINK_INIT, Event.LINK_OPEN, Event.TRANSPORT,
-                Event.TRANSPORT)
+                Event.LINK_INIT, Event.LINK_OPEN, Event.TRANSPORT)
     snd.delivery("delivery")
     snd.send("Hello World!")
     snd.advance()
@@ -2218,11 +2217,11 @@ class EventTest(Test):
     self.expect(Event.LINK_REMOTE_OPEN, Event.DELIVERY)
     rcv.session.connection._transport.unbind()
     rcv.session.connection.free()
-    self.expect_oneof((Event.TRANSPORT, Event.TRANSPORT, Event.TRANSPORT, 
Event.LINK_CLOSE,
-                       Event.SESSION_CLOSE, Event.CONNECTION_CLOSE, 
Event.LINK_FINAL,
+    self.expect_oneof((Event.TRANSPORT, Event.LINK_CLOSE, Event.SESSION_CLOSE,
+                       Event.CONNECTION_CLOSE, Event.LINK_FINAL,
                        Event.SESSION_FINAL, Event.CONNECTION_FINAL),
-                      (Event.TRANSPORT, Event.TRANSPORT, Event.TRANSPORT, 
Event.LINK_CLOSE,
-                       Event.LINK_FINAL, Event.SESSION_CLOSE, 
Event.SESSION_FINAL,
+                      (Event.TRANSPORT, Event.LINK_CLOSE, Event.LINK_FINAL,
+                       Event.SESSION_CLOSE, Event.SESSION_FINAL,
                        Event.CONNECTION_CLOSE, Event.CONNECTION_FINAL))
 
   def testDeliveryEventsDisp(self):
@@ -2231,12 +2230,9 @@ class EventTest(Test):
     dlv = snd.delivery("delivery")
     snd.send("Hello World!")
     assert snd.advance()
-    self.expect(Event.LINK_OPEN,
-                Event.TRANSPORT,
-                Event.TRANSPORT,
-                Event.TRANSPORT)
+    self.expect(Event.LINK_OPEN, Event.TRANSPORT)
     self.pump()
-    self.expect()
+    self.expect(Event.LINK_FLOW)
     rdlv = rcv.current
     assert rdlv != None
     assert rdlv.tag == "delivery"

Modified: qpid/proton/trunk/tests/python/proton_tests/sasl.py
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/sasl.py?rev=1611460&r1=1611459&r2=1611460&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/sasl.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/sasl.py Thu Jul 17 19:53:01 2014
@@ -45,16 +45,16 @@ class SaslTest(Test):
     self.s2.server()
     self.s2.done(SASL.OK)
 
-    out1 = self.t1.output(1024)
-    out2 = self.t2.output(1024)
+    out1 = self.t1.peek(1024)
+    self.t1.pop(len(out1))
+    out2 = self.t2.peek(1024)
+    self.t2.pop(len(out2))
 
-    n = self.t2.input(out1)
-    assert n == len(out1), (n, out1)
+    self.t2.push(out1)
 
     assert self.s1.outcome is None
 
-    n = self.t1.input(out2)
-    assert n == len(out2), (n, out2)
+    self.t1.push(out2)
 
     assert self.s2.outcome == SASL.OK
 
@@ -67,8 +67,9 @@ class SaslTest(Test):
     self.s2.done(SASL.OK)
 
     # send the server's OK to the client
-    out2 = self.t2.output(1024)
-    self.t1.input(out2)
+    out2 = self.t2.peek(1024)
+    self.t2.pop(len(out2))
+    self.t1.push(out2)
 
     # do some work to generate AMQP data
     c1 = Connection()
@@ -84,15 +85,17 @@ class SaslTest(Test):
     out1_sasl_and_amqp = ""
     t1_still_producing = True
     while t1_still_producing:
-      out1 = self.t1.output(1024)
+      out1 = self.t1.peek(1024)
+      self.t1.pop(len(out1))
       out1_sasl_and_amqp += out1
       t1_still_producing = out1
 
     t2_still_consuming = True
     while t2_still_consuming:
-      num_consumed = self.t2.input(out1_sasl_and_amqp)
-      out1_sasl_and_amqp = out1_sasl_and_amqp[num_consumed:]
-      t2_still_consuming = num_consumed > 0 and len(out1_sasl_and_amqp) > 0
+      num = min(self.t2.capacity(), len(out1_sasl_and_amqp))
+      self.t2.push(out1_sasl_and_amqp[:num])
+      out1_sasl_and_amqp = out1_sasl_and_amqp[num:]
+      t2_still_consuming = num > 0 and len(out1_sasl_and_amqp) > 0
 
     assert len(out1_sasl_and_amqp) == 0, (len(out1_sasl_and_amqp), 
out1_sasl_and_amqp)
 
@@ -129,9 +132,9 @@ class SaslTest(Test):
     self.s1.mechanisms("ANONYMOUS")
     self.s1.client()
 
-    out1 = self.t1.output(1024)
-    n = self.t2.input(out1)
-    assert n == len(out1)
+    out1 = self.t1.peek(1024)
+    self.t1.pop(len(out1))
+    self.t2.push(out1)
 
     self.s2.mechanisms("ANONYMOUS")
     self.s2.server()
@@ -140,11 +143,11 @@ class SaslTest(Test):
     c2.open()
     self.t2.bind(c2)
 
-    out2 = self.t2.output(1024)
-    n = self.t1.input(out2)
-    assert n == len(out2)
+    out2 = self.t2.peek(1024)
+    self.t2.pop(len(out2))
+    self.t1.push(out2)
 
-    out1 = self.t1.output(1024)
+    out1 = self.t1.peek(1024)
     assert len(out1) > 0
 
   def testFracturedSASL(self):
@@ -156,17 +159,23 @@ class SaslTest(Test):
 
     # self.t1.trace(Transport.TRACE_FRM)
 
-    out = self.t1.output(1024)
-    self.t1.input("AMQP\x03\x01\x00\x00")
-    out = self.t1.output(1024)
-    self.t1.input("\x00\x00\x00")
-    out = self.t1.output(1024)
-    
self.t1.input("A\x02\x01\x00\x00\x00S@\xc04\x01\xe01\x06\xa3\x06GSSAPI\x05PLAIN\x0aDIGEST-MD5\x08AMQPLAIN\x08CRAM-MD5\x04NTLM")
-    out = self.t1.output(1024)
-    self.t1.input("\x00\x00\x00\x10\x02\x01\x00\x00\x00SD\xc0\x03\x01P\x00")
-    out = self.t1.output(1024)
+    out = self.t1.peek(1024)
+    self.t1.pop(len(out))
+    self.t1.push("AMQP\x03\x01\x00\x00")
+    out = self.t1.peek(1024)
+    self.t1.pop(len(out))
+    self.t1.push("\x00\x00\x00")
+    out = self.t1.peek(1024)
+    self.t1.pop(len(out))
+    
self.t1.push("A\x02\x01\x00\x00\x00S@\xc04\x01\xe01\x06\xa3\x06GSSAPI\x05PLAIN\x0aDIGEST-MD5\x08AMQPLAIN\x08CRAM-MD5\x04NTLM")
+    out = self.t1.peek(1024)
+    self.t1.pop(len(out))
+    self.t1.push("\x00\x00\x00\x10\x02\x01\x00\x00\x00SD\xc0\x03\x01P\x00")
+    out = self.t1.peek(1024)
+    self.t1.pop(len(out))
     while out:
-      out = self.t1.output(1024)
+      out = self.t1.peek(1024)
+      self.t1.pop(len(out))
 
     assert self.s1.outcome == SASL.OK, self.s1.outcome
 

Modified: qpid/proton/trunk/tests/python/proton_tests/transport.py
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/transport.py?rev=1611460&r1=1611459&r2=1611460&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/transport.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/transport.py Thu Jul 17 
19:53:01 2014
@@ -27,37 +27,53 @@ class TransportTest(Test):
 
   def setup(self):
     self.transport = Transport()
+    self.peer = Transport()
+    self.conn = Connection()
+    self.peer.bind(self.conn)
 
   def teardown(self):
     self.transport = None
+    self.peer = None
+    self.conn = None
+
+  def drain(self):
+    while True:
+      p = self.transport.pending()
+      if p < 0:
+        return
+      elif p > 0:
+        bytes = self.transport.peek(p)
+        self.peer.push(bytes)
+        self.transport.pop(len(bytes))
+      else:
+        assert False
+
+  def assert_error(self, name):
+    assert self.conn.remote_container is None, self.conn.remote_container
+    self.drain()
+    # verify that we received an open frame
+    assert self.conn.remote_container is not None, self.conn.remote_container
+    # verify that we received a close frame
+    assert self.conn.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_CLOSED, 
self.conn.state
+    # verify that a framing error was reported
+    assert self.conn.remote_condition.name == name, self.conn.remote_condition
 
   def testEOS(self):
-    try:
-      n = self.transport.input("")
-      assert False, n
-    except TransportException:
-      pass
+    self.transport.push("") # should be a noop
+    self.transport.close_tail() # should result in framing error
+    self.assert_error(u'amqp:connection:framing-error')
 
   def testPartial(self):
-    n = self.transport.input("AMQ")
-    assert n == 3, n
-    try:
-      n = self.transport.input("")
-      assert False, n
-    except TransportException:
-      pass
+    self.transport.push("AMQ") # partial header
+    self.transport.close_tail() # should result in framing error
+    self.assert_error(u'amqp:connection:framing-error')
 
   def testGarbage(self, garbage="GARBAGE_"):
-    try:
-      n = self.transport.input(garbage)
-      assert False, n
-    except TransportException, e:
-      assert "AMQP header mismatch" in str(e), str(e)
-    try:
-      n = self.transport.input("")
-      assert False, n
-    except TransportException, e:
-      pass
+    self.transport.push(garbage)
+    self.assert_error(u'amqp:connection:framing-error')
+    assert self.transport.pending() < 0
+    self.transport.close_tail()
+    assert self.transport.pending() < 0
 
   def testSmallGarbage(self):
     self.testGarbage("XXX")
@@ -66,16 +82,12 @@ class TransportTest(Test):
     self.testGarbage("GARBAGE_XXX")
 
   def testHeader(self):
-    n = self.transport.input("AMQP\x00\x01\x00\x00")
-    assert n == 8, n
-    try:
-      n = self.transport.input("")
-      assert False, n
-    except TransportException, e:
-      assert "connection aborted" in str(e)
+    self.transport.push("AMQP\x00\x01\x00\x00")
+    self.transport.close_tail()
+    self.assert_error(u'amqp:connection:framing-error')
 
-  def testOutput(self):
-    out = self.transport.output(1024)
+  def testPeek(self):
+    out = self.transport.peek(1024)
     assert out is not None
 
   def testBindAfterOpen(self):
@@ -87,16 +99,10 @@ class TransportTest(Test):
     conn.hostname = "test-hostname"
     trn = Transport()
     trn.bind(conn)
-    out = trn.output(1024)
+    out = trn.peek(1024)
     assert "test-container" in out, repr(out)
     assert "test-hostname" in out, repr(out)
-    n = self.transport.input(out)
-    assert n > 0, n
-    out = out[n:]
-
-    if out:
-      n = self.transport.input(out)
-      assert n == 0
+    self.transport.push(out)
 
     c = Connection()
     assert c.remote_container == None
@@ -105,10 +111,6 @@ class TransportTest(Test):
     self.transport.bind(c)
     assert c.remote_container == "test-container"
     assert c.remote_hostname == "test-hostname"
-    if out:
-      assert c.session_head(0) == None
-      n = self.transport.input(out)
-      assert n == len(out), (n, out)
     assert c.session_head(0) != None
 
   def testCloseHead(self):



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to