http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AddressTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AddressTest.java b/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AddressTest.java new file mode 100644 index 0000000..6b2da05 --- /dev/null +++ b/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AddressTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.proton.reactor.impl; + +import static org.junit.Assert.*; + +import org.junit.Test; + +public class AddressTest { + + @SuppressWarnings("deprecation") + private void testParse(String url, String scheme, String user, String pass, String host, String port, String name) + { + Address address = new Address(url); + assertEquals(scheme, address.getScheme()); + assertEquals(user, address.getUser()); + assertEquals(pass, address.getPass()); + assertEquals(host, address.getHost()); + assertEquals(port, address.getPort()); + assertEquals(url, address.toString()); + } + + @Test + public void addressTests() + { + testParse("host", null, null, null, "host", null, null); + testParse("host:423", null, null, null, "host", "423", null); + testParse("user@host", null, "user", null, "host", null, null); + testParse("user:1243^&^:pw@host:423", null, "user", "1243^&^:pw", "host", "423", null); + testParse("user:1243^&^:pw@host:423/Foo.bar:90087", null, "user", "1243^&^:pw", "host", "423", "Foo.bar:90087"); + testParse("user:1243^&^:pw@host:423/Foo.bar:90087@somewhere", null, "user", "1243^&^:pw", "host", "423", "Foo.bar:90087@somewhere"); + testParse("[::1]", null, null, null, "::1", null, null); + testParse("[::1]:amqp", null, null, null, "::1", "amqp", null); + testParse("user@[::1]", null, "user", null, "::1", null, null); + testParse("user@[::1]:amqp", null, "user", null, "::1", "amqp", null); + testParse("user:1243^&^:pw@[::1]:amqp", null, "user", "1243^&^:pw", "::1", "amqp", null); + testParse("user:1243^&^:pw@[::1]:amqp/Foo.bar:90087", null, "user", "1243^&^:pw", "::1", "amqp", "Foo.bar:90087"); + testParse("user:1243^&^:pw@[::1:amqp/Foo.bar:90087", null, "user", "1243^&^:pw", "[", ":1:amqp", "Foo.bar:90087"); + testParse("user:1243^&^:pw@::1]:amqp/Foo.bar:90087", null, "user", "1243^&^:pw", "", ":1]:amqp", "Foo.bar:90087"); + testParse("amqp://user@[::1]", "amqp", "user", null, "::1", null, null); + testParse("amqp://user@[::1]:amqp", "amqp", 
"user", null, "::1", "amqp", null); + testParse("amqp://user@[1234:52:0:1260:f2de:f1ff:fe59:8f87]:amqp", "amqp", "user", null, "1234:52:0:1260:f2de:f1ff:fe59:8f87", "amqp", null); + testParse("amqp://user:1243^&^:pw@[::1]:amqp", "amqp", "user", "1243^&^:pw", "::1", "amqp", null); + testParse("amqp://user:1243^&^:pw@[::1]:amqp/Foo.bar:90087", "amqp", "user", "1243^&^:pw", "::1", "amqp", "Foo.bar:90087"); + testParse("amqp://host", "amqp", null, null, "host", null, null); + testParse("amqp://user@host", "amqp", "user", null, "host", null, null); + testParse("amqp://user@host/path:%", "amqp", "user", null, "host", null, "path:%"); + testParse("amqp://user@host:5674/path:%", "amqp", "user", null, "host", "5674", "path:%"); + testParse("amqp://user@host/path:%", "amqp", "user", null, "host", null, "path:%"); + testParse("amqp://bigbird@host/queue@host", "amqp", "bigbird", null, "host", null, "queue@host"); + testParse("amqp://host/queue@host", "amqp", null, null, "host", null, "queue@host"); + testParse("amqp://host:9765/queue@host", "amqp", null, null, "host", "9765", "queue@host"); + } +}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/tests/java/pythonTests.ignore ---------------------------------------------------------------------- diff --git a/tests/java/pythonTests.ignore b/tests/java/pythonTests.ignore index 7911176..1fafa02 100644 --- a/tests/java/pythonTests.ignore +++ b/tests/java/pythonTests.ignore @@ -1,4 +1 @@ proton_tests.reactor_interop.* -proton_tests.soak.* -proton_tests.ssl.SslTest.test_defaults_messenger_app -proton_tests.ssl.SslTest.test_server_authentication_messenger_app http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/tests/java/shim/binding/proton/__init__.py ---------------------------------------------------------------------- diff --git a/tests/java/shim/binding/proton/__init__.py b/tests/java/shim/binding/proton/__init__.py index d3f6922..ecb480f 100644 --- a/tests/java/shim/binding/proton/__init__.py +++ b/tests/java/shim/binding/proton/__init__.py @@ -23,7 +23,6 @@ protocol. The proton APIs consist of the following classes: - - L{Messenger} -- A messaging endpoint. - L{Message} -- A class for creating and/or accessing AMQP message content. - L{Data} -- A class for creating and/or accessing arbitrary AMQP encoded data. @@ -148,13 +147,6 @@ class Interrupt(ProtonException): """ pass -class MessengerException(ProtonException): - """ - The root of the messenger exception hierarchy. All exceptions - generated by the messenger class derive from this exception. 
- """ - pass - -class MessageException(ProtonException): """ The MessageException class is the root of the message exception @@ -168,618 +160,10 @@ EXCEPTIONS = { PN_INTR: Interrupt } -PENDING = Constant("PENDING") -ACCEPTED = Constant("ACCEPTED") -REJECTED = Constant("REJECTED") -RELEASED = Constant("RELEASED") -MODIFIED = Constant("MODIFIED") -ABORTED = Constant("ABORTED") -SETTLED = Constant("SETTLED") - -STATUSES = { - PN_STATUS_ABORTED: ABORTED, - PN_STATUS_ACCEPTED: ACCEPTED, - PN_STATUS_REJECTED: REJECTED, - PN_STATUS_RELEASED: RELEASED, - PN_STATUS_MODIFIED: MODIFIED, - PN_STATUS_PENDING: PENDING, - PN_STATUS_SETTLED: SETTLED, - PN_STATUS_UNKNOWN: None - } AUTOMATIC = Constant("AUTOMATIC") MANUAL = Constant("MANUAL") -class Messenger(object): - """ - The L{Messenger} class defines a high level interface for sending - and receiving L{Messages<Message>}. Every L{Messenger} contains a - single logical queue of incoming messages and a single logical queue - of outgoing messages. These messages in these queues may be destined - for, or originate from, a variety of addresses. - - The messenger interface is single-threaded. All methods - except one (L{interrupt}) are intended to be used from within - the messenger thread. - - - Address Syntax - ============== - - An address has the following form:: - - [ amqp[s]:// ] [user[:password]@] domain [/[name]] - - Where domain can be one of:: - - host | host:port | ip | ip:port | name - - The following are valid examples of addresses: - - - example.org - - example.org:1234 - - amqp://example.org - - amqps://example.org - - example.org/incoming - - amqps://example.org/outgoing - - amqps://fred:trustno1@example.org - - 127.0.0.1:1234 - - amqps://127.0.0.1:1234 - - Sending & Receiving Messages - ============================ - - The L{Messenger} class works in conjuction with the L{Message} class. The - L{Message} class is a mutable holder of message content. 
- - The L{put} method copies its L{Message} to the outgoing queue, and may - send queued messages if it can do so without blocking. The L{send} - method blocks until it has sent the requested number of messages, - or until a timeout interrupts the attempt. - - - >>> message = Message() - >>> for i in range(3): - ... message.address = "amqp://host/queue" - ... message.subject = "Hello World %i" % i - ... messenger.put(message) - >>> messenger.send() - - Similarly, the L{recv} method receives messages into the incoming - queue, and may block as it attempts to receive the requested number - of messages, or until timeout is reached. It may receive fewer - than the requested number. The L{get} method pops the - eldest L{Message} off the incoming queue and copies it into the L{Message} - object that you supply. It will not block. - - - >>> message = Message() - >>> messenger.recv(10): - >>> while messenger.incoming > 0: - ... messenger.get(message) - ... print message.subject - Hello World 0 - Hello World 1 - Hello World 2 - - The blocking flag allows you to turn off blocking behavior entirely, - in which case L{send} and L{recv} will do whatever they can without - blocking, and then return. You can then look at the number - of incoming and outgoing messages to see how much outstanding work - still remains. - """ - - def __init__(self, name=None): - """ - Construct a new L{Messenger} with the given name. The name has - global scope. If a NULL name is supplied, a UUID based name will - be chosen. - - @type name: string - @param name: the name of the messenger or None - - """ - self._mng = pn_messenger(name) - self._selectables = {} - - def __del__(self): - """ - Destroy the L{Messenger}. This will close all connections that - are managed by the L{Messenger}. Call the L{stop} method before - destroying the L{Messenger}. 
- """ - if hasattr(self, "_mng"): - pn_messenger_free(self._mng) - del self._mng - - def _check(self, err): - if err < 0: - if (err == PN_INPROGRESS): - return - exc = EXCEPTIONS.get(err, MessengerException) - raise exc("[%s]: %s" % (err, pn_error_text(pn_messenger_error(self._mng)))) - else: - return err - - @property - def name(self): - """ - The name of the L{Messenger}. - """ - return pn_messenger_name(self._mng) - - def _get_certificate(self): - return pn_messenger_get_certificate(self._mng) - - def _set_certificate(self, value): - self._check(pn_messenger_set_certificate(self._mng, value)) - - certificate = property(_get_certificate, _set_certificate, - doc=""" -Path to a certificate file for the L{Messenger}. This certificate is -used when the L{Messenger} accepts or establishes SSL/TLS connections. -This property must be specified for the L{Messenger} to accept -incoming SSL/TLS connections and to establish client authenticated -outgoing SSL/TLS connection. Non client authenticated outgoing SSL/TLS -connections do not require this property. -""") - - def _get_private_key(self): - return pn_messenger_get_private_key(self._mng) - - def _set_private_key(self, value): - self._check(pn_messenger_set_private_key(self._mng, value)) - - private_key = property(_get_private_key, _set_private_key, - doc=""" -Path to a private key file for the L{Messenger's<Messenger>} -certificate. This property must be specified for the L{Messenger} to -accept incoming SSL/TLS connections and to establish client -authenticated outgoing SSL/TLS connection. Non client authenticated -SSL/TLS connections do not require this property. -""") - - def _get_password(self): - return pn_messenger_get_password(self._mng) - - def _set_password(self, value): - self._check(pn_messenger_set_password(self._mng, value)) - - password = property(_get_password, _set_password, - doc=""" -This property contains the password for the L{Messenger.private_key} -file, or None if the file is not encrypted. 
-""") - - def _get_trusted_certificates(self): - return pn_messenger_get_trusted_certificates(self._mng) - - def _set_trusted_certificates(self, value): - self._check(pn_messenger_set_trusted_certificates(self._mng, value)) - - trusted_certificates = property(_get_trusted_certificates, - _set_trusted_certificates, - doc=""" -A path to a database of trusted certificates for use in verifying the -peer on an SSL/TLS connection. If this property is None, then the peer -will not be verified. -""") - - def _get_timeout(self): - t = pn_messenger_get_timeout(self._mng) - if t == -1: - return None - else: - return millis2secs(t) - - def _set_timeout(self, value): - if value is None: - t = -1 - else: - t = secs2millis(value) - self._check(pn_messenger_set_timeout(self._mng, t)) - - timeout = property(_get_timeout, _set_timeout, - doc=""" -The timeout property contains the default timeout for blocking -operations performed by the L{Messenger}. -""") - - def _is_blocking(self): - return pn_messenger_is_blocking(self._mng) - - def _set_blocking(self, b): - self._check(pn_messenger_set_blocking(self._mng, b)) - - blocking = property(_is_blocking, _set_blocking, - doc=""" -Enable or disable blocking behavior during L{Message} sending -and receiving. This affects every blocking call, with the -exception of L{work}. Currently, the affected calls are -L{send}, L{recv}, and L{stop}. -""") - - def _is_passive(self): - return pn_messenger_is_passive(self._mng) - - def _set_passive(self, b): - self._check(pn_messenger_set_passive(self._mng, b)) - - passive = property(_is_passive, _set_passive, - doc=""" -When passive is set to true, Messenger will not attempt to perform I/O -internally. In this mode it is necessary to use the selectables API to -drive any I/O needed to perform requested actions. In this mode -Messenger will never block. 
-""") - - def _get_incoming_window(self): - return pn_messenger_get_incoming_window(self._mng) - - def _set_incoming_window(self, window): - self._check(pn_messenger_set_incoming_window(self._mng, window)) - - incoming_window = property(_get_incoming_window, _set_incoming_window, - doc=""" -The incoming tracking window for the messenger. The messenger will -track the remote status of this many incoming deliveries after they -have been accepted or rejected. Defaults to zero. - -L{Messages<Message>} enter this window only when you take them into your application -using L{get}. If your incoming window size is I{n}, and you get I{n}+1 L{messages<Message>} -without explicitly accepting or rejecting the oldest message, then the -message that passes beyond the edge of the incoming window will be assigned -the default disposition of its link. -""") - - def _get_outgoing_window(self): - return pn_messenger_get_outgoing_window(self._mng) - - def _set_outgoing_window(self, window): - self._check(pn_messenger_set_outgoing_window(self._mng, window)) - - outgoing_window = property(_get_outgoing_window, _set_outgoing_window, - doc=""" -The outgoing tracking window for the messenger. The messenger will -track the remote status of this many outgoing deliveries after calling -send. Defaults to zero. - -A L{Message} enters this window when you call the put() method with the -message. If your outgoing window size is I{n}, and you call L{put} I{n}+1 -times, status information will no longer be available for the -first message. -""") - - def start(self): - """ - Currently a no-op placeholder. - For future compatibility, do not L{send} or L{recv} messages - before starting the L{Messenger}. - """ - self._check(pn_messenger_start(self._mng)) - - def stop(self): - """ - Transitions the L{Messenger} to an inactive state. An inactive - L{Messenger} will not send or receive messages from its internal - queues. 
A L{Messenger} should be stopped before being discarded to - ensure a clean shutdown handshake occurs on any internally managed - connections. - """ - self._check(pn_messenger_stop(self._mng)) - - @property - def stopped(self): - """ - Returns true iff a L{Messenger} is in the stopped state. - This function does not block. - """ - return pn_messenger_stopped(self._mng) - - def subscribe(self, source): - """ - Subscribes the L{Messenger} to messages originating from the - specified source. The source is an address as specified in the - L{Messenger} introduction with the following addition. If the - domain portion of the address begins with the '~' character, the - L{Messenger} will interpret the domain as host/port, bind to it, - and listen for incoming messages. For example "~0.0.0.0", - "amqp://~0.0.0.0", and "amqps://~0.0.0.0" will all bind to any - local interface and listen for incoming messages with the last - variant only permitting incoming SSL connections. - - @type source: string - @param source: the source of messages to subscribe to - """ - sub_impl = pn_messenger_subscribe(self._mng, source) - if not sub_impl: - self._check(pn_error_code(pn_messenger_error(self._mng))) - raise MessengerException("Cannot subscribe to %s"%source) - return Subscription(sub_impl) - - def put(self, message): - """ - Places the content contained in the message onto the outgoing - queue of the L{Messenger}. This method will never block, however - it will send any unblocked L{Messages<Message>} in the outgoing - queue immediately and leave any blocked L{Messages<Message>} - remaining in the outgoing queue. The L{send} call may be used to - block until the outgoing queue is empty. The L{outgoing} property - may be used to check the depth of the outgoing queue. - - When the content in a given L{Message} object is copied to the outgoing - message queue, you may then modify or discard the L{Message} object - without having any impact on the content in the outgoing queue. 
- - This method returns an outgoing tracker for the L{Message}. The tracker - can be used to determine the delivery status of the L{Message}. - - @type message: Message - @param message: the message to place in the outgoing queue - @return: a tracker - """ - message._pre_encode() - self._check(pn_messenger_put(self._mng, message._msg)) - return pn_messenger_outgoing_tracker(self._mng) - - def status(self, tracker): - """ - Gets the last known remote state of the delivery associated with - the given tracker. - - @type tracker: tracker - @param tracker: the tracker whose status is to be retrieved - - @return: one of None, PENDING, REJECTED, MODIFIED, or ACCEPTED - """ - disp = pn_messenger_status(self._mng, tracker); - return STATUSES.get(disp, disp) - - def buffered(self, tracker): - """ - Checks if the delivery associated with the given tracker is still - waiting to be sent. - - @type tracker: tracker - @param tracker: the tracker whose status is to be retrieved - - @return: true if delivery is still buffered - """ - return pn_messenger_buffered(self._mng, tracker); - - def settle(self, tracker=None): - """ - Frees a L{Messenger} from tracking the status associated with a given - tracker. If you don't supply a tracker, all outgoing L{messages<Message>} up - to the most recent will be settled. - """ - if tracker is None: - tracker = pn_messenger_outgoing_tracker(self._mng) - flags = PN_CUMULATIVE - else: - flags = 0 - self._check(pn_messenger_settle(self._mng, tracker, flags)) - - def send(self, n=-1): - """ - This call will block until the indicated number of L{messages<Message>} - have been sent, or until the operation times out. If n is -1 this call will - block until all outgoing L{messages<Message>} have been sent. If n is 0 then - this call will send whatever it can without blocking. - """ - self._check(pn_messenger_send(self._mng, n)) - - def recv(self, n=None): - """ - Receives up to I{n} L{messages<Message>} into the incoming queue. 
If no value - for I{n} is supplied, this call will receive as many L{messages<Message>} as it - can buffer internally. If the L{Messenger} is in blocking mode, this - call will block until at least one L{Message} is available in the - incoming queue. - """ - if n is None: - n = -1 - self._check(pn_messenger_recv(self._mng, n)) - - def work(self, timeout=None): - """ - Sends or receives any outstanding L{messages<Message>} queued for a L{Messenger}. - This will block for the indicated timeout. - This method may also do I/O work other than sending and receiving - L{messages<Message>}. For example, closing connections after messenger.L{stop}() - has been called. - """ - if timeout is None: - t = -1 - else: - t = secs2millis(timeout) - err = pn_messenger_work(self._mng, t) - if (err == PN_TIMEOUT): - return False - else: - self._check(err) - return True - - @property - def receiving(self): - return pn_messenger_receiving(self._mng) - - def interrupt(self): - """ - The L{Messenger} interface is single-threaded. - This is the only L{Messenger} function intended to be called - from outside of the L{Messenger} thread. - Call this from a non-messenger thread to interrupt - a L{Messenger} that is blocking. - This will cause any in-progress blocking call to throw - the L{Interrupt} exception. If there is no currently blocking - call, then the next blocking call will be affected, even if it - is within the same thread that interrupt was called from. - """ - self._check(pn_messenger_interrupt(self._mng)) - - def get(self, message=None): - """ - Moves the message from the head of the incoming message queue into - the supplied message object. Any content in the message will be - overwritten. - - A tracker for the incoming L{Message} is returned. The tracker can - later be used to communicate your acceptance or rejection of the - L{Message}. - - If None is passed in for the L{Message} object, the L{Message} - popped from the head of the queue is discarded. 
- - @type message: Message - @param message: the destination message object - @return: a tracker - """ - if message is None: - impl = None - else: - impl = message._msg - self._check(pn_messenger_get(self._mng, impl)) - if message is not None: - message._post_decode() - return pn_messenger_incoming_tracker(self._mng) - - def accept(self, tracker=None): - """ - Signal the sender that you have acted on the L{Message} - pointed to by the tracker. If no tracker is supplied, - then all messages that have been returned by the L{get} - method are accepted, except those that have already been - auto-settled by passing beyond your incoming window size. - - @type tracker: tracker - @param tracker: a tracker as returned by get - """ - if tracker is None: - tracker = pn_messenger_incoming_tracker(self._mng) - flags = PN_CUMULATIVE - else: - flags = 0 - self._check(pn_messenger_accept(self._mng, tracker, flags)) - - def reject(self, tracker=None): - """ - Rejects the L{Message} indicated by the tracker. If no tracker - is supplied, all messages that have been returned by the L{get} - method are rejected, except those that have already been auto-settled - by passing beyond your outgoing window size. - - @type tracker: tracker - @param tracker: a tracker as returned by get - """ - if tracker is None: - tracker = pn_messenger_incoming_tracker(self._mng) - flags = PN_CUMULATIVE - else: - flags = 0 - self._check(pn_messenger_reject(self._mng, tracker, flags)) - - @property - def outgoing(self): - """ - The outgoing queue depth. - """ - return pn_messenger_outgoing(self._mng) - - @property - def incoming(self): - """ - The incoming queue depth. - """ - return pn_messenger_incoming(self._mng) - - def route(self, pattern, address): - """ - Adds a routing rule to a L{Messenger's<Messenger>} internal routing table. - - The route procedure may be used to influence how a L{Messenger} will - internally treat a given address or class of addresses. 
Every call - to the route procedure will result in L{Messenger} appending a routing - rule to its internal routing table. - - Whenever a L{Message} is presented to a L{Messenger} for delivery, it - will match the address of this message against the set of routing - rules in order. The first rule to match will be triggered, and - instead of routing based on the address presented in the message, - the L{Messenger} will route based on the address supplied in the rule. - - The pattern matching syntax supports two types of matches, a '%' - will match any character except a '/', and a '*' will match any - character including a '/'. - - A routing address is specified as a normal AMQP address, however it - may additionally use substitution variables from the pattern match - that triggered the rule. - - Any message sent to "foo" will be routed to "amqp://foo.com": - - >>> messenger.route("foo", "amqp://foo.com"); - - Any message sent to "foobar" will be routed to - "amqp://foo.com/bar": - - >>> messenger.route("foobar", "amqp://foo.com/bar"); - - Any message sent to bar/<path> will be routed to the corresponding - path within the amqp://bar.com domain: - - >>> messenger.route("bar/*", "amqp://bar.com/$1"); - - Route all L{messages<Message>} over TLS: - - >>> messenger.route("amqp:*", "amqps:$1") - - Supply credentials for foo.com: - - >>> messenger.route("amqp://foo.com/*", "amqp://user:password@foo.com/$1"); - - Supply credentials for all domains: - - >>> messenger.route("amqp://*", "amqp://user:password@$1"); - - Route all addresses through a single proxy while preserving the - original destination: - - >>> messenger.route("amqp://%/*", "amqp://user:password@proxy/$1/$2"); - - Route any address through a single broker: - - >>> messenger.route("*", "amqp://user:password@broker/$1"); - """ - self._check(pn_messenger_route(self._mng, unicode2utf8(pattern), unicode2utf8(address))) - - def rewrite(self, pattern, address): - """ - Similar to route(), except that the 
destination of - the L{Message} is determined before the message address is rewritten. - - The outgoing address is only rewritten after routing has been - finalized. If a message has an outgoing address of - "amqp://0.0.0.0:5678", and a rewriting rule that changes its - outgoing address to "foo", it will still arrive at the peer that - is listening on "amqp://0.0.0.0:5678", but when it arrives there, - the receiver will see its outgoing address as "foo". - - The default rewrite rule removes username and password from addresses - before they are transmitted. - """ - self._check(pn_messenger_rewrite(self._mng, unicode2utf8(pattern), unicode2utf8(address))) - - def selectable(self): - return Selectable.wrap(pn_messenger_selectable(self._mng)) - - @property - def deadline(self): - tstamp = pn_messenger_deadline(self._mng) - if tstamp: - return millis2secs(tstamp) - else: - return None class Message(object): """The L{Message} class is a mutable holder of message content. @@ -4259,8 +3643,6 @@ __all__ = [ "Link", "Message", "MessageException", - "Messenger", - "MessengerException", "ProtonException", "VERSION_MAJOR", "VERSION_MINOR", http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/tests/java/shim/cmessenger.py ---------------------------------------------------------------------- diff --git a/tests/java/shim/cmessenger.py b/tests/java/shim/cmessenger.py deleted file mode 100644 index 249e0dc..0000000 --- a/tests/java/shim/cmessenger.py +++ /dev/null @@ -1,225 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from org.apache.qpid.proton import Proton -from org.apache.qpid.proton.messenger import Messenger, Status -from org.apache.qpid.proton import InterruptException, TimeoutException - -from cerror import * - -# from proton/messenger.h -PN_STATUS_UNKNOWN = 0 -PN_STATUS_PENDING = 1 -PN_STATUS_ACCEPTED = 2 -PN_STATUS_REJECTED = 3 -PN_STATUS_RELEASED = 4 -PN_STATUS_MODIFIED = 5 -PN_STATUS_ABORTED = 6 -PN_STATUS_SETTLED = 7 - -PN_CUMULATIVE = 1 - -class pn_messenger_wrapper: - - def __init__(self, impl): - self.impl = impl - self.error = pn_error(0, None) - -def pn_messenger(name): - if name is None: - return pn_messenger_wrapper(Proton.messenger()) - else: - return pn_messenger_wrapper(Proton.messenger(name)) - -def pn_messenger_error(m): - return m.error - -def pn_messenger_set_timeout(m, t): - m.impl.setTimeout(t) - return 0 - -def pn_messenger_set_blocking(m, b): - m.impl.setBlocking(b) - return 0 - -def pn_messenger_set_certificate(m, c): - m.impl.setCertificate(c) - return 0 - -def pn_messenger_set_private_key(m, p): - m.impl.setPrivateKey(p) - return 0 - -def pn_messenger_set_password(m, p): - m.impl.setPassword(p) - return 0 - -def pn_messenger_set_trusted_certificates(m, t): - m.impl.setTrustedCertificates(t) - return 0 - -def pn_messenger_set_incoming_window(m, w): - m.impl.setIncomingWindow(w) - return 0 - -def pn_messenger_set_outgoing_window(m, w): - m.impl.setOutgoingWindow(w) - return 0 - -def pn_messenger_start(m): - m.impl.start() - return 0 - -# XXX: ??? 
-def pn_messenger_work(m, t): - try: - if m.impl.work(t): - return 1 - else: - return PN_TIMEOUT - except InterruptException, e: - return PN_INTR - -class pn_subscription: - - def __init__(self): - pass - -def pn_messenger_subscribe(m, source): - m.impl.subscribe(source) - return pn_subscription() - -def pn_messenger_route(m, pattern, address): - m.impl.route(pattern, address) - return 0 - -def pn_messenger_rewrite(m, pattern, address): - m.impl.rewrite(pattern, address) - return 0 - -def pn_messenger_interrupt(m): - m.impl.interrupt() - return 0 - -def pn_messenger_buffered(m, t): - raise Skipped() - -from org.apache.qpid.proton.engine import TransportException - -def pn_messenger_stop(m): - m.impl.stop() - return 0 - -def pn_messenger_stopped(m): - return m.impl.stopped() - -def pn_messenger_put(m, msg): - msg.pre_encode() - m.impl.put(msg.impl) - return 0 - -def pn_messenger_outgoing_tracker(m): - return m.impl.outgoingTracker() - -def pn_messenger_send(m, n): - try: - m.impl.send(n) - return 0 - except InterruptException, e: - return PN_INTR - except TimeoutException, e: - return PN_TIMEOUT - -def pn_messenger_recv(m, n): - try: - m.impl.recv(n) - return 0 - except InterruptException, e: - return PN_INTR - except TimeoutException, e: - return PN_TIMEOUT - -def pn_messenger_receiving(m): - return m.impl.receiving() - -def pn_messenger_incoming(m): - return m.impl.incoming() - -def pn_messenger_outgoing(m): - return m.impl.outgoing() - -def pn_messenger_get(m, msg): - mimpl = m.impl.get() - if msg: - msg.decode(mimpl) - return 0 - -def pn_messenger_incoming_tracker(m): - return m.impl.incomingTracker() - -def pn_messenger_accept(m, tracker, flags): - if flags: - m.impl.accept(tracker, Messenger.CUMULATIVE) - else: - m.impl.accept(tracker, 0) - return 0 - -def pn_messenger_reject(m, tracker, flags): - if flags: - m.impl.reject(tracker, Messenger.CUMULATIVE) - else: - m.impl.reject(tracker, 0) - return 0 - -def pn_messenger_settle(m, tracker, flags): - if flags: - 
m.impl.settle(tracker, Messenger.CUMULATIVE) - else: - m.impl.settle(tracker, 0) - return 0 - -STATUS_P2J = { - PN_STATUS_UNKNOWN: Status.UNKNOWN, - PN_STATUS_PENDING: Status.PENDING, - PN_STATUS_ACCEPTED: Status.ACCEPTED, - PN_STATUS_REJECTED: Status.REJECTED, - PN_STATUS_RELEASED: Status.RELEASED, - PN_STATUS_MODIFIED: Status.MODIFIED, - PN_STATUS_ABORTED: Status.ABORTED, - PN_STATUS_SETTLED: Status.SETTLED -} - -STATUS_J2P = { - Status.UNKNOWN: PN_STATUS_UNKNOWN, - Status.PENDING: PN_STATUS_PENDING, - Status.ACCEPTED: PN_STATUS_ACCEPTED, - Status.REJECTED: PN_STATUS_REJECTED, - Status.RELEASED: PN_STATUS_RELEASED, - Status.MODIFIED: PN_STATUS_MODIFIED, - Status.ABORTED: PN_STATUS_ABORTED, - Status.SETTLED: PN_STATUS_SETTLED -} - -def pn_messenger_status(m, tracker): - return STATUS_J2P[m.impl.getStatus(tracker)] - -def pn_messenger_set_passive(m, passive): - raise Skipped() - -def pn_messenger_selectable(m): - raise Skipped() http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/tests/java/shim/cproton.py ---------------------------------------------------------------------- diff --git a/tests/java/shim/cproton.py b/tests/java/shim/cproton.py index d5ed574..ec7feec 100644 --- a/tests/java/shim/cproton.py +++ b/tests/java/shim/cproton.py @@ -35,7 +35,6 @@ from ccodec import * from cengine import * from csasl import * from cssl import * -from cmessenger import * from cmessage import * from curl import * from creactor import * http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/tests/java/shim/curl.py ---------------------------------------------------------------------- diff --git a/tests/java/shim/curl.py b/tests/java/shim/curl.py index d4d3d37..2b8c9c8 100644 --- a/tests/java/shim/curl.py +++ b/tests/java/shim/curl.py @@ -17,7 +17,7 @@ # under the License # -from org.apache.qpid.proton.messenger.impl import Address +from org.apache.qpid.proton.reactor.impl import Address def pn_url(): return Address() 
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/tests/python/proton_tests/__init__.py ---------------------------------------------------------------------- diff --git a/tests/python/proton_tests/__init__.py b/tests/python/proton_tests/__init__.py index 66ce650..647cbf5 100644 --- a/tests/python/proton_tests/__init__.py +++ b/tests/python/proton_tests/__init__.py @@ -22,11 +22,9 @@ import proton_tests.engine import proton_tests.message import proton_tests.handler import proton_tests.reactor -import proton_tests.messenger import proton_tests.sasl import proton_tests.transport import proton_tests.ssl import proton_tests.interop -import proton_tests.soak import proton_tests.url import proton_tests.utils http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/tests/python/proton_tests/common.py ---------------------------------------------------------------------- diff --git a/tests/python/proton_tests/common.py b/tests/python/proton_tests/common.py index aaefccd..02de4ff 100644 --- a/tests/python/proton_tests/common.py +++ b/tests/python/proton_tests/common.py @@ -264,308 +264,3 @@ class TestServer(object): def on_delivery(self, event): event.delivery.settle() -# -# Classes that wrap the messenger applications msgr-send and msgr-recv. -# These applications reside in the tests/tools/apps directory -# - -class MessengerApp(object): - """ Interface to control a MessengerApp """ - def __init__(self): - self._cmdline = None - # options common to Receivers and Senders: - self.ca_db = None - self.certificate = None - self.privatekey = None - self.password = None - self._output = None - - def start(self, verbose=False): - """ Begin executing the test """ - cmd = self.cmdline() - self._verbose = verbose - if self._verbose: - print("COMMAND='%s'" % str(cmd)) - #print("ENV='%s'" % str(os.environ.copy())) - try: - # Handle python launch by replacing script 'filename' with - # 'python abspath-to-filename' in cmdline arg list. 
- if cmd[0].endswith('.py'): - foundfile = findfileinpath(cmd[0], os.getenv('PATH')) - if foundfile is None: - msg = "Unable to locate file '%s' in PATH" % cmd[0] - raise Skipped("Skipping test - %s" % msg) - - del cmd[0:1] - cmd.insert(0, foundfile) - cmd.insert(0, sys.executable) - self._process = Popen(cmd, stdout=PIPE, stderr=STDOUT, - bufsize=4096, universal_newlines=True) - except OSError: - e = sys.exc_info()[1] - print("ERROR: '%s'" % e) - msg = "Unable to execute command '%s', is it in your PATH?" % cmd[0] - - # NOTE(flaper87): Skip the test if the command is not found. - if e.errno == 2: - raise Skipped("Skipping test - %s" % msg) - assert False, msg - - self._ready() # wait for it to initialize - - def stop(self): - """ Signal the client to start clean shutdown """ - pass - - def wait(self): - """ Wait for client to complete """ - self._output = self._process.communicate() - if self._verbose: - print("OUTPUT='%s'" % self.stdout()) - - def status(self): - """ Return status from client process """ - return self._process.returncode - - def stdout(self): - #self._process.communicate()[0] - if not self._output or not self._output[0]: - return "*** NO STDOUT ***" - return self._output[0] - - def stderr(self): - if not self._output or not self._output[1]: - return "*** NO STDERR ***" - return self._output[1] - - def cmdline(self): - if not self._cmdline: - self._build_command() - return self._cmdline - - def _build_command(self): - assert False, "_build_command() needs override" - - def _ready(self): - assert False, "_ready() needs override" - - def _do_common_options(self): - """ Common option handling """ - if self.ca_db is not None: - self._cmdline.append("-T") - self._cmdline.append(str(self.ca_db)) - if self.certificate is not None: - self._cmdline.append("-C") - self._cmdline.append(str(self.certificate)) - if self.privatekey is not None: - self._cmdline.append("-K") - self._cmdline.append(str(self.privatekey)) - if self.password is not None: - 
self._cmdline.append("-P") - self._cmdline.append("pass:" + str(self.password)) - - -class MessengerSender(MessengerApp): - """ Interface to configure a sending MessengerApp """ - def __init__(self): - MessengerApp.__init__(self) - self._command = None - # @todo make these properties - self.targets = [] - self.send_count = None - self.msg_size = None - self.send_batch = None - self.outgoing_window = None - self.report_interval = None - self.get_reply = False - self.timeout = None - self.incoming_window = None - self.recv_count = None - self.name = None - - # command string? - def _build_command(self): - self._cmdline = self._command - self._do_common_options() - assert self.targets, "Missing targets, required for sender!" - self._cmdline.append("-a") - self._cmdline.append(",".join(self.targets)) - if self.send_count is not None: - self._cmdline.append("-c") - self._cmdline.append(str(self.send_count)) - if self.msg_size is not None: - self._cmdline.append("-b") - self._cmdline.append(str(self.msg_size)) - if self.send_batch is not None: - self._cmdline.append("-p") - self._cmdline.append(str(self.send_batch)) - if self.outgoing_window is not None: - self._cmdline.append("-w") - self._cmdline.append(str(self.outgoing_window)) - if self.report_interval is not None: - self._cmdline.append("-e") - self._cmdline.append(str(self.report_interval)) - if self.get_reply: - self._cmdline.append("-R") - if self.timeout is not None: - self._cmdline.append("-t") - self._cmdline.append(str(self.timeout)) - if self.incoming_window is not None: - self._cmdline.append("-W") - self._cmdline.append(str(self.incoming_window)) - if self.recv_count is not None: - self._cmdline.append("-B") - self._cmdline.append(str(self.recv_count)) - if self.name is not None: - self._cmdline.append("-N") - self._cmdline.append(str(self.name)) - - def _ready(self): - pass - - -class MessengerReceiver(MessengerApp): - """ Interface to configure a receiving MessengerApp """ - def __init__(self): - 
MessengerApp.__init__(self) - self._command = None - # @todo make these properties - self.subscriptions = [] - self.receive_count = None - self.recv_count = None - self.incoming_window = None - self.timeout = None - self.report_interval = None - self.send_reply = False - self.outgoing_window = None - self.forwards = [] - self.name = None - - # command string? - def _build_command(self): - self._cmdline = self._command - self._do_common_options() - self._cmdline += ["-X", "READY"] - assert self.subscriptions, "Missing subscriptions, required for receiver!" - self._cmdline.append("-a") - self._cmdline.append(",".join(self.subscriptions)) - if self.receive_count is not None: - self._cmdline.append("-c") - self._cmdline.append(str(self.receive_count)) - if self.recv_count is not None: - self._cmdline.append("-b") - self._cmdline.append(str(self.recv_count)) - if self.incoming_window is not None: - self._cmdline.append("-w") - self._cmdline.append(str(self.incoming_window)) - if self.timeout is not None: - self._cmdline.append("-t") - self._cmdline.append(str(self.timeout)) - if self.report_interval is not None: - self._cmdline.append("-e") - self._cmdline.append(str(self.report_interval)) - if self.send_reply: - self._cmdline.append("-R") - if self.outgoing_window is not None: - self._cmdline.append("-W") - self._cmdline.append(str(self.outgoing_window)) - if self.forwards: - self._cmdline.append("-F") - self._cmdline.append(",".join(self.forwards)) - if self.name is not None: - self._cmdline.append("-N") - self._cmdline.append(str(self.name)) - - def _ready(self): - """ wait for subscriptions to complete setup. 
""" - r = self._process.stdout.readline() - assert r.strip() == "READY", "Unexpected input while waiting for receiver to initialize: %s" % r - -class MessengerSenderC(MessengerSender): - def __init__(self): - MessengerSender.__init__(self) - self._command = ["msgr-send"] - -class MessengerSenderValgrind(MessengerSenderC): - """ Run the C sender under Valgrind - """ - def __init__(self, suppressions=None): - if "VALGRIND" not in os.environ: - raise Skipped("Skipping test - $VALGRIND not set.") - MessengerSenderC.__init__(self) - if not suppressions: - suppressions = os.path.join(os.path.dirname(__file__), - "valgrind.supp" ) - self._command = [os.environ["VALGRIND"], "--error-exitcode=42", "--quiet", - "--trace-children=yes", "--leak-check=full", - "--suppressions=%s" % suppressions] + self._command - -class MessengerReceiverC(MessengerReceiver): - def __init__(self): - MessengerReceiver.__init__(self) - self._command = ["msgr-recv"] - -class MessengerReceiverValgrind(MessengerReceiverC): - """ Run the C receiver under Valgrind - """ - def __init__(self, suppressions=None): - if "VALGRIND" not in os.environ: - raise Skipped("Skipping test - $VALGRIND not set.") - MessengerReceiverC.__init__(self) - if not suppressions: - suppressions = os.path.join(os.path.dirname(__file__), - "valgrind.supp" ) - self._command = [os.environ["VALGRIND"], "--error-exitcode=42", "--quiet", - "--trace-children=yes", "--leak-check=full", - "--suppressions=%s" % suppressions] + self._command - -class MessengerSenderPython(MessengerSender): - def __init__(self): - MessengerSender.__init__(self) - self._command = ["msgr-send.py"] - - -class MessengerReceiverPython(MessengerReceiver): - def __init__(self): - MessengerReceiver.__init__(self) - self._command = ["msgr-recv.py"] - - - -class ReactorSenderC(MessengerSender): - def __init__(self): - MessengerSender.__init__(self) - self._command = ["reactor-send"] - -class ReactorSenderValgrind(ReactorSenderC): - """ Run the C sender under 
Valgrind - """ - def __init__(self, suppressions=None): - if "VALGRIND" not in os.environ: - raise Skipped("Skipping test - $VALGRIND not set.") - ReactorSenderC.__init__(self) - if not suppressions: - suppressions = os.path.join(os.path.dirname(__file__), - "valgrind.supp" ) - self._command = [os.environ["VALGRIND"], "--error-exitcode=42", "--quiet", - "--trace-children=yes", "--leak-check=full", - "--suppressions=%s" % suppressions] + self._command - -class ReactorReceiverC(MessengerReceiver): - def __init__(self): - MessengerReceiver.__init__(self) - self._command = ["reactor-recv"] - -class ReactorReceiverValgrind(ReactorReceiverC): - """ Run the C receiver under Valgrind - """ - def __init__(self, suppressions=None): - if "VALGRIND" not in os.environ: - raise Skipped("Skipping test - $VALGRIND not set.") - ReactorReceiverC.__init__(self) - if not suppressions: - suppressions = os.path.join(os.path.dirname(__file__), - "valgrind.supp" ) - self._command = [os.environ["VALGRIND"], "--error-exitcode=42", "--quiet", - "--trace-children=yes", "--leak-check=full", - "--suppressions=%s" % suppressions] + self._command http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/tests/python/proton_tests/messenger.py ---------------------------------------------------------------------- diff --git a/tests/python/proton_tests/messenger.py b/tests/python/proton_tests/messenger.py deleted file mode 100644 index 8da068e..0000000 --- a/tests/python/proton_tests/messenger.py +++ /dev/null @@ -1,1091 +0,0 @@ -from __future__ import absolute_import -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import os, sys, traceback -from . import common -from proton import * -from threading import Thread, Event -from time import sleep, time -from .common import Skipped - -class Test(common.Test): - - def setUp(self): - self.server_credit = 10 - self.server_received = 0 - self.server_finite_credit = False - self.server = Messenger("server") - self.server.timeout = self.timeout - self.server.start() - self.port = common.free_tcp_port() - self.server.subscribe("amqp://~127.0.0.1:%d" % self.port) - self.server_thread = Thread(name="server-thread", target=self.run_server) - self.server_thread.daemon = True - self.server_is_running_event = Event() - self.running = True - self.server_thread_started = False - - self.client = Messenger("client") - self.client.timeout = self.timeout - - def start(self): - self.server_thread_started = True - self.server_thread.start() - self.server_is_running_event.wait(self.timeout) - self.client.start() - - def _safelyStopClient(self): - self.server.interrupt() - self.client.stop() - self.client = None - - def tearDown(self): - try: - if self.running: - if not self.server_thread_started: self.start() - # send a message to cause the server to promptly exit - self.running = False - self._safelyStopClient() - finally: - self.server_thread.join(self.timeout) - self.server = None - -REJECT_ME = "*REJECT-ME*" - -class MessengerTest(Test): - - def run_server(self): - if self.server_finite_credit: - self._run_server_finite_credit() - else: - self._run_server_recv() - - def _run_server_recv(self): - """ Use recv() to replenish credit 
each time the server waits - """ - msg = Message() - try: - while self.running: - self.server_is_running_event.set() - try: - self.server.recv(self.server_credit) - self.process_incoming(msg) - except Interrupt: - pass - finally: - self.server.stop() - self.running = False - - def _run_server_finite_credit(self): - """ Grant credit once, process until credit runs out - """ - msg = Message() - self.server_is_running_event.set() - try: - self.server.recv(self.server_credit) - while self.running: - try: - # do not grant additional credit (eg. call recv()) - self.process_incoming(msg) - self.server.work() - except Interrupt: - break - finally: - self.server.stop() - self.running = False - - def process_incoming(self, msg): - while self.server.incoming: - self.server.get(msg) - self.server_received += 1 - if msg.body == REJECT_ME: - self.server.reject() - else: - self.server.accept() - self.dispatch(msg) - - def dispatch(self, msg): - if msg.reply_to: - msg.address = msg.reply_to - self.server.put(msg) - self.server.settle() - - def testSendReceive(self, size=None, address_size=None): - self.start() - msg = Message() - if address_size: - msg.address="amqp://127.0.0.1:%d/%s" % (self.port, "x"*address_size) - else: - msg.address="amqp://127.0.0.1:%d" % self.port - msg.reply_to = "~" - msg.subject="Hello World!" - body = "First the world, then the galaxy!" - if size is not None: - while len(body) < size: - body = 2*body - body = body[:size] - msg.body = body - self.client.put(msg) - self.client.send() - - reply = Message() - self.client.recv(1) - assert self.client.incoming == 1, self.client.incoming - self.client.get(reply) - - assert reply.subject == "Hello World!" 
- rbod = reply.body - assert rbod == body, (rbod, body) - - def testSendReceive1K(self): - self.testSendReceive(1024) - - def testSendReceive2K(self): - self.testSendReceive(2*1024) - - def testSendReceive4K(self): - self.testSendReceive(4*1024) - - def testSendReceive10K(self): - self.testSendReceive(10*1024) - - def testSendReceive100K(self): - self.testSendReceive(100*1024) - - def testSendReceive1M(self): - self.testSendReceive(1024*1024) - - def testSendReceiveLargeAddress(self): - self.testSendReceive(address_size=2048) - - # PROTON-285 - prevent continually failing test - def xtestSendBogus(self): - self.start() - msg = Message() - msg.address="totally-bogus-address" - try: - self.client.put(msg) - assert False, "Expecting MessengerException" - except MessengerException: - exc = sys.exc_info()[1] - err = str(exc) - assert "unable to send to address: totally-bogus-address" in err, err - - def testOutgoingWindow(self): - self.server.incoming_window = 10 - self.start() - msg = Message() - msg.address="amqp://127.0.0.1:%d" % self.port - msg.subject="Hello World!" 
- - trackers = [] - for i in range(10): - trackers.append(self.client.put(msg)) - - self.client.send() - - for t in trackers: - assert self.client.status(t) is None - - # reduce outgoing_window to 5 and then try to send 10 messages - self.client.outgoing_window = 5 - - trackers = [] - for i in range(10): - trackers.append(self.client.put(msg)) - - for i in range(5): - t = trackers[i] - assert self.client.status(t) is None, (t, self.client.status(t)) - - for i in range(5, 10): - t = trackers[i] - assert self.client.status(t) is PENDING, (t, self.client.status(t)) - - self.client.send() - - for i in range(5): - t = trackers[i] - assert self.client.status(t) is None - - for i in range(5, 10): - t = trackers[i] - assert self.client.status(t) is ACCEPTED - - def testReject(self, process_incoming=None): - if process_incoming: - self.process_incoming = process_incoming - self.server.incoming_window = 10 - self.start() - msg = Message() - msg.address="amqp://127.0.0.1:%d" % self.port - msg.subject="Hello World!" - - self.client.outgoing_window = 10 - trackers = [] - rejected = [] - for i in range(10): - if i == 5: - msg.body = REJECT_ME - else: - msg.body = "Yay!" 
- trackers.append(self.client.put(msg)) - if msg.body == REJECT_ME: - rejected.append(trackers[-1]) - - self.client.send() - - for t in trackers: - if t in rejected: - assert self.client.status(t) is REJECTED, (t, self.client.status(t)) - else: - assert self.client.status(t) is ACCEPTED, (t, self.client.status(t)) - - def testRejectIndividual(self): - self.testReject(self.reject_individual) - - def reject_individual(self, msg): - if self.server.incoming < 10: - self.server.work(0) - return - while self.server.incoming: - t = self.server.get(msg) - if msg.body == REJECT_ME: - self.server.reject(t) - self.dispatch(msg) - self.server.accept() - - - def testIncomingWindow(self): - self.server.incoming_window = 10 - self.server.outgoing_window = 10 - self.start() - msg = Message() - msg.address="amqp://127.0.0.1:%d" % self.port - msg.reply_to = "~" - msg.subject="Hello World!" - - self.client.outgoing_window = 10 - trackers = [] - for i in range(10): - trackers.append(self.client.put(msg)) - - self.client.send() - - for t in trackers: - assert self.client.status(t) is ACCEPTED, (t, self.client.status(t)) - - self.client.incoming_window = 10 - remaining = 10 - - trackers = [] - while remaining: - self.client.recv(remaining) - while self.client.incoming: - t = self.client.get() - trackers.append(t) - self.client.accept(t) - remaining -= 1 - for t in trackers: - assert self.client.status(t) is ACCEPTED, (t, self.client.status(t)) - - def testIncomingQueueBiggerThanWindow(self, size=10): - self.server.outgoing_window = size - self.client.incoming_window = size - self.start() - - msg = Message() - msg.address = "amqp://127.0.0.1:%d" % self.port - msg.reply_to = "~" - msg.subject = "Hello World!" 
- - for i in range(2*size): - self.client.put(msg) - - trackers = [] - while len(trackers) < 2*size: - self.client.recv(2*size - len(trackers)) - while self.client.incoming: - t = self.client.get(msg) - assert self.client.status(t) is SETTLED, (t, self.client.status(t)) - trackers.append(t) - - for t in trackers[:size]: - assert self.client.status(t) is None, (t, self.client.status(t)) - for t in trackers[size:]: - assert self.client.status(t) is SETTLED, (t, self.client.status(t)) - - self.client.accept() - - for t in trackers[:size]: - assert self.client.status(t) is None, (t, self.client.status(t)) - for t in trackers[size:]: - assert self.client.status(t) is ACCEPTED, (t, self.client.status(t)) - - def testIncomingQueueBiggerThanSessionWindow(self): - self.testIncomingQueueBiggerThanWindow(2048) - - def testBuffered(self): - self.client.outgoing_window = 1000 - self.client.incoming_window = 1000 - self.start(); - assert self.server_received == 0 - buffering = 0 - count = 100 - for i in range(count): - msg = Message() - msg.address="amqp://127.0.0.1:%d" % self.port - msg.subject="Hello World!" - msg.body = "First the world, then the galaxy!" - t = self.client.put(msg) - buffered = self.client.buffered(t) - # allow transition from False to True, but not back - if buffered: - buffering += 1 - else: - assert not buffering, ("saw %s buffered deliveries before?" % buffering) - - while self.client.outgoing: - last = self.client.outgoing - self.client.send() - #print "sent ", last - self.client.outgoing - - assert self.server_received == count - - def test_proton222(self): - self.start() - msg = Message() - msg.address="amqp://127.0.0.1:%d" % self.port - msg.subject="Hello World!" - msg.body = "First the world, then the galaxy!" 
- assert self.server_received == 0 - self.client.put(msg) - self.client.send() - # ensure the server got the message without requiring client to stop first - deadline = time() + 10 - while self.server_received == 0: - assert time() < deadline, "Server did not receive message!" - sleep(.1) - assert self.server_received == 1 - - def testUnlimitedCredit(self): - """ Bring up two links. Verify credit is granted to each link by - transferring a message over each. - """ - self.server_credit = -1 - self.start() - - msg = Message() - msg.address="amqp://127.0.0.1:%d/XXX" % self.port - msg.reply_to = "~" - msg.subject="Hello World!" - body = "First the world, then the galaxy!" - msg.body = body - self.client.put(msg) - self.client.send() - - reply = Message() - self.client.recv(1) - assert self.client.incoming == 1 - self.client.get(reply) - - assert reply.subject == "Hello World!" - rbod = reply.body - assert rbod == body, (rbod, body) - - msg = Message() - msg.address="amqp://127.0.0.1:%d/YYY" % self.port - msg.reply_to = "~" - msg.subject="Hello World!" - body = "First the world, then the galaxy!" - msg.body = body - self.client.put(msg) - self.client.send() - - reply = Message() - self.client.recv(1) - assert self.client.incoming == 1 - self.client.get(reply) - - assert reply.subject == "Hello World!" 
- rbod = reply.body - assert rbod == body, (rbod, body) - - def _DISABLE_test_proton268(self): - """ Reproducer for JIRA Proton-268 """ - """ DISABLED: Causes failure on Jenkins, appears to be unrelated to fix """ - self.server_credit = 2048 - self.start() - - msg = Message() - msg.address="amqp://127.0.0.1:%d" % self.port - msg.body = "X" * 1024 - - for x in range( 100 ): - self.client.put( msg ) - self.client.send() - - try: - self.client.stop() - except Timeout: - assert False, "Timeout waiting for client stop()" - - # need to restart client, as tearDown() uses it to stop server - self.client.start() - - def testRoute(self): - # anonymous cipher not supported on Windows - if os.name == "nt" or not common.isSSLPresent(): - domain = "amqp" - else: - domain = "amqps" - port = common.free_tcp_port() - self.server.subscribe(domain + "://~0.0.0.0:%d" % port) - self.start() - self.client.route("route1", "amqp://127.0.0.1:%d" % self.port) - self.client.route("route2", domain + "://127.0.0.1:%d" % port) - - msg = Message() - msg.address = "route1" - msg.reply_to = "~" - msg.body = "test" - self.client.put(msg) - self.client.recv(1) - - reply = Message() - self.client.get(reply) - - msg = Message() - msg.address = "route2" - msg.reply_to = "~" - msg.body = "test" - self.client.put(msg) - self.client.recv(1) - - self.client.get(reply) - assert reply.body == "test" - - def testDefaultRoute(self): - self.start() - self.client.route("*", "amqp://127.0.0.1:%d" % self.port) - - msg = Message() - msg.address = "asdf" - msg.body = "test" - msg.reply_to = "~" - - self.client.put(msg) - self.client.recv(1) - - reply = Message() - self.client.get(reply) - assert reply.body == "test" - - def testDefaultRouteSubstitution(self): - self.start() - self.client.route("*", "amqp://127.0.0.1:%d/$1" % self.port) - - msg = Message() - msg.address = "asdf" - msg.body = "test" - msg.reply_to = "~" - - self.client.put(msg) - self.client.recv(1) - - reply = Message() - self.client.get(reply) - 
assert reply.body == "test" - - def testIncomingRoute(self): - self.start() - port = common.free_tcp_port() - self.client.route("in", "amqp://~0.0.0.0:%d" % port) - self.client.subscribe("in") - - msg = Message() - msg.address = "amqp://127.0.0.1:%d" %self.port - msg.reply_to = "amqp://127.0.0.1:%d" % port - msg.body = "test" - - self.client.put(msg) - self.client.recv(1) - reply = Message() - self.client.get(reply) - assert reply.body == "test" - - def echo_address(self, msg): - while self.server.incoming: - self.server.get(msg) - msg.body = msg.address - self.dispatch(msg) - - def _testRewrite(self, original, rewritten): - self.start() - self.process_incoming = self.echo_address - self.client.route("*", "amqp://127.0.0.1:%d" % self.port) - - msg = Message() - msg.address = original - msg.body = "test" - msg.reply_to = "~" - - self.client.put(msg) - assert msg.address == original - self.client.recv(1) - assert self.client.incoming == 1 - - echo = Message() - self.client.get(echo) - assert echo.body == rewritten, (echo.body, rewritten) - assert msg.address == original - - def testDefaultRewriteH(self): - self._testRewrite("original", "original") - - def testDefaultRewriteUH(self): - self._testRewrite("user@original", "original") - - def testDefaultRewriteUPH(self): - self._testRewrite("user:pass@original", "original") - - def testDefaultRewriteHP(self): - self._testRewrite("original:123", "original:123") - - def testDefaultRewriteUHP(self): - self._testRewrite("user@original:123", "original:123") - - def testDefaultRewriteUPHP(self): - self._testRewrite("user:pass@original:123", "original:123") - - def testDefaultRewriteHN(self): - self._testRewrite("original/name", "original/name") - - def testDefaultRewriteUHN(self): - self._testRewrite("user@original/name", "original/name") - - def testDefaultRewriteUPHN(self): - self._testRewrite("user:pass@original/name", "original/name") - - def testDefaultRewriteHPN(self): - self._testRewrite("original:123/name", 
"original:123/name") - - def testDefaultRewriteUHPN(self): - self._testRewrite("user@original:123/name", "original:123/name") - - def testDefaultRewriteUPHPN(self): - self._testRewrite("user:pass@original:123/name", "original:123/name") - - def testDefaultRewriteSH(self): - self._testRewrite("amqp://original", "amqp://original") - - def testDefaultRewriteSUH(self): - self._testRewrite("amqp://user@original", "amqp://original") - - def testDefaultRewriteSUPH(self): - self._testRewrite("amqp://user:pass@original", "amqp://original") - - def testDefaultRewriteSHP(self): - self._testRewrite("amqp://original:123", "amqp://original:123") - - def testDefaultRewriteSUHP(self): - self._testRewrite("amqp://user@original:123", "amqp://original:123") - - def testDefaultRewriteSUPHP(self): - self._testRewrite("amqp://user:pass@original:123", "amqp://original:123") - - def testDefaultRewriteSHN(self): - self._testRewrite("amqp://original/name", "amqp://original/name") - - def testDefaultRewriteSUHN(self): - self._testRewrite("amqp://user@original/name", "amqp://original/name") - - def testDefaultRewriteSUPHN(self): - self._testRewrite("amqp://user:pass@original/name", "amqp://original/name") - - def testDefaultRewriteSHPN(self): - self._testRewrite("amqp://original:123/name", "amqp://original:123/name") - - def testDefaultRewriteSUHPN(self): - self._testRewrite("amqp://user@original:123/name", "amqp://original:123/name") - - def testDefaultRewriteSUPHPN(self): - self._testRewrite("amqp://user:pass@original:123/name", "amqp://original:123/name") - - def testRewriteSupress(self): - self.client.rewrite("*", None) - self._testRewrite("asdf", None) - - def testRewrite(self): - self.client.rewrite("a", "b") - self._testRewrite("a", "b") - - def testRewritePattern(self): - self.client.rewrite("amqp://%@*", "amqp://$2") - self._testRewrite("amqp://foo@bar", "amqp://bar") - - def testRewriteToAt(self): - self.client.rewrite("amqp://%/*", "$2@$1") - self._testRewrite("amqp://domain/name", 
"name@domain") - - def testRewriteOverrideDefault(self): - self.client.rewrite("*", "$1") - self._testRewrite("amqp://user:pass@host", "amqp://user:pass@host") - - def testCreditBlockingRebalance(self): - """ The server is given a fixed amount of credit, and runs until that - credit is exhausted. - """ - self.server_finite_credit = True - self.server_credit = 11 - self.start() - - # put one message out on "Link1" - since there are no other links, it - # should get all the credit (10 after sending) - msg = Message() - msg.address="amqp://127.0.0.1:%d/Link1" % self.port - msg.subject="Hello World!" - body = "First the world, then the galaxy!" - msg.body = body - msg.reply_to = "~" - self.client.put(msg) - self.client.send() - self.client.recv(1) - assert self.client.incoming == 1 - - # Now attempt to exhaust credit using a different link - for i in range(10): - msg.address="amqp://127.0.0.1:%d/Link2" % self.port - self.client.put(msg) - self.client.send() - - deadline = time() + self.timeout - count = 0 - while count < 11 and time() < deadline: - self.client.recv(-1) - while self.client.incoming: - self.client.get(msg) - count += 1 - assert count == 11, count - - # now attempt to send one more. 
There isn't enough credit, so it should - # not be sent - self.client.timeout = 1 - msg.address="amqp://127.0.0.1:%d/Link2" % self.port - self.client.put(msg) - try: - self.client.send() - assert False, "expected client to time out in send()" - except Timeout: - pass - assert self.client.outgoing == 1 - - -class NBMessengerTest(common.Test): - - def setUp(self): - self.client = Messenger("client") - self.client2 = Messenger("client2") - self.server = Messenger("server") - self.messengers = [self.client, self.client2, self.server] - self.client.blocking = False - self.client2.blocking = False - self.server.blocking = False - self.server.start() - self.client.start() - self.client2.start() - port = common.free_tcp_port() - self.address = "amqp://127.0.0.1:%d" % port - self.server.subscribe("amqp://~0.0.0.0:%d" % port) - - def _pump(self, timeout, work_triggers_exit): - for msgr in self.messengers: - if msgr.work(timeout) and work_triggers_exit: - return True - return False - - def pump(self, timeout=0): - while self._pump(0, True): pass - self._pump(timeout, False) - while self._pump(0, True): pass - - def tearDown(self): - self.server.stop() - self.client.stop() - self.client2.stop() - self.pump() - assert self.server.stopped - assert self.client.stopped - assert self.client2.stopped - - def testSmoke(self, count=1): - self.server.recv() - - msg = Message() - msg.address = self.address - for i in range(count): - msg.body = "Hello %s" % i - self.client.put(msg) - - msg2 = Message() - for i in range(count): - if self.server.incoming == 0: - self.pump() - assert self.server.incoming > 0, self.server.incoming - self.server.get(msg2) - assert msg2.body == "Hello %s" % i, (msg2.body, i) - - assert self.client.outgoing == 0, self.client.outgoing - assert self.server.incoming == 0, self.client.incoming - - def testSmoke1024(self): - self.testSmoke(1024) - - def testSmoke4096(self): - self.testSmoke(4096) - - def testPushback(self): - self.server.recv() - - msg = Message() - 
msg.address = self.address - for i in range(16): - for i in range(1024): - self.client.put(msg) - self.pump() - if self.client.outgoing > 0: - break - - assert self.client.outgoing > 0 - - def testRecvBeforeSubscribe(self): - self.client.recv() - self.client.subscribe(self.address + "/foo") - - self.pump() - - msg = Message() - msg.address = "amqp://client/foo" - msg.body = "Hello World!" - self.server.put(msg) - - assert self.client.incoming == 0 - self.pump(self.delay) - assert self.client.incoming == 1 - - msg2 = Message() - self.client.get(msg2) - assert msg2.address == msg.address - assert msg2.body == msg.body - - def testCreditAutoBackpressure(self): - """ Verify that use of automatic credit (pn_messenger_recv(-1)) does not - fill the incoming queue indefinitely. If the receiver does not 'get' the - message, eventually the sender will block. See PROTON-350 """ - self.server.recv() - msg = Message() - msg.address = self.address - deadline = time() + self.timeout - while time() < deadline: - old = self.server.incoming - for j in range(1001): - self.client.put(msg) - self.pump() - if old == self.server.incoming: - break; - assert old == self.server.incoming, "Backpressure not active!" - - def testCreditRedistribution(self): - """ Verify that a fixed amount of credit will redistribute to new - links. 
- """ - self.server.recv( 5 ) - - # first link will get all credit - msg1 = Message() - msg1.address = self.address + "/msg1" - self.client.put(msg1) - self.pump() - assert self.server.incoming == 1, self.server.incoming - assert self.server.receiving == 4, self.server.receiving - - # no credit left over for this link - msg2 = Message() - msg2.address = self.address + "/msg2" - self.client.put(msg2) - self.pump() - assert self.server.incoming == 1, self.server.incoming - assert self.server.receiving == 4, self.server.receiving - - # eventually, credit will rebalance and the new link will send - deadline = time() + self.timeout - while time() < deadline: - sleep(.1) - self.pump() - if self.server.incoming == 2: - break; - assert self.server.incoming == 2, self.server.incoming - assert self.server.receiving == 3, self.server.receiving - - def testCreditReclaim(self): - """ Verify that credit is reclaimed when a link with outstanding credit is - torn down. - """ - self.server.recv( 9 ) - - # first link will get all credit - msg1 = Message() - msg1.address = self.address + "/msg1" - self.client.put(msg1) - self.pump() - assert self.server.incoming == 1, self.server.incoming - assert self.server.receiving == 8, self.server.receiving - - # no credit left over for this link - msg2 = Message() - msg2.address = self.address + "/msg2" - self.client.put(msg2) - self.pump() - assert self.server.incoming == 1, self.server.incoming - assert self.server.receiving == 8, self.server.receiving - - # and none for this new client - msg3 = Message() - msg3.address = self.address + "/msg3" - self.client2.put(msg3) - self.pump() - - # eventually, credit will rebalance and all links will - # send a message - deadline = time() + self.timeout - while time() < deadline: - sleep(.1) - self.pump() - if self.server.incoming == 3: - break; - assert self.server.incoming == 3, self.server.incoming - assert self.server.receiving == 6, self.server.receiving - - # now tear down client two, this 
should cause its outstanding credit to be - # made available to the other links - self.client2.stop() - self.pump() - - for i in range(4): - self.client.put(msg1) - self.client.put(msg2) - - # should exhaust all credit - deadline = time() + self.timeout - while time() < deadline: - sleep(.1) - self.pump() - if self.server.incoming == 9: - break; - assert self.server.incoming == 9, self.server.incoming - assert self.server.receiving == 0, self.server.receiving - - def testCreditReplenish(self): - """ When extra credit is available it should be granted to the first - link that can use it. - """ - # create three links - msg = Message() - for i in range(3): - msg.address = self.address + "/%d" % i - self.client.put(msg) - - self.server.recv( 50 ) # 50/3 = 16 per link + 2 extra - - self.pump() - assert self.server.incoming == 3, self.server.incoming - assert self.server.receiving == 47, self.server.receiving - - # 47/3 = 15 per link, + 2 extra - - # verify one link can send 15 + the two extra (17) - for i in range(17): - msg.address = self.address + "/0" - self.client.put(msg) - self.pump() - assert self.server.incoming == 20, self.server.incoming - assert self.server.receiving == 30, self.server.receiving - - # now verify that the remaining credit (30) will eventually rebalance - # across all links (10 per link) - for j in range(10): - for i in range(3): - msg.address = self.address + "/%d" % i - self.client.put(msg) - - deadline = time() + self.timeout - while time() < deadline: - sleep(.1) - self.pump() - if self.server.incoming == 50: - break - assert self.server.incoming == 50, self.server.incoming - assert self.server.receiving == 0, self.server.receiving - -from select import select - -class Pump: - - def __init__(self, *messengers): - self.messengers = messengers - self.selectables = [] - - def pump_once(self): - for m in self.messengers: - while True: - sel = m.selectable() - if sel: - self.selectables.append(sel) - else: - break - - reading = [] - writing = [] 
- - for sel in self.selectables[:]: - if sel.is_terminal: - sel.release() - self.selectables.remove(sel) - else: - if sel.reading: - reading.append(sel) - if sel.writing: - writing.append(sel) - - readable, writable, _ = select(reading, writing, [], 0.1) - - count = 0 - for s in readable: - s.readable() - count += 1 - for s in writable: - s.writable() - count += 1 - return count - - def pump(self): - while self.pump_once(): pass - -class SelectableMessengerTest(common.Test): - - def testSelectable(self, count = 1): - if os.name=="nt": - # Conflict between native OS select() in Pump and IOCP based pn_selector_t - # makes this fail on Windows (see PROTON-668). - raise Skipped("Invalid test on Windows with IOCP.") - - mrcv = Messenger() - mrcv.passive = True - port = common.free_tcp_port() - mrcv.subscribe("amqp://~0.0.0.0:%d" % port) - - msnd = Messenger() - msnd.passive = True - m = Message() - m.address = "amqp://127.0.0.1:%d" % port - - for i in range(count): - m.body = u"Hello World! %s" % i - msnd.put(m) - - p = Pump(msnd, mrcv) - p.pump() - - assert msnd.outgoing == count - assert mrcv.incoming == 0 - - mrcv.recv() - - mc = Message() - - try: - for i in range(count): - while mrcv.incoming == 0: - p.pump() - assert mrcv.incoming > 0, (count, msnd.outgoing, mrcv.incoming) - mrcv.get(mc) - assert mc.body == u"Hello World! %s" % i, (i, mc.body) - finally: - mrcv.stop() - msnd.stop() - assert not mrcv.stopped - assert not msnd.stopped - p.pump() - assert mrcv.stopped - assert msnd.stopped - - def testSelectable16(self): - self.testSelectable(count=16) - - def testSelectable1024(self): - self.testSelectable(count=1024) - - def testSelectable4096(self): - self.testSelectable(count=4096) - - -class IdleTimeoutTest(common.Test): - - def testIdleTimeout(self): - """ - Verify that a Messenger connection is kept alive using empty idle frames - when a idle_timeout is advertised by the remote peer. 
- """ - if "java" in sys.platform: - raise Skipped() - idle_timeout_secs = self.delay - - try: - idle_server = common.TestServer(idle_timeout=idle_timeout_secs) - idle_server.timeout = self.timeout - idle_server.start() - - idle_client = Messenger("idle_client") - idle_client.timeout = self.timeout - idle_client.start() - - idle_client.subscribe("amqp://%s:%s/foo" % - (idle_server.host, idle_server.port)) - idle_client.work(idle_timeout_secs/10) - - # wait up to 3x the idle timeout and hence verify that everything stays - # connected during that time by virtue of no Exception being raised - duration = 3 * idle_timeout_secs - deadline = time() + duration - while time() <= deadline: - idle_client.work(idle_timeout_secs/10) - continue - - # confirm link is still active - assert not idle_server.conditions, idle_server.conditions - finally: - try: - idle_client.stop() - except: - pass - try: - idle_server.stop() - except: - pass --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
