http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AddressTest.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AddressTest.java b/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AddressTest.java
new file mode 100644
index 0000000..6b2da05
--- /dev/null
+++ b/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AddressTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.reactor.impl;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+public class AddressTest {
+
+    @SuppressWarnings("deprecation")
+    private void testParse(String url, String scheme, String user, String 
pass, String host, String port, String name)
+    {
+        Address address = new Address(url);
+        assertEquals(scheme, address.getScheme());
+        assertEquals(user, address.getUser());
+        assertEquals(pass, address.getPass());
+        assertEquals(host, address.getHost());
+        assertEquals(port, address.getPort());
+        assertEquals(url, address.toString());
+    }
+
+    @Test
+    public void addressTests()
+    {
+        testParse("host", null, null, null, "host", null, null);
+        testParse("host:423", null, null, null, "host", "423", null);
+        testParse("user@host", null, "user", null, "host", null, null);
+        testParse("user:1243^&^:pw@host:423", null, "user", "1243^&^:pw", 
"host", "423", null);
+        testParse("user:1243^&^:pw@host:423/Foo.bar:90087", null, "user", 
"1243^&^:pw", "host", "423", "Foo.bar:90087");
+        testParse("user:1243^&^:pw@host:423/Foo.bar:90087@somewhere", null, 
"user", "1243^&^:pw", "host", "423", "Foo.bar:90087@somewhere");
+        testParse("[::1]", null, null, null, "::1", null, null);
+        testParse("[::1]:amqp", null, null, null, "::1", "amqp", null);
+        testParse("user@[::1]", null, "user", null, "::1", null, null);
+        testParse("user@[::1]:amqp", null, "user", null, "::1", "amqp", null);
+        testParse("user:1243^&^:pw@[::1]:amqp", null, "user", "1243^&^:pw", 
"::1", "amqp", null);
+        testParse("user:1243^&^:pw@[::1]:amqp/Foo.bar:90087", null, "user", 
"1243^&^:pw", "::1", "amqp", "Foo.bar:90087");
+        testParse("user:1243^&^:pw@[::1:amqp/Foo.bar:90087", null, "user", 
"1243^&^:pw", "[", ":1:amqp", "Foo.bar:90087");
+        testParse("user:1243^&^:pw@::1]:amqp/Foo.bar:90087", null, "user", 
"1243^&^:pw", "", ":1]:amqp", "Foo.bar:90087");
+        testParse("amqp://user@[::1]", "amqp", "user", null, "::1", null, 
null);
+        testParse("amqp://user@[::1]:amqp", "amqp", "user", null, "::1", 
"amqp", null);
+        testParse("amqp://user@[1234:52:0:1260:f2de:f1ff:fe59:8f87]:amqp", 
"amqp", "user", null, "1234:52:0:1260:f2de:f1ff:fe59:8f87", "amqp", null);
+        testParse("amqp://user:1243^&^:pw@[::1]:amqp", "amqp", "user", 
"1243^&^:pw", "::1", "amqp", null);
+        testParse("amqp://user:1243^&^:pw@[::1]:amqp/Foo.bar:90087", "amqp", 
"user", "1243^&^:pw", "::1", "amqp", "Foo.bar:90087");
+        testParse("amqp://host", "amqp", null, null, "host", null, null);
+        testParse("amqp://user@host", "amqp", "user", null, "host", null, 
null);
+        testParse("amqp://user@host/path:%", "amqp", "user", null, "host", 
null, "path:%");
+        testParse("amqp://user@host:5674/path:%", "amqp", "user", null, 
"host", "5674", "path:%");
+        testParse("amqp://user@host/path:%", "amqp", "user", null, "host", 
null, "path:%");
+        testParse("amqp://bigbird@host/queue@host", "amqp", "bigbird", null, 
"host", null, "queue@host");
+        testParse("amqp://host/queue@host", "amqp", null, null, "host", null, 
"queue@host");
+        testParse("amqp://host:9765/queue@host", "amqp", null, null, "host", 
"9765", "queue@host");
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/tests/java/pythonTests.ignore
----------------------------------------------------------------------
diff --git a/tests/java/pythonTests.ignore b/tests/java/pythonTests.ignore
index 7911176..1fafa02 100644
--- a/tests/java/pythonTests.ignore
+++ b/tests/java/pythonTests.ignore
@@ -1,4 +1 @@
 proton_tests.reactor_interop.*
-proton_tests.soak.*
-proton_tests.ssl.SslTest.test_defaults_messenger_app
-proton_tests.ssl.SslTest.test_server_authentication_messenger_app

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/tests/java/shim/binding/proton/__init__.py
----------------------------------------------------------------------
diff --git a/tests/java/shim/binding/proton/__init__.py b/tests/java/shim/binding/proton/__init__.py
index d3f6922..ecb480f 100644
--- a/tests/java/shim/binding/proton/__init__.py
+++ b/tests/java/shim/binding/proton/__init__.py
@@ -23,7 +23,6 @@ protocol.
 
 The proton APIs consist of the following classes:
 
- - L{Messenger} -- A messaging endpoint.
  - L{Message}   -- A class for creating and/or accessing AMQP message content.
  - L{Data}      -- A class for creating and/or accessing arbitrary AMQP encoded
                   data.
@@ -148,13 +147,6 @@ class Interrupt(ProtonException):
   """
   pass
 
-class MessengerException(ProtonException):
-  """
-  The root of the messenger exception hierarchy. All exceptions
-  generated by the messenger class derive from this exception.
-  """
-  pass
-
 class MessageException(ProtonException):
   """
   The MessageException class is the root of the message exception
@@ -168,618 +160,10 @@ EXCEPTIONS = {
   PN_INTR: Interrupt
   }
 
-PENDING = Constant("PENDING")
-ACCEPTED = Constant("ACCEPTED")
-REJECTED = Constant("REJECTED")
-RELEASED = Constant("RELEASED")
-MODIFIED = Constant("MODIFIED")
-ABORTED = Constant("ABORTED")
-SETTLED = Constant("SETTLED")
-
-STATUSES = {
-  PN_STATUS_ABORTED: ABORTED,
-  PN_STATUS_ACCEPTED: ACCEPTED,
-  PN_STATUS_REJECTED: REJECTED,
-  PN_STATUS_RELEASED: RELEASED,
-  PN_STATUS_MODIFIED: MODIFIED,
-  PN_STATUS_PENDING: PENDING,
-  PN_STATUS_SETTLED: SETTLED,
-  PN_STATUS_UNKNOWN: None
-  }
 
 AUTOMATIC = Constant("AUTOMATIC")
 MANUAL = Constant("MANUAL")
 
-class Messenger(object):
-  """
-  The L{Messenger} class defines a high level interface for sending
-  and receiving L{Messages<Message>}. Every L{Messenger} contains a
-  single logical queue of incoming messages and a single logical queue
-  of outgoing messages. These messages in these queues may be destined
-  for, or originate from, a variety of addresses.
-
-  The messenger interface is single-threaded.  All methods
-  except one (L{interrupt}) are intended to be used from within
-  the messenger thread.
-
-
-  Address Syntax
-  ==============
-
-  An address has the following form::
-
-    [ amqp[s]:// ] [user[:password]@] domain [/[name]]
-
-  Where domain can be one of::
-
-    host | host:port | ip | ip:port | name
-
-  The following are valid examples of addresses:
-
-   - example.org
-   - example.org:1234
-   - amqp://example.org
-   - amqps://example.org
-   - example.org/incoming
-   - amqps://example.org/outgoing
-   - amqps://fred:trustno1@example.org
-   - 127.0.0.1:1234
-   - amqps://127.0.0.1:1234
-
-  Sending & Receiving Messages
-  ============================
-
-  The L{Messenger} class works in conjuction with the L{Message} class. The
-  L{Message} class is a mutable holder of message content.
-
-  The L{put} method copies its L{Message} to the outgoing queue, and may
-  send queued messages if it can do so without blocking.  The L{send}
-  method blocks until it has sent the requested number of messages,
-  or until a timeout interrupts the attempt.
-
-
-    >>> message = Message()
-    >>> for i in range(3):
-    ...   message.address = "amqp://host/queue"
-    ...   message.subject = "Hello World %i" % i
-    ...   messenger.put(message)
-    >>> messenger.send()
-
-  Similarly, the L{recv} method receives messages into the incoming
-  queue, and may block as it attempts to receive the requested number
-  of messages,  or until timeout is reached. It may receive fewer
-  than the requested number.  The L{get} method pops the
-  eldest L{Message} off the incoming queue and copies it into the L{Message}
-  object that you supply.  It will not block.
-
-
-    >>> message = Message()
-    >>> messenger.recv(10):
-    >>> while messenger.incoming > 0:
-    ...   messenger.get(message)
-    ...   print message.subject
-    Hello World 0
-    Hello World 1
-    Hello World 2
-
-  The blocking flag allows you to turn off blocking behavior entirely,
-  in which case L{send} and L{recv} will do whatever they can without
-  blocking, and then return.  You can then look at the number
-  of incoming and outgoing messages to see how much outstanding work
-  still remains.
-  """
-
-  def __init__(self, name=None):
-    """
-    Construct a new L{Messenger} with the given name. The name has
-    global scope. If a NULL name is supplied, a UUID based name will
-    be chosen.
-
-    @type name: string
-    @param name: the name of the messenger or None
-
-    """
-    self._mng = pn_messenger(name)
-    self._selectables = {}
-
-  def __del__(self):
-    """
-    Destroy the L{Messenger}.  This will close all connections that
-    are managed by the L{Messenger}.  Call the L{stop} method before
-    destroying the L{Messenger}.
-    """
-    if hasattr(self, "_mng"):
-      pn_messenger_free(self._mng)
-      del self._mng
-
-  def _check(self, err):
-    if err < 0:
-      if (err == PN_INPROGRESS):
-        return
-      exc = EXCEPTIONS.get(err, MessengerException)
-      raise exc("[%s]: %s" % (err, 
pn_error_text(pn_messenger_error(self._mng))))
-    else:
-      return err
-
-  @property
-  def name(self):
-    """
-    The name of the L{Messenger}.
-    """
-    return pn_messenger_name(self._mng)
-
-  def _get_certificate(self):
-    return pn_messenger_get_certificate(self._mng)
-
-  def _set_certificate(self, value):
-    self._check(pn_messenger_set_certificate(self._mng, value))
-
-  certificate = property(_get_certificate, _set_certificate,
-                         doc="""
-Path to a certificate file for the L{Messenger}. This certificate is
-used when the L{Messenger} accepts or establishes SSL/TLS connections.
-This property must be specified for the L{Messenger} to accept
-incoming SSL/TLS connections and to establish client authenticated
-outgoing SSL/TLS connection. Non client authenticated outgoing SSL/TLS
-connections do not require this property.
-""")
-
-  def _get_private_key(self):
-    return pn_messenger_get_private_key(self._mng)
-
-  def _set_private_key(self, value):
-    self._check(pn_messenger_set_private_key(self._mng, value))
-
-  private_key = property(_get_private_key, _set_private_key,
-                         doc="""
-Path to a private key file for the L{Messenger's<Messenger>}
-certificate. This property must be specified for the L{Messenger} to
-accept incoming SSL/TLS connections and to establish client
-authenticated outgoing SSL/TLS connection. Non client authenticated
-SSL/TLS connections do not require this property.
-""")
-
-  def _get_password(self):
-    return pn_messenger_get_password(self._mng)
-
-  def _set_password(self, value):
-    self._check(pn_messenger_set_password(self._mng, value))
-
-  password = property(_get_password, _set_password,
-                      doc="""
-This property contains the password for the L{Messenger.private_key}
-file, or None if the file is not encrypted.
-""")
-
-  def _get_trusted_certificates(self):
-    return pn_messenger_get_trusted_certificates(self._mng)
-
-  def _set_trusted_certificates(self, value):
-    self._check(pn_messenger_set_trusted_certificates(self._mng, value))
-
-  trusted_certificates = property(_get_trusted_certificates,
-                                  _set_trusted_certificates,
-                                  doc="""
-A path to a database of trusted certificates for use in verifying the
-peer on an SSL/TLS connection. If this property is None, then the peer
-will not be verified.
-""")
-
-  def _get_timeout(self):
-    t = pn_messenger_get_timeout(self._mng)
-    if t == -1:
-      return None
-    else:
-      return millis2secs(t)
-
-  def _set_timeout(self, value):
-    if value is None:
-      t = -1
-    else:
-      t = secs2millis(value)
-    self._check(pn_messenger_set_timeout(self._mng, t))
-
-  timeout = property(_get_timeout, _set_timeout,
-                     doc="""
-The timeout property contains the default timeout for blocking
-operations performed by the L{Messenger}.
-""")
-
-  def _is_blocking(self):
-    return pn_messenger_is_blocking(self._mng)
-
-  def _set_blocking(self, b):
-    self._check(pn_messenger_set_blocking(self._mng, b))
-
-  blocking = property(_is_blocking, _set_blocking,
-                      doc="""
-Enable or disable blocking behavior during L{Message} sending
-and receiving.  This affects every blocking call, with the
-exception of L{work}.  Currently, the affected calls are
-L{send}, L{recv}, and L{stop}.
-""")
-
-  def _is_passive(self):
-    return pn_messenger_is_passive(self._mng)
-
-  def _set_passive(self, b):
-    self._check(pn_messenger_set_passive(self._mng, b))
-
-  passive = property(_is_passive, _set_passive,
-                      doc="""
-When passive is set to true, Messenger will not attempt to perform I/O
-internally. In this mode it is necessary to use the selectables API to
-drive any I/O needed to perform requested actions. In this mode
-Messenger will never block.
-""")
-
-  def _get_incoming_window(self):
-    return pn_messenger_get_incoming_window(self._mng)
-
-  def _set_incoming_window(self, window):
-    self._check(pn_messenger_set_incoming_window(self._mng, window))
-
-  incoming_window = property(_get_incoming_window, _set_incoming_window,
-                             doc="""
-The incoming tracking window for the messenger. The messenger will
-track the remote status of this many incoming deliveries after they
-have been accepted or rejected. Defaults to zero.
-
-L{Messages<Message>} enter this window only when you take them into your application
-using L{get}.  If your incoming window size is I{n}, and you get I{n}+1 L{messages<Message>}
-without explicitly accepting or rejecting the oldest message, then the
-message that passes beyond the edge of the incoming window will be assigned
-the default disposition of its link.
-""")
-
-  def _get_outgoing_window(self):
-    return pn_messenger_get_outgoing_window(self._mng)
-
-  def _set_outgoing_window(self, window):
-    self._check(pn_messenger_set_outgoing_window(self._mng, window))
-
-  outgoing_window = property(_get_outgoing_window, _set_outgoing_window,
-                             doc="""
-The outgoing tracking window for the messenger. The messenger will
-track the remote status of this many outgoing deliveries after calling
-send. Defaults to zero.
-
-A L{Message} enters this window when you call the put() method with the
-message.  If your outgoing window size is I{n}, and you call L{put} I{n}+1
-times, status information will no longer be available for the
-first message.
-""")
-
-  def start(self):
-    """
-    Currently a no-op placeholder.
-    For future compatibility, do not L{send} or L{recv} messages
-    before starting the L{Messenger}.
-    """
-    self._check(pn_messenger_start(self._mng))
-
-  def stop(self):
-    """
-    Transitions the L{Messenger} to an inactive state. An inactive
-    L{Messenger} will not send or receive messages from its internal
-    queues. A L{Messenger} should be stopped before being discarded to
-    ensure a clean shutdown handshake occurs on any internally managed
-    connections.
-    """
-    self._check(pn_messenger_stop(self._mng))
-
-  @property
-  def stopped(self):
-    """
-    Returns true iff a L{Messenger} is in the stopped state.
-    This function does not block.
-    """
-    return pn_messenger_stopped(self._mng)
-
-  def subscribe(self, source):
-    """
-    Subscribes the L{Messenger} to messages originating from the
-    specified source. The source is an address as specified in the
-    L{Messenger} introduction with the following addition. If the
-    domain portion of the address begins with the '~' character, the
-    L{Messenger} will interpret the domain as host/port, bind to it,
-    and listen for incoming messages. For example "~0.0.0.0",
-    "amqp://~0.0.0.0", and "amqps://~0.0.0.0" will all bind to any
-    local interface and listen for incoming messages with the last
-    variant only permitting incoming SSL connections.
-
-    @type source: string
-    @param source: the source of messages to subscribe to
-    """
-    sub_impl = pn_messenger_subscribe(self._mng, source)
-    if not sub_impl:
-      self._check(pn_error_code(pn_messenger_error(self._mng)))
-      raise MessengerException("Cannot subscribe to %s"%source)
-    return Subscription(sub_impl)
-
-  def put(self, message):
-    """
-    Places the content contained in the message onto the outgoing
-    queue of the L{Messenger}. This method will never block, however
-    it will send any unblocked L{Messages<Message>} in the outgoing
-    queue immediately and leave any blocked L{Messages<Message>}
-    remaining in the outgoing queue. The L{send} call may be used to
-    block until the outgoing queue is empty. The L{outgoing} property
-    may be used to check the depth of the outgoing queue.
-
-    When the content in a given L{Message} object is copied to the outgoing
-    message queue, you may then modify or discard the L{Message} object
-    without having any impact on the content in the outgoing queue.
-
-    This method returns an outgoing tracker for the L{Message}.  The tracker
-    can be used to determine the delivery status of the L{Message}.
-
-    @type message: Message
-    @param message: the message to place in the outgoing queue
-    @return: a tracker
-    """
-    message._pre_encode()
-    self._check(pn_messenger_put(self._mng, message._msg))
-    return pn_messenger_outgoing_tracker(self._mng)
-
-  def status(self, tracker):
-    """
-    Gets the last known remote state of the delivery associated with
-    the given tracker.
-
-    @type tracker: tracker
-    @param tracker: the tracker whose status is to be retrieved
-
-    @return: one of None, PENDING, REJECTED, MODIFIED, or ACCEPTED
-    """
-    disp = pn_messenger_status(self._mng, tracker);
-    return STATUSES.get(disp, disp)
-
-  def buffered(self, tracker):
-    """
-    Checks if the delivery associated with the given tracker is still
-    waiting to be sent.
-
-    @type tracker: tracker
-    @param tracker: the tracker whose status is to be retrieved
-
-    @return: true if delivery is still buffered
-    """
-    return pn_messenger_buffered(self._mng, tracker);
-
-  def settle(self, tracker=None):
-    """
-    Frees a L{Messenger} from tracking the status associated with a given
-    tracker. If you don't supply a tracker, all outgoing L{messages<Message>} up
-    to the most recent will be settled.
-    """
-    if tracker is None:
-      tracker = pn_messenger_outgoing_tracker(self._mng)
-      flags = PN_CUMULATIVE
-    else:
-      flags = 0
-    self._check(pn_messenger_settle(self._mng, tracker, flags))
-
-  def send(self, n=-1):
-    """
-    This call will block until the indicated number of L{messages<Message>}
-    have been sent, or until the operation times out.  If n is -1 this call will
-    block until all outgoing L{messages<Message>} have been sent. If n is 0 then
-    this call will send whatever it can without blocking.
-    """
-    self._check(pn_messenger_send(self._mng, n))
-
-  def recv(self, n=None):
-    """
-    Receives up to I{n} L{messages<Message>} into the incoming queue.  If no value
-    for I{n} is supplied, this call will receive as many L{messages<Message>} as it
-    can buffer internally.  If the L{Messenger} is in blocking mode, this
-    call will block until at least one L{Message} is available in the
-    incoming queue.
-    """
-    if n is None:
-      n = -1
-    self._check(pn_messenger_recv(self._mng, n))
-
-  def work(self, timeout=None):
-    """
-    Sends or receives any outstanding L{messages<Message>} queued for a L{Messenger}.
-    This will block for the indicated timeout.
-    This method may also do I/O work other than sending and receiving
-    L{messages<Message>}.  For example, closing connections after messenger.L{stop}()
-    has been called.
-    """
-    if timeout is None:
-      t = -1
-    else:
-      t = secs2millis(timeout)
-    err = pn_messenger_work(self._mng, t)
-    if (err == PN_TIMEOUT):
-      return False
-    else:
-      self._check(err)
-      return True
-
-  @property
-  def receiving(self):
-    return pn_messenger_receiving(self._mng)
-
-  def interrupt(self):
-    """
-    The L{Messenger} interface is single-threaded.
-    This is the only L{Messenger} function intended to be called
-    from outside of the L{Messenger} thread.
-    Call this from a non-messenger thread to interrupt
-    a L{Messenger} that is blocking.
-    This will cause any in-progress blocking call to throw
-    the L{Interrupt} exception.  If there is no currently blocking
-    call, then the next blocking call will be affected, even if it
-    is within the same thread that interrupt was called from.
-    """
-    self._check(pn_messenger_interrupt(self._mng))
-
-  def get(self, message=None):
-    """
-    Moves the message from the head of the incoming message queue into
-    the supplied message object. Any content in the message will be
-    overwritten.
-
-    A tracker for the incoming L{Message} is returned.  The tracker can
-    later be used to communicate your acceptance or rejection of the
-    L{Message}.
-
-    If None is passed in for the L{Message} object, the L{Message}
-    popped from the head of the queue is discarded.
-
-    @type message: Message
-    @param message: the destination message object
-    @return: a tracker
-    """
-    if message is None:
-      impl = None
-    else:
-      impl = message._msg
-    self._check(pn_messenger_get(self._mng, impl))
-    if message is not None:
-      message._post_decode()
-    return pn_messenger_incoming_tracker(self._mng)
-
-  def accept(self, tracker=None):
-    """
-    Signal the sender that you have acted on the L{Message}
-    pointed to by the tracker.  If no tracker is supplied,
-    then all messages that have been returned by the L{get}
-    method are accepted, except those that have already been
-    auto-settled by passing beyond your incoming window size.
-
-    @type tracker: tracker
-    @param tracker: a tracker as returned by get
-    """
-    if tracker is None:
-      tracker = pn_messenger_incoming_tracker(self._mng)
-      flags = PN_CUMULATIVE
-    else:
-      flags = 0
-    self._check(pn_messenger_accept(self._mng, tracker, flags))
-
-  def reject(self, tracker=None):
-    """
-    Rejects the L{Message} indicated by the tracker.  If no tracker
-    is supplied, all messages that have been returned by the L{get}
-    method are rejected, except those that have already been auto-settled
-    by passing beyond your outgoing window size.
-
-    @type tracker: tracker
-    @param tracker: a tracker as returned by get
-    """
-    if tracker is None:
-      tracker = pn_messenger_incoming_tracker(self._mng)
-      flags = PN_CUMULATIVE
-    else:
-      flags = 0
-    self._check(pn_messenger_reject(self._mng, tracker, flags))
-
-  @property
-  def outgoing(self):
-    """
-    The outgoing queue depth.
-    """
-    return pn_messenger_outgoing(self._mng)
-
-  @property
-  def incoming(self):
-    """
-    The incoming queue depth.
-    """
-    return pn_messenger_incoming(self._mng)
-
-  def route(self, pattern, address):
-    """
-          Adds a routing rule to a L{Messenger's<Messenger>} internal routing table.
-
-          The route procedure may be used to influence how a L{Messenger} will
-          internally treat a given address or class of addresses. Every call
-          to the route procedure will result in L{Messenger} appending a routing
-          rule to its internal routing table.
-
-          Whenever a L{Message} is presented to a L{Messenger} for delivery, it
-          will match the address of this message against the set of routing
-          rules in order. The first rule to match will be triggered, and
-          instead of routing based on the address presented in the message,
-          the L{Messenger} will route based on the address supplied in the rule.
-
-          The pattern matching syntax supports two types of matches, a '%'
-          will match any character except a '/', and a '*' will match any
-          character including a '/'.
-
-          A routing address is specified as a normal AMQP address, however it
-          may additionally use substitution variables from the pattern match
-          that triggered the rule.
-
-          Any message sent to "foo" will be routed to "amqp://foo.com":
-
-             >>> messenger.route("foo", "amqp://foo.com");
-
-          Any message sent to "foobar" will be routed to
-          "amqp://foo.com/bar":
-
-             >>> messenger.route("foobar", "amqp://foo.com/bar");
-
-          Any message sent to bar/<path> will be routed to the corresponding
-          path within the amqp://bar.com domain:
-
-             >>> messenger.route("bar/*", "amqp://bar.com/$1");
-
-          Route all L{messages<Message>} over TLS:
-
-             >>> messenger.route("amqp:*", "amqps:$1")
-
-          Supply credentials for foo.com:
-
-             >>> messenger.route("amqp://foo.com/*", "amqp://user:password@foo.com/$1");
-
-          Supply credentials for all domains:
-
-             >>> messenger.route("amqp://*", "amqp://user:password@$1");
-
-          Route all addresses through a single proxy while preserving the
-          original destination:
-
-             >>> messenger.route("amqp://%/*", "amqp://user:password@proxy/$1/$2");
-
-          Route any address through a single broker:
-
-             >>> messenger.route("*", "amqp://user:password@broker/$1");
-    """
-    self._check(pn_messenger_route(self._mng, unicode2utf8(pattern), 
unicode2utf8(address)))
-
-  def rewrite(self, pattern, address):
-    """
-    Similar to route(), except that the destination of
-    the L{Message} is determined before the message address is rewritten.
-
-    The outgoing address is only rewritten after routing has been
-    finalized.  If a message has an outgoing address of
-    "amqp://0.0.0.0:5678", and a rewriting rule that changes its
-    outgoing address to "foo", it will still arrive at the peer that
-    is listening on "amqp://0.0.0.0:5678", but when it arrives there,
-    the receiver will see its outgoing address as "foo".
-
-    The default rewrite rule removes username and password from addresses
-    before they are transmitted.
-    """
-    self._check(pn_messenger_rewrite(self._mng, unicode2utf8(pattern), 
unicode2utf8(address)))
-
-  def selectable(self):
-    return Selectable.wrap(pn_messenger_selectable(self._mng))
-
-  @property
-  def deadline(self):
-    tstamp = pn_messenger_deadline(self._mng)
-    if tstamp:
-      return millis2secs(tstamp)
-    else:
-      return None
 
 class Message(object):
   """The L{Message} class is a mutable holder of message content.
@@ -4259,8 +3643,6 @@ __all__ = [
            "Link",
            "Message",
            "MessageException",
-           "Messenger",
-           "MessengerException",
            "ProtonException",
            "VERSION_MAJOR",
            "VERSION_MINOR",

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/tests/java/shim/cmessenger.py
----------------------------------------------------------------------
diff --git a/tests/java/shim/cmessenger.py b/tests/java/shim/cmessenger.py
deleted file mode 100644
index 249e0dc..0000000
--- a/tests/java/shim/cmessenger.py
+++ /dev/null
@@ -1,225 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from org.apache.qpid.proton import Proton
-from org.apache.qpid.proton.messenger import Messenger, Status
-from org.apache.qpid.proton import InterruptException, TimeoutException
-
-from cerror import *
-
-# from proton/messenger.h
-PN_STATUS_UNKNOWN = 0
-PN_STATUS_PENDING = 1
-PN_STATUS_ACCEPTED = 2
-PN_STATUS_REJECTED = 3
-PN_STATUS_RELEASED = 4
-PN_STATUS_MODIFIED = 5
-PN_STATUS_ABORTED = 6
-PN_STATUS_SETTLED = 7
-
-PN_CUMULATIVE = 1
-
-class pn_messenger_wrapper:
-
-  def __init__(self, impl):
-    self.impl = impl
-    self.error = pn_error(0, None)
-
-def pn_messenger(name):
-  if name is None:
-    return pn_messenger_wrapper(Proton.messenger())
-  else:
-    return pn_messenger_wrapper(Proton.messenger(name))
-
-def pn_messenger_error(m):
-  return m.error
-
-def pn_messenger_set_timeout(m, t):
-  m.impl.setTimeout(t)
-  return 0
-
-def pn_messenger_set_blocking(m, b):
-  m.impl.setBlocking(b)
-  return 0
-
-def pn_messenger_set_certificate(m, c):
-  m.impl.setCertificate(c)
-  return 0
-
-def pn_messenger_set_private_key(m, p):
-  m.impl.setPrivateKey(p)
-  return 0
-
-def pn_messenger_set_password(m, p):
-  m.impl.setPassword(p)
-  return 0
-
-def pn_messenger_set_trusted_certificates(m, t):
-  m.impl.setTrustedCertificates(t)
-  return 0
-
-def pn_messenger_set_incoming_window(m, w):
-  m.impl.setIncomingWindow(w)
-  return 0
-
-def pn_messenger_set_outgoing_window(m, w):
-  m.impl.setOutgoingWindow(w)
-  return 0
-
-def pn_messenger_start(m):
-  m.impl.start()
-  return 0
-
-# XXX: ???
-def pn_messenger_work(m, t):
-  try:
-    if m.impl.work(t):
-      return 1
-    else:
-      return PN_TIMEOUT
-  except InterruptException, e:
-    return PN_INTR
-
-class pn_subscription:
-
-  def __init__(self):
-    pass
-
-def pn_messenger_subscribe(m, source):
-  m.impl.subscribe(source)
-  return pn_subscription()
-
-def pn_messenger_route(m, pattern, address):
-  m.impl.route(pattern, address)
-  return 0
-
-def pn_messenger_rewrite(m, pattern, address):
-  m.impl.rewrite(pattern, address)
-  return 0
-
-def pn_messenger_interrupt(m):
-  m.impl.interrupt()
-  return 0
-
-def pn_messenger_buffered(m, t):
-  raise Skipped()
-
-from org.apache.qpid.proton.engine import TransportException
-
-def pn_messenger_stop(m):
-  m.impl.stop()
-  return 0
-
-def pn_messenger_stopped(m):
-  return m.impl.stopped()
-
-def pn_messenger_put(m, msg):
-  msg.pre_encode()
-  m.impl.put(msg.impl)
-  return 0
-
-def pn_messenger_outgoing_tracker(m):
-  return m.impl.outgoingTracker()
-
-def pn_messenger_send(m, n):
-  try:
-    m.impl.send(n)
-    return 0
-  except InterruptException, e:
-    return PN_INTR
-  except TimeoutException, e:
-    return PN_TIMEOUT
-
-def pn_messenger_recv(m, n):
-  try:
-    m.impl.recv(n)
-    return 0
-  except InterruptException, e:
-    return PN_INTR
-  except TimeoutException, e:
-    return PN_TIMEOUT
-
-def pn_messenger_receiving(m):
-  return m.impl.receiving()
-
-def pn_messenger_incoming(m):
-  return m.impl.incoming()
-
-def pn_messenger_outgoing(m):
-  return m.impl.outgoing()
-
-def pn_messenger_get(m, msg):
-  mimpl = m.impl.get()
-  if msg:
-    msg.decode(mimpl)
-  return 0
-
-def pn_messenger_incoming_tracker(m):
-  return m.impl.incomingTracker()
-
-def pn_messenger_accept(m, tracker, flags):
-  if flags:
-    m.impl.accept(tracker, Messenger.CUMULATIVE)
-  else:
-    m.impl.accept(tracker, 0)
-  return 0
-
-def pn_messenger_reject(m, tracker, flags):
-  if flags:
-    m.impl.reject(tracker, Messenger.CUMULATIVE)
-  else:
-    m.impl.reject(tracker, 0)
-  return 0
-
-def pn_messenger_settle(m, tracker, flags):
-  if flags:
-    m.impl.settle(tracker, Messenger.CUMULATIVE)
-  else:
-    m.impl.settle(tracker, 0)
-  return 0
-
-STATUS_P2J = {
-  PN_STATUS_UNKNOWN: Status.UNKNOWN,
-  PN_STATUS_PENDING: Status.PENDING,
-  PN_STATUS_ACCEPTED: Status.ACCEPTED,
-  PN_STATUS_REJECTED: Status.REJECTED,
-  PN_STATUS_RELEASED: Status.RELEASED,
-  PN_STATUS_MODIFIED: Status.MODIFIED,
-  PN_STATUS_ABORTED: Status.ABORTED,
-  PN_STATUS_SETTLED: Status.SETTLED
-}
-
-STATUS_J2P = {
-  Status.UNKNOWN: PN_STATUS_UNKNOWN,
-  Status.PENDING: PN_STATUS_PENDING,
-  Status.ACCEPTED: PN_STATUS_ACCEPTED,
-  Status.REJECTED: PN_STATUS_REJECTED,
-  Status.RELEASED: PN_STATUS_RELEASED,
-  Status.MODIFIED: PN_STATUS_MODIFIED,
-  Status.ABORTED: PN_STATUS_ABORTED,
-  Status.SETTLED: PN_STATUS_SETTLED
-}
-
-def pn_messenger_status(m, tracker):
-  return STATUS_J2P[m.impl.getStatus(tracker)]
-
-def pn_messenger_set_passive(m, passive):
-  raise Skipped()
-
-def pn_messenger_selectable(m):
-  raise Skipped()

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/tests/java/shim/cproton.py
----------------------------------------------------------------------
diff --git a/tests/java/shim/cproton.py b/tests/java/shim/cproton.py
index d5ed574..ec7feec 100644
--- a/tests/java/shim/cproton.py
+++ b/tests/java/shim/cproton.py
@@ -35,7 +35,6 @@ from ccodec import *
 from cengine import *
 from csasl import *
 from cssl import *
-from cmessenger import *
 from cmessage import *
 from curl import *
 from creactor import *

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/tests/java/shim/curl.py
----------------------------------------------------------------------
diff --git a/tests/java/shim/curl.py b/tests/java/shim/curl.py
index d4d3d37..2b8c9c8 100644
--- a/tests/java/shim/curl.py
+++ b/tests/java/shim/curl.py
@@ -17,7 +17,7 @@
 # under the License
 #
 
-from org.apache.qpid.proton.messenger.impl import Address
+from org.apache.qpid.proton.reactor.impl import Address
 
 def pn_url():
     return Address()

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/tests/python/proton_tests/__init__.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/__init__.py b/tests/python/proton_tests/__init__.py
index 66ce650..647cbf5 100644
--- a/tests/python/proton_tests/__init__.py
+++ b/tests/python/proton_tests/__init__.py
@@ -22,11 +22,9 @@ import proton_tests.engine
 import proton_tests.message
 import proton_tests.handler
 import proton_tests.reactor
-import proton_tests.messenger
 import proton_tests.sasl
 import proton_tests.transport
 import proton_tests.ssl
 import proton_tests.interop
-import proton_tests.soak
 import proton_tests.url
 import proton_tests.utils

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/tests/python/proton_tests/common.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/common.py b/tests/python/proton_tests/common.py
index aaefccd..02de4ff 100644
--- a/tests/python/proton_tests/common.py
+++ b/tests/python/proton_tests/common.py
@@ -264,308 +264,3 @@ class TestServer(object):
   def on_delivery(self, event):
     event.delivery.settle()
 
-#
-# Classes that wrap the messenger applications msgr-send and msgr-recv.
-# These applications reside in the tests/tools/apps directory
-#
-
-class MessengerApp(object):
-    """ Interface to control a MessengerApp """
-    def __init__(self):
-        self._cmdline = None
-        # options common to Receivers and Senders:
-        self.ca_db = None
-        self.certificate = None
-        self.privatekey = None
-        self.password = None
-        self._output = None
-
-    def start(self, verbose=False):
-        """ Begin executing the test """
-        cmd = self.cmdline()
-        self._verbose = verbose
-        if self._verbose:
-            print("COMMAND='%s'" % str(cmd))
-        #print("ENV='%s'" % str(os.environ.copy()))
-        try:
-            # Handle python launch by replacing script 'filename' with
-            # 'python abspath-to-filename' in cmdline arg list.
-            if cmd[0].endswith('.py'):
-                foundfile = findfileinpath(cmd[0], os.getenv('PATH'))
-                if foundfile is None:
-                    msg = "Unable to locate file '%s' in PATH" % cmd[0]
-                    raise Skipped("Skipping test - %s" % msg)
-
-                del cmd[0:1]
-                cmd.insert(0, foundfile)
-                cmd.insert(0, sys.executable)
-            self._process = Popen(cmd, stdout=PIPE, stderr=STDOUT,
-                                  bufsize=4096, universal_newlines=True)
-        except OSError:
-            e = sys.exc_info()[1]
-            print("ERROR: '%s'" % e)
-            msg = "Unable to execute command '%s', is it in your PATH?" % cmd[0]
-
-            # NOTE(flaper87): Skip the test if the command is not found.
-            if e.errno == 2:
-              raise Skipped("Skipping test - %s" % msg)
-            assert False, msg
-
-        self._ready()  # wait for it to initialize
-
-    def stop(self):
-        """ Signal the client to start clean shutdown """
-        pass
-
-    def wait(self):
-        """ Wait for client to complete """
-        self._output = self._process.communicate()
-        if self._verbose:
-            print("OUTPUT='%s'" % self.stdout())
-
-    def status(self):
-        """ Return status from client process """
-        return self._process.returncode
-
-    def stdout(self):
-        #self._process.communicate()[0]
-        if not self._output or not self._output[0]:
-            return "*** NO STDOUT ***"
-        return self._output[0]
-
-    def stderr(self):
-        if not self._output or not self._output[1]:
-            return "*** NO STDERR ***"
-        return self._output[1]
-
-    def cmdline(self):
-        if not self._cmdline:
-            self._build_command()
-        return self._cmdline
-
-    def _build_command(self):
-        assert False, "_build_command() needs override"
-
-    def _ready(self):
-        assert False, "_ready() needs override"
-
-    def _do_common_options(self):
-        """ Common option handling """
-        if self.ca_db is not None:
-            self._cmdline.append("-T")
-            self._cmdline.append(str(self.ca_db))
-        if self.certificate is not None:
-            self._cmdline.append("-C")
-            self._cmdline.append(str(self.certificate))
-        if self.privatekey is not None:
-            self._cmdline.append("-K")
-            self._cmdline.append(str(self.privatekey))
-        if self.password is not None:
-            self._cmdline.append("-P")
-            self._cmdline.append("pass:" + str(self.password))
-
-
-class MessengerSender(MessengerApp):
-    """ Interface to configure a sending MessengerApp """
-    def __init__(self):
-        MessengerApp.__init__(self)
-        self._command = None
-        # @todo make these properties
-        self.targets = []
-        self.send_count = None
-        self.msg_size = None
-        self.send_batch = None
-        self.outgoing_window = None
-        self.report_interval = None
-        self.get_reply = False
-        self.timeout = None
-        self.incoming_window = None
-        self.recv_count = None
-        self.name = None
-
-    # command string?
-    def _build_command(self):
-        self._cmdline = self._command
-        self._do_common_options()
-        assert self.targets, "Missing targets, required for sender!"
-        self._cmdline.append("-a")
-        self._cmdline.append(",".join(self.targets))
-        if self.send_count is not None:
-            self._cmdline.append("-c")
-            self._cmdline.append(str(self.send_count))
-        if self.msg_size is not None:
-            self._cmdline.append("-b")
-            self._cmdline.append(str(self.msg_size))
-        if self.send_batch is not None:
-            self._cmdline.append("-p")
-            self._cmdline.append(str(self.send_batch))
-        if self.outgoing_window is not None:
-            self._cmdline.append("-w")
-            self._cmdline.append(str(self.outgoing_window))
-        if self.report_interval is not None:
-            self._cmdline.append("-e")
-            self._cmdline.append(str(self.report_interval))
-        if self.get_reply:
-            self._cmdline.append("-R")
-        if self.timeout is not None:
-            self._cmdline.append("-t")
-            self._cmdline.append(str(self.timeout))
-        if self.incoming_window is not None:
-            self._cmdline.append("-W")
-            self._cmdline.append(str(self.incoming_window))
-        if self.recv_count is not None:
-            self._cmdline.append("-B")
-            self._cmdline.append(str(self.recv_count))
-        if self.name is not None:
-            self._cmdline.append("-N")
-            self._cmdline.append(str(self.name))
-
-    def _ready(self):
-        pass
-
-
-class MessengerReceiver(MessengerApp):
-    """ Interface to configure a receiving MessengerApp """
-    def __init__(self):
-        MessengerApp.__init__(self)
-        self._command = None
-        # @todo make these properties
-        self.subscriptions = []
-        self.receive_count = None
-        self.recv_count = None
-        self.incoming_window = None
-        self.timeout = None
-        self.report_interval = None
-        self.send_reply = False
-        self.outgoing_window = None
-        self.forwards = []
-        self.name = None
-
-    # command string?
-    def _build_command(self):
-        self._cmdline = self._command
-        self._do_common_options()
-        self._cmdline += ["-X", "READY"]
-        assert self.subscriptions, "Missing subscriptions, required for receiver!"
-        self._cmdline.append("-a")
-        self._cmdline.append(",".join(self.subscriptions))
-        if self.receive_count is not None:
-            self._cmdline.append("-c")
-            self._cmdline.append(str(self.receive_count))
-        if self.recv_count is not None:
-            self._cmdline.append("-b")
-            self._cmdline.append(str(self.recv_count))
-        if self.incoming_window is not None:
-            self._cmdline.append("-w")
-            self._cmdline.append(str(self.incoming_window))
-        if self.timeout is not None:
-            self._cmdline.append("-t")
-            self._cmdline.append(str(self.timeout))
-        if self.report_interval is not None:
-            self._cmdline.append("-e")
-            self._cmdline.append(str(self.report_interval))
-        if self.send_reply:
-            self._cmdline.append("-R")
-        if self.outgoing_window is not None:
-            self._cmdline.append("-W")
-            self._cmdline.append(str(self.outgoing_window))
-        if self.forwards:
-            self._cmdline.append("-F")
-            self._cmdline.append(",".join(self.forwards))
-        if self.name is not None:
-            self._cmdline.append("-N")
-            self._cmdline.append(str(self.name))
-
-    def _ready(self):
-        """ wait for subscriptions to complete setup. """
-        r = self._process.stdout.readline()
-        assert r.strip() == "READY", "Unexpected input while waiting for receiver to initialize: %s" % r
-
-class MessengerSenderC(MessengerSender):
-    def __init__(self):
-        MessengerSender.__init__(self)
-        self._command = ["msgr-send"]
-
-class MessengerSenderValgrind(MessengerSenderC):
-    """ Run the C sender under Valgrind
-    """
-    def __init__(self, suppressions=None):
-        if "VALGRIND" not in os.environ:
-            raise Skipped("Skipping test - $VALGRIND not set.")
-        MessengerSenderC.__init__(self)
-        if not suppressions:
-            suppressions = os.path.join(os.path.dirname(__file__),
-                                        "valgrind.supp" )
-        self._command = [os.environ["VALGRIND"], "--error-exitcode=42", "--quiet",
-                         "--trace-children=yes", "--leak-check=full",
-                         "--suppressions=%s" % suppressions] + self._command
-
-class MessengerReceiverC(MessengerReceiver):
-    def __init__(self):
-        MessengerReceiver.__init__(self)
-        self._command = ["msgr-recv"]
-
-class MessengerReceiverValgrind(MessengerReceiverC):
-    """ Run the C receiver under Valgrind
-    """
-    def __init__(self, suppressions=None):
-        if "VALGRIND" not in os.environ:
-            raise Skipped("Skipping test - $VALGRIND not set.")
-        MessengerReceiverC.__init__(self)
-        if not suppressions:
-            suppressions = os.path.join(os.path.dirname(__file__),
-                                        "valgrind.supp" )
-        self._command = [os.environ["VALGRIND"], "--error-exitcode=42", "--quiet",
-                         "--trace-children=yes", "--leak-check=full",
-                         "--suppressions=%s" % suppressions] + self._command
-
-class MessengerSenderPython(MessengerSender):
-    def __init__(self):
-        MessengerSender.__init__(self)
-        self._command = ["msgr-send.py"]
-
-
-class MessengerReceiverPython(MessengerReceiver):
-    def __init__(self):
-        MessengerReceiver.__init__(self)
-        self._command = ["msgr-recv.py"]
-
-
-
-class ReactorSenderC(MessengerSender):
-    def __init__(self):
-        MessengerSender.__init__(self)
-        self._command = ["reactor-send"]
-
-class ReactorSenderValgrind(ReactorSenderC):
-    """ Run the C sender under Valgrind
-    """
-    def __init__(self, suppressions=None):
-        if "VALGRIND" not in os.environ:
-            raise Skipped("Skipping test - $VALGRIND not set.")
-        ReactorSenderC.__init__(self)
-        if not suppressions:
-            suppressions = os.path.join(os.path.dirname(__file__),
-                                        "valgrind.supp" )
-        self._command = [os.environ["VALGRIND"], "--error-exitcode=42", "--quiet",
-                         "--trace-children=yes", "--leak-check=full",
-                         "--suppressions=%s" % suppressions] + self._command
-
-class ReactorReceiverC(MessengerReceiver):
-    def __init__(self):
-        MessengerReceiver.__init__(self)
-        self._command = ["reactor-recv"]
-
-class ReactorReceiverValgrind(ReactorReceiverC):
-    """ Run the C receiver under Valgrind
-    """
-    def __init__(self, suppressions=None):
-        if "VALGRIND" not in os.environ:
-            raise Skipped("Skipping test - $VALGRIND not set.")
-        ReactorReceiverC.__init__(self)
-        if not suppressions:
-            suppressions = os.path.join(os.path.dirname(__file__),
-                                        "valgrind.supp" )
-        self._command = [os.environ["VALGRIND"], "--error-exitcode=42", "--quiet",
-                         "--trace-children=yes", "--leak-check=full",
-                         "--suppressions=%s" % suppressions] + self._command

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/tests/python/proton_tests/messenger.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/messenger.py b/tests/python/proton_tests/messenger.py
deleted file mode 100644
index 8da068e..0000000
--- a/tests/python/proton_tests/messenger.py
+++ /dev/null
@@ -1,1091 +0,0 @@
-from __future__ import absolute_import
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import os, sys, traceback
-from . import common
-from proton import *
-from threading import Thread, Event
-from time import sleep, time
-from .common import Skipped
-
-class Test(common.Test):
-
-  def setUp(self):
-    self.server_credit = 10
-    self.server_received = 0
-    self.server_finite_credit = False
-    self.server = Messenger("server")
-    self.server.timeout = self.timeout
-    self.server.start()
-    self.port = common.free_tcp_port()
-    self.server.subscribe("amqp://~127.0.0.1:%d" % self.port)
-    self.server_thread = Thread(name="server-thread", target=self.run_server)
-    self.server_thread.daemon = True
-    self.server_is_running_event = Event()
-    self.running = True
-    self.server_thread_started = False
-
-    self.client = Messenger("client")
-    self.client.timeout = self.timeout
-
-  def start(self):
-    self.server_thread_started = True
-    self.server_thread.start()
-    self.server_is_running_event.wait(self.timeout)
-    self.client.start()
-
-  def _safelyStopClient(self):
-    self.server.interrupt()
-    self.client.stop()
-    self.client = None
-
-  def tearDown(self):
-    try:
-      if self.running:
-        if not self.server_thread_started: self.start()
-        # send a message to cause the server to promptly exit
-        self.running = False
-        self._safelyStopClient()
-    finally:
-      self.server_thread.join(self.timeout)
-      self.server = None
-
-REJECT_ME = "*REJECT-ME*"
-
-class MessengerTest(Test):
-
-  def run_server(self):
-    if self.server_finite_credit:
-      self._run_server_finite_credit()
-    else:
-      self._run_server_recv()
-
-  def _run_server_recv(self):
-    """ Use recv() to replenish credit each time the server waits
-    """
-    msg = Message()
-    try:
-      while self.running:
-        self.server_is_running_event.set()
-        try:
-          self.server.recv(self.server_credit)
-          self.process_incoming(msg)
-        except Interrupt:
-          pass
-    finally:
-      self.server.stop()
-      self.running = False
-
-  def _run_server_finite_credit(self):
-    """ Grant credit once, process until credit runs out
-    """
-    msg = Message()
-    self.server_is_running_event.set()
-    try:
-      self.server.recv(self.server_credit)
-      while self.running:
-        try:
-          # do not grant additional credit (eg. call recv())
-          self.process_incoming(msg)
-          self.server.work()
-        except Interrupt:
-          break
-    finally:
-      self.server.stop()
-      self.running = False
-
-  def process_incoming(self, msg):
-    while self.server.incoming:
-      self.server.get(msg)
-      self.server_received += 1
-      if msg.body == REJECT_ME:
-        self.server.reject()
-      else:
-        self.server.accept()
-      self.dispatch(msg)
-
-  def dispatch(self, msg):
-    if msg.reply_to:
-      msg.address = msg.reply_to
-      self.server.put(msg)
-      self.server.settle()
-
-  def testSendReceive(self, size=None, address_size=None):
-    self.start()
-    msg = Message()
-    if address_size:
-      msg.address="amqp://127.0.0.1:%d/%s" % (self.port, "x"*address_size)
-    else:
-      msg.address="amqp://127.0.0.1:%d" % self.port
-    msg.reply_to = "~"
-    msg.subject="Hello World!"
-    body = "First the world, then the galaxy!"
-    if size is not None:
-      while len(body) < size:
-        body = 2*body
-      body = body[:size]
-    msg.body = body
-    self.client.put(msg)
-    self.client.send()
-
-    reply = Message()
-    self.client.recv(1)
-    assert self.client.incoming == 1, self.client.incoming
-    self.client.get(reply)
-
-    assert reply.subject == "Hello World!"
-    rbod = reply.body
-    assert rbod == body, (rbod, body)
-
-  def testSendReceive1K(self):
-    self.testSendReceive(1024)
-
-  def testSendReceive2K(self):
-    self.testSendReceive(2*1024)
-
-  def testSendReceive4K(self):
-    self.testSendReceive(4*1024)
-
-  def testSendReceive10K(self):
-    self.testSendReceive(10*1024)
-
-  def testSendReceive100K(self):
-    self.testSendReceive(100*1024)
-
-  def testSendReceive1M(self):
-    self.testSendReceive(1024*1024)
-
-  def testSendReceiveLargeAddress(self):
-    self.testSendReceive(address_size=2048)
-
-  # PROTON-285 - prevent continually failing test
-  def xtestSendBogus(self):
-    self.start()
-    msg = Message()
-    msg.address="totally-bogus-address"
-    try:
-      self.client.put(msg)
-      assert False, "Expecting MessengerException"
-    except MessengerException:
-      exc = sys.exc_info()[1]
-      err = str(exc)
-      assert "unable to send to address: totally-bogus-address" in err, err
-
-  def testOutgoingWindow(self):
-    self.server.incoming_window = 10
-    self.start()
-    msg = Message()
-    msg.address="amqp://127.0.0.1:%d" % self.port
-    msg.subject="Hello World!"
-
-    trackers = []
-    for i in range(10):
-      trackers.append(self.client.put(msg))
-
-    self.client.send()
-
-    for t in trackers:
-      assert self.client.status(t) is None
-
-    # reduce outgoing_window to 5 and then try to send 10 messages
-    self.client.outgoing_window = 5
-
-    trackers = []
-    for i in range(10):
-      trackers.append(self.client.put(msg))
-
-    for i in range(5):
-      t = trackers[i]
-      assert self.client.status(t) is None, (t, self.client.status(t))
-
-    for i in range(5, 10):
-      t = trackers[i]
-      assert self.client.status(t) is PENDING, (t, self.client.status(t))
-
-    self.client.send()
-
-    for i in range(5):
-      t = trackers[i]
-      assert self.client.status(t) is None
-
-    for i in range(5, 10):
-      t = trackers[i]
-      assert self.client.status(t) is ACCEPTED
-
-  def testReject(self, process_incoming=None):
-    if process_incoming:
-      self.process_incoming = process_incoming
-    self.server.incoming_window = 10
-    self.start()
-    msg = Message()
-    msg.address="amqp://127.0.0.1:%d" % self.port
-    msg.subject="Hello World!"
-
-    self.client.outgoing_window = 10
-    trackers = []
-    rejected = []
-    for i in range(10):
-      if i == 5:
-        msg.body = REJECT_ME
-      else:
-        msg.body = "Yay!"
-      trackers.append(self.client.put(msg))
-      if msg.body == REJECT_ME:
-        rejected.append(trackers[-1])
-
-    self.client.send()
-
-    for t in trackers:
-      if t in rejected:
-        assert self.client.status(t) is REJECTED, (t, self.client.status(t))
-      else:
-        assert self.client.status(t) is ACCEPTED, (t, self.client.status(t))
-
-  def testRejectIndividual(self):
-    self.testReject(self.reject_individual)
-
-  def reject_individual(self, msg):
-    if self.server.incoming < 10:
-      self.server.work(0)
-      return
-    while self.server.incoming:
-      t = self.server.get(msg)
-      if msg.body == REJECT_ME:
-        self.server.reject(t)
-      self.dispatch(msg)
-    self.server.accept()
-
-
-  def testIncomingWindow(self):
-    self.server.incoming_window = 10
-    self.server.outgoing_window = 10
-    self.start()
-    msg = Message()
-    msg.address="amqp://127.0.0.1:%d" % self.port
-    msg.reply_to = "~"
-    msg.subject="Hello World!"
-
-    self.client.outgoing_window = 10
-    trackers = []
-    for i in range(10):
-      trackers.append(self.client.put(msg))
-
-    self.client.send()
-
-    for t in trackers:
-      assert self.client.status(t) is ACCEPTED, (t, self.client.status(t))
-
-    self.client.incoming_window = 10
-    remaining = 10
-
-    trackers = []
-    while remaining:
-      self.client.recv(remaining)
-      while self.client.incoming:
-        t = self.client.get()
-        trackers.append(t)
-        self.client.accept(t)
-        remaining -= 1
-    for t in trackers:
-      assert self.client.status(t) is ACCEPTED, (t, self.client.status(t))
-
-  def testIncomingQueueBiggerThanWindow(self, size=10):
-    self.server.outgoing_window = size
-    self.client.incoming_window = size
-    self.start()
-
-    msg = Message()
-    msg.address = "amqp://127.0.0.1:%d" % self.port
-    msg.reply_to = "~"
-    msg.subject = "Hello World!"
-
-    for i in range(2*size):
-      self.client.put(msg)
-
-    trackers = []
-    while len(trackers) < 2*size:
-      self.client.recv(2*size - len(trackers))
-      while self.client.incoming:
-        t = self.client.get(msg)
-        assert self.client.status(t) is SETTLED, (t, self.client.status(t))
-        trackers.append(t)
-
-    for t in trackers[:size]:
-      assert self.client.status(t) is None, (t, self.client.status(t))
-    for t in trackers[size:]:
-      assert self.client.status(t) is SETTLED, (t, self.client.status(t))
-
-    self.client.accept()
-
-    for t in trackers[:size]:
-      assert self.client.status(t) is None, (t, self.client.status(t))
-    for t in trackers[size:]:
-      assert self.client.status(t) is ACCEPTED, (t, self.client.status(t))
-
-  def testIncomingQueueBiggerThanSessionWindow(self):
-    self.testIncomingQueueBiggerThanWindow(2048)
-
-  def testBuffered(self):
-    self.client.outgoing_window = 1000
-    self.client.incoming_window = 1000
-    self.start();
-    assert self.server_received == 0
-    buffering = 0
-    count = 100
-    for i in range(count):
-      msg = Message()
-      msg.address="amqp://127.0.0.1:%d" % self.port
-      msg.subject="Hello World!"
-      msg.body = "First the world, then the galaxy!"
-      t = self.client.put(msg)
-      buffered = self.client.buffered(t)
-      # allow transition from False to True, but not back
-      if buffered:
-          buffering += 1
-      else:
-        assert not buffering, ("saw %s buffered deliveries before?" % buffering)
-
-    while self.client.outgoing:
-        last = self.client.outgoing
-        self.client.send()
-        #print "sent ", last - self.client.outgoing
-
-    assert self.server_received == count
-
-  def test_proton222(self):
-    self.start()
-    msg = Message()
-    msg.address="amqp://127.0.0.1:%d" % self.port
-    msg.subject="Hello World!"
-    msg.body = "First the world, then the galaxy!"
-    assert self.server_received == 0
-    self.client.put(msg)
-    self.client.send()
-    # ensure the server got the message without requiring client to stop first
-    deadline = time() + 10
-    while self.server_received == 0:
-      assert time() < deadline, "Server did not receive message!"
-      sleep(.1)
-    assert self.server_received == 1
-
-  def testUnlimitedCredit(self):
-    """ Bring up two links.  Verify credit is granted to each link by
-    transferring a message over each.
-    """
-    self.server_credit = -1
-    self.start()
-
-    msg = Message()
-    msg.address="amqp://127.0.0.1:%d/XXX" % self.port
-    msg.reply_to = "~"
-    msg.subject="Hello World!"
-    body = "First the world, then the galaxy!"
-    msg.body = body
-    self.client.put(msg)
-    self.client.send()
-
-    reply = Message()
-    self.client.recv(1)
-    assert self.client.incoming == 1
-    self.client.get(reply)
-
-    assert reply.subject == "Hello World!"
-    rbod = reply.body
-    assert rbod == body, (rbod, body)
-
-    msg = Message()
-    msg.address="amqp://127.0.0.1:%d/YYY" % self.port
-    msg.reply_to = "~"
-    msg.subject="Hello World!"
-    body = "First the world, then the galaxy!"
-    msg.body = body
-    self.client.put(msg)
-    self.client.send()
-
-    reply = Message()
-    self.client.recv(1)
-    assert self.client.incoming == 1
-    self.client.get(reply)
-
-    assert reply.subject == "Hello World!"
-    rbod = reply.body
-    assert rbod == body, (rbod, body)
-
-  def _DISABLE_test_proton268(self):
-    """ Reproducer for JIRA Proton-268 """
-    """ DISABLED: Causes failure on Jenkins, appears to be unrelated to fix """
-    self.server_credit = 2048
-    self.start()
-
-    msg = Message()
-    msg.address="amqp://127.0.0.1:%d" % self.port
-    msg.body = "X" * 1024
-
-    for x in range( 100 ):
-      self.client.put( msg )
-    self.client.send()
-
-    try:
-      self.client.stop()
-    except Timeout:
-      assert False, "Timeout waiting for client stop()"
-
-    # need to restart client, as tearDown() uses it to stop server
-    self.client.start()
-
-  def testRoute(self):
-    # anonymous cipher not supported on Windows
-    if os.name == "nt" or not common.isSSLPresent():
-        domain = "amqp"
-    else:
-        domain = "amqps"
-    port = common.free_tcp_port()
-    self.server.subscribe(domain + "://~0.0.0.0:%d" % port)
-    self.start()
-    self.client.route("route1", "amqp://127.0.0.1:%d" % self.port)
-    self.client.route("route2", domain + "://127.0.0.1:%d" % port)
-
-    msg = Message()
-    msg.address = "route1"
-    msg.reply_to = "~"
-    msg.body = "test"
-    self.client.put(msg)
-    self.client.recv(1)
-
-    reply = Message()
-    self.client.get(reply)
-
-    msg = Message()
-    msg.address = "route2"
-    msg.reply_to = "~"
-    msg.body = "test"
-    self.client.put(msg)
-    self.client.recv(1)
-
-    self.client.get(reply)
-    assert reply.body == "test"
-
-  def testDefaultRoute(self):
-    self.start()
-    self.client.route("*", "amqp://127.0.0.1:%d" % self.port)
-
-    msg = Message()
-    msg.address = "asdf"
-    msg.body = "test"
-    msg.reply_to = "~"
-
-    self.client.put(msg)
-    self.client.recv(1)
-
-    reply = Message()
-    self.client.get(reply)
-    assert reply.body == "test"
-
-  def testDefaultRouteSubstitution(self):
-    self.start()
-    self.client.route("*", "amqp://127.0.0.1:%d/$1" % self.port)
-
-    msg = Message()
-    msg.address = "asdf"
-    msg.body = "test"
-    msg.reply_to = "~"
-
-    self.client.put(msg)
-    self.client.recv(1)
-
-    reply = Message()
-    self.client.get(reply)
-    assert reply.body == "test"
-
-  def testIncomingRoute(self):
-    self.start()
-    port = common.free_tcp_port()
-    self.client.route("in", "amqp://~0.0.0.0:%d" % port)
-    self.client.subscribe("in")
-
-    msg = Message()
-    msg.address = "amqp://127.0.0.1:%d" %self.port
-    msg.reply_to = "amqp://127.0.0.1:%d" % port
-    msg.body = "test"
-
-    self.client.put(msg)
-    self.client.recv(1)
-    reply = Message()
-    self.client.get(reply)
-    assert reply.body == "test"
-
-  def echo_address(self, msg):
-    while self.server.incoming:
-      self.server.get(msg)
-      msg.body = msg.address
-      self.dispatch(msg)
-
-  def _testRewrite(self, original, rewritten):
-    self.start()
-    self.process_incoming = self.echo_address
-    self.client.route("*", "amqp://127.0.0.1:%d" % self.port)
-
-    msg = Message()
-    msg.address = original
-    msg.body = "test"
-    msg.reply_to = "~"
-
-    self.client.put(msg)
-    assert msg.address == original
-    self.client.recv(1)
-    assert self.client.incoming == 1
-
-    echo = Message()
-    self.client.get(echo)
-    assert echo.body == rewritten, (echo.body, rewritten)
-    assert msg.address == original
-
-  def testDefaultRewriteH(self):
-    self._testRewrite("original", "original")
-
-  def testDefaultRewriteUH(self):
-    self._testRewrite("user@original", "original")
-
-  def testDefaultRewriteUPH(self):
-    self._testRewrite("user:pass@original", "original")
-
-  def testDefaultRewriteHP(self):
-    self._testRewrite("original:123", "original:123")
-
-  def testDefaultRewriteUHP(self):
-    self._testRewrite("user@original:123", "original:123")
-
-  def testDefaultRewriteUPHP(self):
-    self._testRewrite("user:pass@original:123", "original:123")
-
-  def testDefaultRewriteHN(self):
-    self._testRewrite("original/name", "original/name")
-
-  def testDefaultRewriteUHN(self):
-    self._testRewrite("user@original/name", "original/name")
-
-  def testDefaultRewriteUPHN(self):
-    self._testRewrite("user:pass@original/name", "original/name")
-
-  def testDefaultRewriteHPN(self):
-    self._testRewrite("original:123/name", "original:123/name")
-
-  def testDefaultRewriteUHPN(self):
-    self._testRewrite("user@original:123/name", "original:123/name")
-
-  def testDefaultRewriteUPHPN(self):
-    self._testRewrite("user:pass@original:123/name", "original:123/name")
-
-  def testDefaultRewriteSH(self):
-    self._testRewrite("amqp://original", "amqp://original")
-
-  def testDefaultRewriteSUH(self):
-    self._testRewrite("amqp://user@original", "amqp://original")
-
-  def testDefaultRewriteSUPH(self):
-    self._testRewrite("amqp://user:pass@original", "amqp://original")
-
-  def testDefaultRewriteSHP(self):
-    self._testRewrite("amqp://original:123", "amqp://original:123")
-
-  def testDefaultRewriteSUHP(self):
-    self._testRewrite("amqp://user@original:123", "amqp://original:123")
-
-  def testDefaultRewriteSUPHP(self):
-    self._testRewrite("amqp://user:pass@original:123", "amqp://original:123")
-
-  def testDefaultRewriteSHN(self):
-    self._testRewrite("amqp://original/name", "amqp://original/name")
-
-  def testDefaultRewriteSUHN(self):
-    self._testRewrite("amqp://user@original/name", "amqp://original/name")
-
-  def testDefaultRewriteSUPHN(self):
-    self._testRewrite("amqp://user:pass@original/name", "amqp://original/name")
-
-  def testDefaultRewriteSHPN(self):
-    self._testRewrite("amqp://original:123/name", "amqp://original:123/name")
-
-  def testDefaultRewriteSUHPN(self):
-    self._testRewrite("amqp://user@original:123/name", "amqp://original:123/name")
-
-  def testDefaultRewriteSUPHPN(self):
-    self._testRewrite("amqp://user:pass@original:123/name", "amqp://original:123/name")
-
-  def testRewriteSupress(self):
-    self.client.rewrite("*", None)
-    self._testRewrite("asdf", None)
-
-  def testRewrite(self):
-    self.client.rewrite("a", "b")
-    self._testRewrite("a", "b")
-
-  def testRewritePattern(self):
-    self.client.rewrite("amqp://%@*", "amqp://$2")
-    self._testRewrite("amqp://foo@bar", "amqp://bar")
-
-  def testRewriteToAt(self):
-    self.client.rewrite("amqp://%/*", "$2@$1")
-    self._testRewrite("amqp://domain/name", "name@domain")
-
-  def testRewriteOverrideDefault(self):
-    self.client.rewrite("*", "$1")
-    self._testRewrite("amqp://user:pass@host", "amqp://user:pass@host")
-
-  def testCreditBlockingRebalance(self):
-    """ The server is given a fixed amount of credit, and runs until that
-    credit is exhausted.
-    """
-    self.server_finite_credit = True
-    self.server_credit = 11
-    self.start()
-
-    # put one message out on "Link1" - since there are no other links, it
-    # should get all the credit (10 after sending)
-    msg = Message()
-    msg.address="amqp://127.0.0.1:%d/Link1" % self.port
-    msg.subject="Hello World!"
-    body = "First the world, then the galaxy!"
-    msg.body = body
-    msg.reply_to = "~"
-    self.client.put(msg)
-    self.client.send()
-    self.client.recv(1)
-    assert self.client.incoming == 1
-
-    # Now attempt to exhaust credit using a different link
-    for i in range(10):
-      msg.address="amqp://127.0.0.1:%d/Link2" % self.port
-      self.client.put(msg)
-    self.client.send()
-
-    deadline = time() + self.timeout
-    count = 0
-    while count < 11 and time() < deadline:
-        self.client.recv(-1)
-        while self.client.incoming:
-            self.client.get(msg)
-            count += 1
-    assert count == 11, count
-
-    # now attempt to send one more.  There isn't enough credit, so it should
-    # not be sent
-    self.client.timeout = 1
-    msg.address="amqp://127.0.0.1:%d/Link2" % self.port
-    self.client.put(msg)
-    try:
-      self.client.send()
-      assert False, "expected client to time out in send()"
-    except Timeout:
-      pass
-    assert self.client.outgoing == 1
-
-
-class NBMessengerTest(common.Test):
-
-  def setUp(self):
-    self.client = Messenger("client")
-    self.client2 = Messenger("client2")
-    self.server = Messenger("server")
-    self.messengers = [self.client, self.client2, self.server]
-    self.client.blocking = False
-    self.client2.blocking = False
-    self.server.blocking = False
-    self.server.start()
-    self.client.start()
-    self.client2.start()
-    port = common.free_tcp_port()
-    self.address = "amqp://127.0.0.1:%d" % port
-    self.server.subscribe("amqp://~0.0.0.0:%d" % port)
-
-  def _pump(self, timeout, work_triggers_exit):
-    for msgr in self.messengers:
-      if msgr.work(timeout) and work_triggers_exit:
-        return True
-    return False
-
-  def pump(self, timeout=0):
-    while self._pump(0, True): pass
-    self._pump(timeout, False)
-    while self._pump(0, True): pass
-
-  def tearDown(self):
-    self.server.stop()
-    self.client.stop()
-    self.client2.stop()
-    self.pump()
-    assert self.server.stopped
-    assert self.client.stopped
-    assert self.client2.stopped
-
-  def testSmoke(self, count=1):
-    self.server.recv()
-
-    msg = Message()
-    msg.address = self.address
-    for i in range(count):
-      msg.body = "Hello %s" % i
-      self.client.put(msg)
-
-    msg2 = Message()
-    for i in range(count):
-      if self.server.incoming == 0:
-        self.pump()
-      assert self.server.incoming > 0, self.server.incoming
-      self.server.get(msg2)
-      assert msg2.body == "Hello %s" % i, (msg2.body, i)
-
-    assert self.client.outgoing == 0, self.client.outgoing
-    assert self.server.incoming == 0, self.client.incoming
-
-  def testSmoke1024(self):
-    self.testSmoke(1024)
-
-  def testSmoke4096(self):
-    self.testSmoke(4096)
-
-  def testPushback(self):
-    self.server.recv()
-
-    msg = Message()
-    msg.address = self.address
-    for i in range(16):
-      for i in range(1024):
-        self.client.put(msg)
-      self.pump()
-      if self.client.outgoing > 0:
-        break
-
-    assert self.client.outgoing > 0
-
-  def testRecvBeforeSubscribe(self):
-    self.client.recv()
-    self.client.subscribe(self.address + "/foo")
-
-    self.pump()
-
-    msg = Message()
-    msg.address = "amqp://client/foo"
-    msg.body = "Hello World!"
-    self.server.put(msg)
-
-    assert self.client.incoming == 0
-    self.pump(self.delay)
-    assert self.client.incoming == 1
-
-    msg2 = Message()
-    self.client.get(msg2)
-    assert msg2.address == msg.address
-    assert msg2.body == msg.body
-
-  def testCreditAutoBackpressure(self):
-    """ Verify that use of automatic credit (pn_messenger_recv(-1)) does not
-    fill the incoming queue indefinitely.  If the receiver does not 'get' the
-    message, eventually the sender will block.  See PROTON-350 """
-    self.server.recv()
-    msg = Message()
-    msg.address = self.address
-    deadline = time() + self.timeout
-    while time() < deadline:
-        old = self.server.incoming
-        for j in range(1001):
-            self.client.put(msg)
-        self.pump()
-        if old == self.server.incoming:
-            break;
-    assert old == self.server.incoming, "Backpressure not active!"
-
-  def testCreditRedistribution(self):
-    """ Verify that a fixed amount of credit will redistribute to new
-    links.
-    """
-    self.server.recv( 5 )
-
-    # first link will get all credit
-    msg1 = Message()
-    msg1.address = self.address + "/msg1"
-    self.client.put(msg1)
-    self.pump()
-    assert self.server.incoming == 1, self.server.incoming
-    assert self.server.receiving == 4, self.server.receiving
-
-    # no credit left over for this link
-    msg2 = Message()
-    msg2.address = self.address + "/msg2"
-    self.client.put(msg2)
-    self.pump()
-    assert self.server.incoming == 1, self.server.incoming
-    assert self.server.receiving == 4, self.server.receiving
-
-    # eventually, credit will rebalance and the new link will send
-    deadline = time() + self.timeout
-    while time() < deadline:
-        sleep(.1)
-        self.pump()
-        if self.server.incoming == 2:
-            break;
-    assert self.server.incoming == 2, self.server.incoming
-    assert self.server.receiving == 3, self.server.receiving
-
-  def testCreditReclaim(self):
-    """ Verify that credit is reclaimed when a link with outstanding credit is
-    torn down.
-    """
-    self.server.recv( 9 )
-
-    # first link will get all credit
-    msg1 = Message()
-    msg1.address = self.address + "/msg1"
-    self.client.put(msg1)
-    self.pump()
-    assert self.server.incoming == 1, self.server.incoming
-    assert self.server.receiving == 8, self.server.receiving
-
-    # no credit left over for this link
-    msg2 = Message()
-    msg2.address = self.address + "/msg2"
-    self.client.put(msg2)
-    self.pump()
-    assert self.server.incoming == 1, self.server.incoming
-    assert self.server.receiving == 8, self.server.receiving
-
-    # and none for this new client
-    msg3 = Message()
-    msg3.address = self.address + "/msg3"
-    self.client2.put(msg3)
-    self.pump()
-
-    # eventually, credit will rebalance and all links will
-    # send a message
-    deadline = time() + self.timeout
-    while time() < deadline:
-        sleep(.1)
-        self.pump()
-        if self.server.incoming == 3:
-            break;
-    assert self.server.incoming == 3, self.server.incoming
-    assert self.server.receiving == 6, self.server.receiving
-
-    # now tear down client two, this should cause its outstanding credit to be
-    # made available to the other links
-    self.client2.stop()
-    self.pump()
-
-    for i in range(4):
-        self.client.put(msg1)
-        self.client.put(msg2)
-
-    # should exhaust all credit
-    deadline = time() + self.timeout
-    while time() < deadline:
-        sleep(.1)
-        self.pump()
-        if self.server.incoming == 9:
-            break;
-    assert self.server.incoming == 9, self.server.incoming
-    assert self.server.receiving == 0, self.server.receiving
-
-  def testCreditReplenish(self):
-    """ When extra credit is available it should be granted to the first
-    link that can use it.
-    """
-    # create three links
-    msg = Message()
-    for i in range(3):
-        msg.address = self.address + "/%d" % i
-        self.client.put(msg)
-
-    self.server.recv( 50 )  # 50/3 = 16 per link + 2 extra
-
-    self.pump()
-    assert self.server.incoming == 3, self.server.incoming
-    assert self.server.receiving == 47, self.server.receiving
-
-    # 47/3 = 15 per link, + 2 extra
-
-    # verify one link can send 15 + the two extra (17)
-    for i in range(17):
-        msg.address = self.address + "/0"
-        self.client.put(msg)
-    self.pump()
-    assert self.server.incoming == 20, self.server.incoming
-    assert self.server.receiving == 30, self.server.receiving
-
-    # now verify that the remaining credit (30) will eventually rebalance
-    # across all links (10 per link)
-    for j in range(10):
-        for i in range(3):
-            msg.address = self.address + "/%d" % i
-            self.client.put(msg)
-
-    deadline = time() + self.timeout
-    while time() < deadline:
-        sleep(.1)
-        self.pump()
-        if self.server.incoming == 50:
-            break
-    assert self.server.incoming == 50, self.server.incoming
-    assert self.server.receiving == 0, self.server.receiving
-
-from select import select
-
-class Pump:
-
-  def __init__(self, *messengers):
-    self.messengers = messengers
-    self.selectables = []
-
-  def pump_once(self):
-    for m in self.messengers:
-      while True:
-        sel = m.selectable()
-        if sel:
-          self.selectables.append(sel)
-        else:
-          break
-
-    reading = []
-    writing = []
-
-    for sel in self.selectables[:]:
-      if sel.is_terminal:
-        sel.release()
-        self.selectables.remove(sel)
-      else:
-        if sel.reading:
-          reading.append(sel)
-        if sel.writing:
-          writing.append(sel)
-
-    readable, writable, _ = select(reading, writing, [], 0.1)
-
-    count = 0
-    for s in readable:
-      s.readable()
-      count += 1
-    for s in writable:
-      s.writable()
-      count += 1
-    return count
-
-  def pump(self):
-    while self.pump_once(): pass
-
-class SelectableMessengerTest(common.Test):
-
-  def testSelectable(self, count = 1):
-    if os.name=="nt":
-      # Conflict between native OS select() in Pump and IOCP based pn_selector_t
-      # makes this fail on Windows (see PROTON-668).
-      raise Skipped("Invalid test on Windows with IOCP.")
-
-    mrcv = Messenger()
-    mrcv.passive = True
-    port = common.free_tcp_port()
-    mrcv.subscribe("amqp://~0.0.0.0:%d" % port)
-
-    msnd = Messenger()
-    msnd.passive = True
-    m = Message()
-    m.address = "amqp://127.0.0.1:%d" % port
-
-    for i in range(count):
-      m.body = u"Hello World! %s" % i
-      msnd.put(m)
-
-    p = Pump(msnd, mrcv)
-    p.pump()
-
-    assert msnd.outgoing == count
-    assert mrcv.incoming == 0
-
-    mrcv.recv()
-
-    mc = Message()
-
-    try:
-      for i in range(count):
-        while mrcv.incoming == 0:
-          p.pump()
-        assert mrcv.incoming > 0, (count, msnd.outgoing, mrcv.incoming)
-        mrcv.get(mc)
-        assert mc.body == u"Hello World! %s" % i, (i, mc.body)
-    finally:
-      mrcv.stop()
-      msnd.stop()
-      assert not mrcv.stopped
-      assert not msnd.stopped
-      p.pump()
-      assert mrcv.stopped
-      assert msnd.stopped
-
-  def testSelectable16(self):
-    self.testSelectable(count=16)
-
-  def testSelectable1024(self):
-    self.testSelectable(count=1024)
-
-  def testSelectable4096(self):
-    self.testSelectable(count=4096)
-
-
-class IdleTimeoutTest(common.Test):
-
-  def testIdleTimeout(self):
-    """
-    Verify that a Messenger connection is kept alive using empty idle frames
-    when a idle_timeout is advertised by the remote peer.
-    """
-    if "java" in sys.platform:
-      raise Skipped()
-    idle_timeout_secs = self.delay
-
-    try:
-      idle_server = common.TestServer(idle_timeout=idle_timeout_secs)
-      idle_server.timeout = self.timeout
-      idle_server.start()
-
-      idle_client = Messenger("idle_client")
-      idle_client.timeout = self.timeout
-      idle_client.start()
-
-      idle_client.subscribe("amqp://%s:%s/foo" %
-                            (idle_server.host, idle_server.port))
-      idle_client.work(idle_timeout_secs/10)
-
-      # wait up to 3x the idle timeout and hence verify that everything stays
-      # connected during that time by virtue of no Exception being raised
-      duration = 3 * idle_timeout_secs
-      deadline = time() + duration
-      while time() <= deadline:
-        idle_client.work(idle_timeout_secs/10)
-        continue
-
-      # confirm link is still active
-      assert not idle_server.conditions, idle_server.conditions
-    finally:
-      try:
-        idle_client.stop()
-      except:
-        pass
-      try:
-        idle_server.stop()
-      except:
-        pass


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to