Repository: qpid-proton Updated Branches: refs/heads/master c82de9d3d -> 0c9bb9ffc
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c9bb9ff/tests/python/proton_tests/messenger.py ---------------------------------------------------------------------- diff --git a/tests/python/proton_tests/messenger.py b/tests/python/proton_tests/messenger.py deleted file mode 100644 index 91283ed..0000000 --- a/tests/python/proton_tests/messenger.py +++ /dev/null @@ -1,1089 +0,0 @@ -from __future__ import absolute_import -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import os, sys, traceback -from . import common -from proton import * -from threading import Thread, Event -from time import sleep, time -from .common import Skipped - -class Test(common.Test): - - def setUp(self): - self.server_credit = 10 - self.server_received = 0 - self.server_finite_credit = False - self.server = Messenger("server") - self.server.timeout = self.timeout - self.server.start() - self.port = common.free_tcp_port() - self.server.subscribe("amqp://~127.0.0.1:%d" % self.port) - self.server_thread = Thread(name="server-thread", target=self.run_server) - self.server_thread.daemon = True - self.server_is_running_event = Event() - self.running = True - self.server_thread_started = False - - self.client = Messenger("client") - self.client.timeout = self.timeout - - def start(self): - self.server_thread_started = True - self.server_thread.start() - self.server_is_running_event.wait(self.timeout) - self.client.start() - - def _safelyStopClient(self): - self.server.interrupt() - self.client.stop() - self.client = None - - def tearDown(self): - try: - if self.running: - if not self.server_thread_started: self.start() - # send a message to cause the server to promptly exit - self.running = False - self._safelyStopClient() - finally: - self.server_thread.join(self.timeout) - self.server = None - -REJECT_ME = "*REJECT-ME*" - -class MessengerTest(Test): - - def run_server(self): - if self.server_finite_credit: - self._run_server_finite_credit() - else: - self._run_server_recv() - - def _run_server_recv(self): - """ Use recv() to replenish credit each time the server waits - """ - msg = Message() - try: - while self.running: - self.server_is_running_event.set() - try: - self.server.recv(self.server_credit) - self.process_incoming(msg) - except Interrupt: - pass - finally: - self.server.stop() - self.running = False - - def _run_server_finite_credit(self): - """ Grant credit once, process until credit runs out - """ - msg = Message() - self.server_is_running_event.set() - try: - self.server.recv(self.server_credit) - while self.running: - try: - # do not grant additional credit (eg. call recv()) - self.process_incoming(msg) - self.server.work() - except Interrupt: - break - finally: - self.server.stop() - self.running = False - - def process_incoming(self, msg): - while self.server.incoming: - self.server.get(msg) - self.server_received += 1 - if msg.body == REJECT_ME: - self.server.reject() - else: - self.server.accept() - self.dispatch(msg) - - def dispatch(self, msg): - if msg.reply_to: - msg.address = msg.reply_to - self.server.put(msg) - self.server.settle() - - def testSendReceive(self, size=None, address_size=None): - self.start() - msg = Message() - if address_size: - msg.address="amqp://127.0.0.1:%d/%s" % (self.port, "x"*address_size) - else: - msg.address="amqp://127.0.0.1:%d" % self.port - msg.reply_to = "~" - msg.subject="Hello World!" - body = "First the world, then the galaxy!" - if size is not None: - while len(body) < size: - body = 2*body - body = body[:size] - msg.body = body - self.client.put(msg) - self.client.send() - - reply = Message() - self.client.recv(1) - assert self.client.incoming == 1, self.client.incoming - self.client.get(reply) - - assert reply.subject == "Hello World!" - rbod = reply.body - assert rbod == body, (rbod, body) - - def testSendReceive1K(self): - self.testSendReceive(1024) - - def testSendReceive2K(self): - self.testSendReceive(2*1024) - - def testSendReceive4K(self): - self.testSendReceive(4*1024) - - def testSendReceive10K(self): - self.testSendReceive(10*1024) - - def testSendReceive100K(self): - self.testSendReceive(100*1024) - - def testSendReceive1M(self): - self.testSendReceive(1024*1024) - - def testSendReceiveLargeAddress(self): - self.testSendReceive(address_size=2048) - - # PROTON-285 - prevent continually failing test - def xtestSendBogus(self): - self.start() - msg = Message() - msg.address="totally-bogus-address" - try: - self.client.put(msg) - assert False, "Expecting MessengerException" - except MessengerException: - exc = sys.exc_info()[1] - err = str(exc) - assert "unable to send to address: totally-bogus-address" in err, err - - def testOutgoingWindow(self): - self.server.incoming_window = 10 - self.start() - msg = Message() - msg.address="amqp://127.0.0.1:%d" % self.port - msg.subject="Hello World!" - - trackers = [] - for i in range(10): - trackers.append(self.client.put(msg)) - - self.client.send() - - for t in trackers: - assert self.client.status(t) is None - - # reduce outgoing_window to 5 and then try to send 10 messages - self.client.outgoing_window = 5 - - trackers = [] - for i in range(10): - trackers.append(self.client.put(msg)) - - for i in range(5): - t = trackers[i] - assert self.client.status(t) is None, (t, self.client.status(t)) - - for i in range(5, 10): - t = trackers[i] - assert self.client.status(t) is PENDING, (t, self.client.status(t)) - - self.client.send() - - for i in range(5): - t = trackers[i] - assert self.client.status(t) is None - - for i in range(5, 10): - t = trackers[i] - assert self.client.status(t) is ACCEPTED - - def testReject(self, process_incoming=None): - if process_incoming: - self.process_incoming = process_incoming - self.server.incoming_window = 10 - self.start() - msg = Message() - msg.address="amqp://127.0.0.1:%d" % self.port - msg.subject="Hello World!" - - self.client.outgoing_window = 10 - trackers = [] - rejected = [] - for i in range(10): - if i == 5: - msg.body = REJECT_ME - else: - msg.body = "Yay!" - trackers.append(self.client.put(msg)) - if msg.body == REJECT_ME: - rejected.append(trackers[-1]) - - self.client.send() - - for t in trackers: - if t in rejected: - assert self.client.status(t) is REJECTED, (t, self.client.status(t)) - else: - assert self.client.status(t) is ACCEPTED, (t, self.client.status(t)) - - def testRejectIndividual(self): - self.testReject(self.reject_individual) - - def reject_individual(self, msg): - if self.server.incoming < 10: - self.server.work(0) - return - while self.server.incoming: - t = self.server.get(msg) - if msg.body == REJECT_ME: - self.server.reject(t) - self.dispatch(msg) - self.server.accept() - - - def testIncomingWindow(self): - self.server.incoming_window = 10 - self.server.outgoing_window = 10 - self.start() - msg = Message() - msg.address="amqp://127.0.0.1:%d" % self.port - msg.reply_to = "~" - msg.subject="Hello World!" - - self.client.outgoing_window = 10 - trackers = [] - for i in range(10): - trackers.append(self.client.put(msg)) - - self.client.send() - - for t in trackers: - assert self.client.status(t) is ACCEPTED, (t, self.client.status(t)) - - self.client.incoming_window = 10 - remaining = 10 - - trackers = [] - while remaining: - self.client.recv(remaining) - while self.client.incoming: - t = self.client.get() - trackers.append(t) - self.client.accept(t) - remaining -= 1 - for t in trackers: - assert self.client.status(t) is ACCEPTED, (t, self.client.status(t)) - - def testIncomingQueueBiggerThanWindow(self, size=10): - self.server.outgoing_window = size - self.client.incoming_window = size - self.start() - - msg = Message() - msg.address = "amqp://127.0.0.1:%d" % self.port - msg.reply_to = "~" - msg.subject = "Hello World!" - - for i in range(2*size): - self.client.put(msg) - - trackers = [] - while len(trackers) < 2*size: - self.client.recv(2*size - len(trackers)) - while self.client.incoming: - t = self.client.get(msg) - assert self.client.status(t) is SETTLED, (t, self.client.status(t)) - trackers.append(t) - - for t in trackers[:size]: - assert self.client.status(t) is None, (t, self.client.status(t)) - for t in trackers[size:]: - assert self.client.status(t) is SETTLED, (t, self.client.status(t)) - - self.client.accept() - - for t in trackers[:size]: - assert self.client.status(t) is None, (t, self.client.status(t)) - for t in trackers[size:]: - assert self.client.status(t) is ACCEPTED, (t, self.client.status(t)) - - def testIncomingQueueBiggerThanSessionWindow(self): - self.testIncomingQueueBiggerThanWindow(2048) - - def testBuffered(self): - self.client.outgoing_window = 1000 - self.client.incoming_window = 1000 - self.start(); - assert self.server_received == 0 - buffering = 0 - count = 100 - for i in range(count): - msg = Message() - msg.address="amqp://127.0.0.1:%d" % self.port - msg.subject="Hello World!" - msg.body = "First the world, then the galaxy!" - t = self.client.put(msg) - buffered = self.client.buffered(t) - # allow transition from False to True, but not back - if buffered: - buffering += 1 - else: - assert not buffering, ("saw %s buffered deliveries before?" % buffering) - - while self.client.outgoing: - last = self.client.outgoing - self.client.send() - #print "sent ", last - self.client.outgoing - - assert self.server_received == count - - def test_proton222(self): - self.start() - msg = Message() - msg.address="amqp://127.0.0.1:%d" % self.port - msg.subject="Hello World!" - msg.body = "First the world, then the galaxy!" - assert self.server_received == 0 - self.client.put(msg) - self.client.send() - # ensure the server got the message without requiring client to stop first - deadline = time() + 10 - while self.server_received == 0: - assert time() < deadline, "Server did not receive message!" - sleep(.1) - assert self.server_received == 1 - - def testUnlimitedCredit(self): - """ Bring up two links. Verify credit is granted to each link by - transferring a message over each. - """ - self.server_credit = -1 - self.start() - - msg = Message() - msg.address="amqp://127.0.0.1:%d/XXX" % self.port - msg.reply_to = "~" - msg.subject="Hello World!" - body = "First the world, then the galaxy!" - msg.body = body - self.client.put(msg) - self.client.send() - - reply = Message() - self.client.recv(1) - assert self.client.incoming == 1 - self.client.get(reply) - - assert reply.subject == "Hello World!" - rbod = reply.body - assert rbod == body, (rbod, body) - - msg = Message() - msg.address="amqp://127.0.0.1:%d/YYY" % self.port - msg.reply_to = "~" - msg.subject="Hello World!" - body = "First the world, then the galaxy!" - msg.body = body - self.client.put(msg) - self.client.send() - - reply = Message() - self.client.recv(1) - assert self.client.incoming == 1 - self.client.get(reply) - - assert reply.subject == "Hello World!" - rbod = reply.body - assert rbod == body, (rbod, body) - - def _DISABLE_test_proton268(self): - """ Reproducer for JIRA Proton-268 """ - """ DISABLED: Causes failure on Jenkins, appears to be unrelated to fix """ - self.server_credit = 2048 - self.start() - - msg = Message() - msg.address="amqp://127.0.0.1:%d" % self.port - msg.body = "X" * 1024 - - for x in range( 100 ): - self.client.put( msg ) - self.client.send() - - try: - self.client.stop() - except Timeout: - assert False, "Timeout waiting for client stop()" - - # need to restart client, as tearDown() uses it to stop server - self.client.start() - - def testRoute(self): - # anonymous cipher not supported on Windows - if os.name == "nt" or not common.isSSLPresent(): - domain = "amqp" - else: - domain = "amqps" - port = common.free_tcp_port() - self.server.subscribe(domain + "://~0.0.0.0:%d" % port) - self.start() - self.client.route("route1", "amqp://127.0.0.1:%d" % self.port) - self.client.route("route2", domain + "://127.0.0.1:%d" % port) - - msg = Message() - msg.address = "route1" - msg.reply_to = "~" - msg.body = "test" - self.client.put(msg) - self.client.recv(1) - - reply = Message() - self.client.get(reply) - - msg = Message() - msg.address = "route2" - msg.reply_to = "~" - msg.body = "test" - self.client.put(msg) - self.client.recv(1) - - self.client.get(reply) - assert reply.body == "test" - - def testDefaultRoute(self): - self.start() - self.client.route("*", "amqp://127.0.0.1:%d" % self.port) - - msg = Message() - msg.address = "asdf" - msg.body = "test" - msg.reply_to = "~" - - self.client.put(msg) - self.client.recv(1) - - reply = Message() - self.client.get(reply) - assert reply.body == "test" - - def testDefaultRouteSubstitution(self): - self.start() - self.client.route("*", "amqp://127.0.0.1:%d/$1" % self.port) - - msg = Message() - msg.address = "asdf" - msg.body = "test" - msg.reply_to = "~" - - self.client.put(msg) - self.client.recv(1) - - reply = Message() - self.client.get(reply) - assert reply.body == "test" - - def testIncomingRoute(self): - self.start() - port = common.free_tcp_port() - self.client.route("in", "amqp://~0.0.0.0:%d" % port) - self.client.subscribe("in") - - msg = Message() - msg.address = "amqp://127.0.0.1:%d" %self.port - msg.reply_to = "amqp://127.0.0.1:%d" % port - msg.body = "test" - - self.client.put(msg) - self.client.recv(1) - reply = Message() - self.client.get(reply) - assert reply.body == "test" - - def echo_address(self, msg): - while self.server.incoming: - self.server.get(msg) - msg.body = msg.address - self.dispatch(msg) - - def _testRewrite(self, original, rewritten): - self.start() - self.process_incoming = self.echo_address - self.client.route("*", "amqp://127.0.0.1:%d" % self.port) - - msg = Message() - msg.address = original - msg.body = "test" - msg.reply_to = "~" - - self.client.put(msg) - assert msg.address == original - self.client.recv(1) - assert self.client.incoming == 1 - - echo = Message() - self.client.get(echo) - assert echo.body == rewritten, (echo.body, rewritten) - assert msg.address == original - - def testDefaultRewriteH(self): - self._testRewrite("original", "original") - - def testDefaultRewriteUH(self): - self._testRewrite("user@original", "original") - - def testDefaultRewriteUPH(self): - self._testRewrite("user:pass@original", "original") - - def testDefaultRewriteHP(self): - self._testRewrite("original:123", "original:123") - - def testDefaultRewriteUHP(self): - self._testRewrite("user@original:123", "original:123") - - def testDefaultRewriteUPHP(self): - self._testRewrite("user:pass@original:123", "original:123") - - def testDefaultRewriteHN(self): - self._testRewrite("original/name", "original/name") - - def testDefaultRewriteUHN(self): - self._testRewrite("user@original/name", "original/name") - - def testDefaultRewriteUPHN(self): - self._testRewrite("user:pass@original/name", "original/name") - - def testDefaultRewriteHPN(self): - self._testRewrite("original:123/name", "original:123/name") - - def testDefaultRewriteUHPN(self): - self._testRewrite("user@original:123/name", "original:123/name") - - def testDefaultRewriteUPHPN(self): - self._testRewrite("user:pass@original:123/name", "original:123/name") - - def testDefaultRewriteSH(self): - self._testRewrite("amqp://original", "amqp://original") - - def testDefaultRewriteSUH(self): - self._testRewrite("amqp://user@original", "amqp://original") - - def testDefaultRewriteSUPH(self): - self._testRewrite("amqp://user:pass@original", "amqp://original") - - def testDefaultRewriteSHP(self): - self._testRewrite("amqp://original:123", "amqp://original:123") - - def testDefaultRewriteSUHP(self): - self._testRewrite("amqp://user@original:123", "amqp://original:123") - - def testDefaultRewriteSUPHP(self): - self._testRewrite("amqp://user:pass@original:123", "amqp://original:123") - - def testDefaultRewriteSHN(self): - self._testRewrite("amqp://original/name", "amqp://original/name") - - def testDefaultRewriteSUHN(self): - self._testRewrite("amqp://user@original/name", "amqp://original/name") - - def testDefaultRewriteSUPHN(self): - self._testRewrite("amqp://user:pass@original/name", "amqp://original/name") - - def testDefaultRewriteSHPN(self): - self._testRewrite("amqp://original:123/name", "amqp://original:123/name") - - def testDefaultRewriteSUHPN(self): - self._testRewrite("amqp://user@original:123/name", "amqp://original:123/name") - - def testDefaultRewriteSUPHPN(self): - self._testRewrite("amqp://user:pass@original:123/name", "amqp://original:123/name") - - def testRewriteSuppress(self): - self.client.rewrite("*", None) - self._testRewrite("asdf", None) - - def testRewrite(self): - self.client.rewrite("a", "b") - self._testRewrite("a", "b") - - def testRewritePattern(self): - self.client.rewrite("amqp://%@*", "amqp://$2") - self._testRewrite("amqp://foo@bar", "amqp://bar") - - def testRewriteToAt(self): - self.client.rewrite("amqp://%/*", "$2@$1") - self._testRewrite("amqp://domain/name", "name@domain") - - def testRewriteOverrideDefault(self): - self.client.rewrite("*", "$1") - self._testRewrite("amqp://user:pass@host", "amqp://user:pass@host") - - def testCreditBlockingRebalance(self): - """ The server is given a fixed amount of credit, and runs until that - credit is exhausted. - """ - self.server_finite_credit = True - self.server_credit = 11 - self.start() - - # put one message out on "Link1" - since there are no other links, it - # should get all the credit (10 after sending) - msg = Message() - msg.address="amqp://127.0.0.1:%d/Link1" % self.port - msg.subject="Hello World!" - body = "First the world, then the galaxy!" - msg.body = body - msg.reply_to = "~" - self.client.put(msg) - self.client.send() - self.client.recv(1) - assert self.client.incoming == 1 - - # Now attempt to exhaust credit using a different link - for i in range(10): - msg.address="amqp://127.0.0.1:%d/Link2" % self.port - self.client.put(msg) - self.client.send() - - deadline = time() + self.timeout - count = 0 - while count < 11 and time() < deadline: - self.client.recv(-1) - while self.client.incoming: - self.client.get(msg) - count += 1 - assert count == 11, count - - # now attempt to send one more. There isn't enough credit, so it should - # not be sent - self.client.timeout = 1 - msg.address="amqp://127.0.0.1:%d/Link2" % self.port - self.client.put(msg) - try: - self.client.send() - assert False, "expected client to time out in send()" - except Timeout: - pass - assert self.client.outgoing == 1 - - -class NBMessengerTest(common.Test): - - def setUp(self): - self.client = Messenger("client") - self.client2 = Messenger("client2") - self.server = Messenger("server") - self.messengers = [self.client, self.client2, self.server] - self.client.blocking = False - self.client2.blocking = False - self.server.blocking = False - self.server.start() - self.client.start() - self.client2.start() - port = common.free_tcp_port() - self.address = "amqp://127.0.0.1:%d" % port - self.server.subscribe("amqp://~0.0.0.0:%d" % port) - - def _pump(self, timeout, work_triggers_exit): - for msgr in self.messengers: - if msgr.work(timeout) and work_triggers_exit: - return True - return False - - def pump(self, timeout=0): - while self._pump(0, True): pass - self._pump(timeout, False) - while self._pump(0, True): pass - - def tearDown(self): - self.server.stop() - self.client.stop() - self.client2.stop() - self.pump() - assert self.server.stopped - assert self.client.stopped - assert self.client2.stopped - - def testSmoke(self, count=1): - self.server.recv() - - msg = Message() - msg.address = self.address - for i in range(count): - msg.body = "Hello %s" % i - self.client.put(msg) - - msg2 = Message() - for i in range(count): - if self.server.incoming == 0: - self.pump() - assert self.server.incoming > 0, self.server.incoming - self.server.get(msg2) - assert msg2.body == "Hello %s" % i, (msg2.body, i) - - assert self.client.outgoing == 0, self.client.outgoing - assert self.server.incoming == 0, self.client.incoming - - def testSmoke1024(self): - self.testSmoke(1024) - - def testSmoke4096(self): - self.testSmoke(4096) - - def testPushback(self): - self.server.recv() - - msg = Message() - msg.address = self.address - for i in range(16): - for i in range(1024): - self.client.put(msg) - self.pump() - if self.client.outgoing > 0: - break - - assert self.client.outgoing > 0 - - def testRecvBeforeSubscribe(self): - self.client.recv() - self.client.subscribe(self.address + "/foo") - - self.pump() - - msg = Message() - msg.address = "amqp://client/foo" - msg.body = "Hello World!" - self.server.put(msg) - - assert self.client.incoming == 0 - self.pump(self.delay) - assert self.client.incoming == 1 - - msg2 = Message() - self.client.get(msg2) - assert msg2.address == msg.address - assert msg2.body == msg.body - - def testCreditAutoBackpressure(self): - """ Verify that use of automatic credit (pn_messenger_recv(-1)) does not - fill the incoming queue indefinitely. If the receiver does not 'get' the - message, eventually the sender will block. See PROTON-350 """ - self.server.recv() - msg = Message() - msg.address = self.address - deadline = time() + self.timeout - while time() < deadline: - old = self.server.incoming - for j in range(1001): - self.client.put(msg) - self.pump() - if old == self.server.incoming: - break; - assert old == self.server.incoming, "Backpressure not active!" - - def testCreditRedistribution(self): - """ Verify that a fixed amount of credit will redistribute to new - links. - """ - self.server.recv( 5 ) - - # first link will get all credit - msg1 = Message() - msg1.address = self.address + "/msg1" - self.client.put(msg1) - self.pump() - assert self.server.incoming == 1, self.server.incoming - assert self.server.receiving == 4, self.server.receiving - - # no credit left over for this link - msg2 = Message() - msg2.address = self.address + "/msg2" - self.client.put(msg2) - self.pump() - assert self.server.incoming == 1, self.server.incoming - assert self.server.receiving == 4, self.server.receiving - - # eventually, credit will rebalance and the new link will send - deadline = time() + self.timeout - while time() < deadline: - sleep(.1) - self.pump() - if self.server.incoming == 2: - break; - assert self.server.incoming == 2, self.server.incoming - assert self.server.receiving == 3, self.server.receiving - - def testCreditReclaim(self): - """ Verify that credit is reclaimed when a link with outstanding credit is - torn down. - """ - self.server.recv( 9 ) - - # first link will get all credit - msg1 = Message() - msg1.address = self.address + "/msg1" - self.client.put(msg1) - self.pump() - assert self.server.incoming == 1, self.server.incoming - assert self.server.receiving == 8, self.server.receiving - - # no credit left over for this link - msg2 = Message() - msg2.address = self.address + "/msg2" - self.client.put(msg2) - self.pump() - assert self.server.incoming == 1, self.server.incoming - assert self.server.receiving == 8, self.server.receiving - - # and none for this new client - msg3 = Message() - msg3.address = self.address + "/msg3" - self.client2.put(msg3) - self.pump() - - # eventually, credit will rebalance and all links will - # send a message - deadline = time() + self.timeout - while time() < deadline: - sleep(.1) - self.pump() - if self.server.incoming == 3: - break; - assert self.server.incoming == 3, self.server.incoming - assert self.server.receiving == 6, self.server.receiving - - # now tear down client two, this should cause its outstanding credit to be - # made available to the other links - self.client2.stop() - self.pump() - - for i in range(4): - self.client.put(msg1) - self.client.put(msg2) - - # should exhaust all credit - deadline = time() + self.timeout - while time() < deadline: - sleep(.1) - self.pump() - if self.server.incoming == 9: - break; - assert self.server.incoming == 9, self.server.incoming - assert self.server.receiving == 0, self.server.receiving - - def testCreditReplenish(self): - """ When extra credit is available it should be granted to the first - link that can use it. - """ - # create three links - msg = Message() - for i in range(3): - msg.address = self.address + "/%d" % i - self.client.put(msg) - - self.server.recv( 50 ) # 50/3 = 16 per link + 2 extra - - self.pump() - assert self.server.incoming == 3, self.server.incoming - assert self.server.receiving == 47, self.server.receiving - - # 47/3 = 15 per link, + 2 extra - - # verify one link can send 15 + the two extra (17) - for i in range(17): - msg.address = self.address + "/0" - self.client.put(msg) - self.pump() - assert self.server.incoming == 20, self.server.incoming - assert self.server.receiving == 30, self.server.receiving - - # now verify that the remaining credit (30) will eventually rebalance - # across all links (10 per link) - for j in range(10): - for i in range(3): - msg.address = self.address + "/%d" % i - self.client.put(msg) - - deadline = time() + self.timeout - while time() < deadline: - sleep(.1) - self.pump() - if self.server.incoming == 50: - break - assert self.server.incoming == 50, self.server.incoming - assert self.server.receiving == 0, self.server.receiving - -from select import select - -class Pump: - - def __init__(self, *messengers): - self.messengers = messengers - self.selectables = [] - - def pump_once(self): - for m in self.messengers: - while True: - sel = m.selectable() - if sel: - self.selectables.append(sel) - else: - break - - reading = [] - writing = [] - - for sel in self.selectables[:]: - if sel.is_terminal: - sel.release() - self.selectables.remove(sel) - else: - if sel.reading: - reading.append(sel) - if sel.writing: - writing.append(sel) - - readable, writable, _ = select(reading, writing, [], 0.1) - - count = 0 - for s in readable: - s.readable() - count += 1 - for s in writable: - s.writable() - count += 1 - return count - - def pump(self): - while self.pump_once(): pass - -class SelectableMessengerTest(common.Test): - - def testSelectable(self, count = 1): - if os.name=="nt": - # Conflict between native OS select() in Pump and IOCP based pn_selector_t - # makes this fail on Windows (see PROTON-668). - raise Skipped("Invalid test on Windows with IOCP.") - - mrcv = Messenger() - mrcv.passive = True - port = common.free_tcp_port() - mrcv.subscribe("amqp://~0.0.0.0:%d" % port) - - msnd = Messenger() - msnd.passive = True - m = Message() - m.address = "amqp://127.0.0.1:%d" % port - - for i in range(count): - m.body = u"Hello World! %s" % i - msnd.put(m) - - p = Pump(msnd, mrcv) - p.pump() - - assert msnd.outgoing == count - assert mrcv.incoming == 0 - - mrcv.recv() - - mc = Message() - - try: - for i in range(count): - while mrcv.incoming == 0: - p.pump() - assert mrcv.incoming > 0, (count, msnd.outgoing, mrcv.incoming) - mrcv.get(mc) - assert mc.body == u"Hello World! %s" % i, (i, mc.body) - finally: - mrcv.stop() - msnd.stop() - assert not mrcv.stopped - assert not msnd.stopped - p.pump() - assert mrcv.stopped - assert msnd.stopped - - def testSelectable16(self): - self.testSelectable(count=16) - - def testSelectable1024(self): - self.testSelectable(count=1024) - - def testSelectable4096(self): - self.testSelectable(count=4096) - - -class IdleTimeoutTest(common.Test): - - def testIdleTimeout(self): - """ - Verify that a Messenger connection is kept alive using empty idle frames - when a idle_timeout is advertised by the remote peer. - """ - idle_timeout_secs = self.delay - - try: - idle_server = common.TestServer(idle_timeout=idle_timeout_secs) - idle_server.timeout = self.timeout - idle_server.start() - - idle_client = Messenger("idle_client") - idle_client.timeout = self.timeout - idle_client.start() - - idle_client.subscribe("amqp://%s:%s/foo" % - (idle_server.host, idle_server.port)) - idle_client.work(idle_timeout_secs/10) - - # wait up to 3x the idle timeout and hence verify that everything stays - # connected during that time by virtue of no Exception being raised - duration = 3 * idle_timeout_secs - deadline = time() + duration - while time() <= deadline: - idle_client.work(idle_timeout_secs/10) - continue - - # confirm link is still active - assert not idle_server.conditions, idle_server.conditions - finally: - try: - idle_client.stop() - except: - pass - try: - idle_server.stop() - except: - pass http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c9bb9ff/tests/python/proton_tests/soak.py ---------------------------------------------------------------------- diff --git a/tests/python/proton_tests/soak.py b/tests/python/proton_tests/soak.py index 52382ba..b7b521c 100644 --- a/tests/python/proton_tests/soak.py +++ b/tests/python/proton_tests/soak.py @@ -22,7 +22,6 @@ import sys from .common import Test, Skipped, free_tcp_ports, \ MessengerReceiverC, MessengerSenderC, \ MessengerReceiverValgrind, MessengerSenderValgrind, \ - MessengerReceiverPython, MessengerSenderPython, \ ReactorReceiverC, ReactorSenderC, \ ReactorReceiverValgrind, ReactorSenderValgrind, \ isSSLPresent @@ -294,15 +293,6 @@ class MessengerTests(AppTests): self.valgrind_test() self._do_oneway_test(MessengerReceiverValgrind(), MessengerSenderValgrind()) - def test_oneway_Python(self): - self._do_oneway_test(MessengerReceiverPython(), MessengerSenderPython()) - - def test_oneway_C_Python(self): - self._do_oneway_test(MessengerReceiverC(), MessengerSenderPython()) - - def test_oneway_Python_C(self): - self._do_oneway_test(MessengerReceiverPython(), MessengerSenderC()) - def test_echo_C(self): self._do_echo_test(MessengerReceiverC(), MessengerSenderC()) @@ -314,15 +304,6 @@ class MessengerTests(AppTests): self.valgrind_test() self._do_echo_test(MessengerReceiverValgrind(), MessengerSenderValgrind()) - def test_echo_Python(self): - self._do_echo_test(MessengerReceiverPython(), MessengerSenderPython()) - - def test_echo_C_Python(self): - self._do_echo_test(MessengerReceiverC(), MessengerSenderPython()) - - def test_echo_Python_C(self): - self._do_echo_test(MessengerReceiverPython(), MessengerSenderC()) - def test_relay_C(self): self._do_relay_test(MessengerReceiverC(), MessengerReceiverC(), MessengerSenderC()) @@ -334,12 +315,6 @@ class MessengerTests(AppTests): self.valgrind_test() self._do_relay_test(MessengerReceiverValgrind(), MessengerReceiverValgrind(), MessengerSenderValgrind()) - def test_relay_C_Python(self): - self._do_relay_test(MessengerReceiverC(), MessengerReceiverPython(), MessengerSenderPython()) - - def test_relay_Python(self): - self._do_relay_test(MessengerReceiverPython(), MessengerReceiverPython(), MessengerSenderPython()) - def test_star_topology_C(self): self._do_star_topology_test( MessengerReceiverC, MessengerSenderC ) @@ -351,15 +326,6 @@ class MessengerTests(AppTests): self.valgrind_test() self._do_star_topology_test( MessengerReceiverValgrind, MessengerSenderValgrind ) - def test_star_topology_Python(self): - self._do_star_topology_test( MessengerReceiverPython, MessengerSenderPython ) - - def test_star_topology_Python_C(self): - self._do_star_topology_test( MessengerReceiverPython, MessengerSenderC ) - - def test_star_topology_C_Python(self): - self._do_star_topology_test( MessengerReceiverPython, MessengerSenderC ) - def test_oneway_reactor(self): self._do_oneway_test(ReactorReceiverC(), ReactorSenderC()) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c9bb9ff/tests/python/proton_tests/ssl.py ---------------------------------------------------------------------- diff --git a/tests/python/proton_tests/ssl.py b/tests/python/proton_tests/ssl.py index 1cada89..d2e2f44 100644 --- a/tests/python/proton_tests/ssl.py +++ b/tests/python/proton_tests/ssl.py @@ -934,125 +934,3 @@ class SslTest(common.Test): assert False, "Expected error did not occur!" except SSLException: pass - -class MessengerSSLTests(common.Test): - - def setUp(self): - if not common.isSSLPresent(): - raise Skipped("No SSL libraries found.") - self.server = Messenger() - self.client = Messenger() - self.server.blocking = False - self.client.blocking = False - - def tearDown(self): - self.server.stop() - self.client.stop() - self.pump() - assert self.server.stopped - assert self.client.stopped - - def pump(self, timeout=0): - while self.client.work(0) or self.server.work(0): pass - self.client.work(timeout) - self.server.work(timeout) - while self.client.work(0) or self.server.work(0): pass - - def test_server_credentials(self, - cert="server-certificate.pem", - key="server-private-key.pem", - password="server-password", - exception=None): - import sys - self.server.certificate = _testpath(cert) - self.server.private_key = _testpath(key) - self.server.password = password - port = common.free_tcp_ports()[0] - try: - self.server.start() - self.server.subscribe("amqps://~0.0.0.0:%s" % port) - if exception is not None: - assert False, "expected failure did not occur" - except MessengerException: - e = sys.exc_info()[1] - if exception: - assert exception in str(e), str(e) - else: - raise e - - def test_server_credentials_bad_cert(self): - self.test_server_credentials(cert="bad", - exception="invalid credentials") - - def test_server_credentials_bad_key(self): - self.test_server_credentials(key="bad", - exception="invalid credentials") - - def test_server_credentials_bad_password(self): - self.test_server_credentials(password="bad", - exception="invalid credentials") - - def test_client_credentials(self, - trusted="ca-certificate.pem", - cert="client-certificate.pem", - key="client-private-key.pem", - password="client-password", - altserv=False, - fail=False): - if altserv: - self.server.certificate = _testpath("bad-server-certificate.pem") - self.server.private_key = _testpath("bad-server-private-key.pem") - self.server.password = "server-password" - else: - self.server.certificate = _testpath("client-certificate.pem") - self.server.private_key = _testpath("client-private-key.pem") - self.server.password = "client-password" - self.server.start() - port = common.free_tcp_ports()[0] - self.server.subscribe("amqps://~0.0.0.0:%s" % port) - self.server.incoming_window = 10 - - self.client.trusted_certificates = _testpath(trusted) - self.client.certificate = _testpath(cert) - self.client.private_key = _testpath(key) - self.client.password = password - self.client.outgoing_window = 10 - self.client.start() - - self.server.recv() - - msg = Message() - msg.address = "amqps://127.0.0.1:%s" % port - # make sure a large, uncompressible message body works! - msg.body = "".join(random.choice(string.ascii_letters) - for x in range(10099)) - trk = self.client.put(msg) - self.client.send() - - self.pump() - - if fail: - assert self.server.incoming == 0, self.server.incoming - assert self.client.status(trk) == ABORTED, self.client.status(trk) - else: - assert self.server.incoming == 1, self.server.incoming - - rmsg = Message() - self.server.get(rmsg) - assert rmsg.body == msg.body - self.server.accept() - self.pump() - - assert self.client.status(trk) == ACCEPTED, self.client.status(trk) - - def test_client_credentials_bad_cert(self): - self.test_client_credentials(cert="bad", fail=True) - - def test_client_credentials_bad_trusted(self): - self.test_client_credentials(trusted="bad", fail=True) - - def test_client_credentials_bad_password(self): - self.test_client_credentials(password="bad", fail=True) - - def test_client_credentials_untrusted(self): - self.test_client_credentials(altserv=True, fail=True) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c9bb9ff/tests/smoke/recv.php ---------------------------------------------------------------------- diff --git a/tests/smoke/recv.php b/tests/smoke/recv.php deleted file mode 100755 index 96c2ec9..0000000 --- a/tests/smoke/recv.php +++ /dev/null @@ -1,27 +0,0 @@ -#!/usr/bin/env php -<?php - -include("proton.php"); - -$messenger = new Messenger(); -$messenger->incoming_window = 10; -$message = new Message(); - -$address = $argv[1]; -if (!$address) { - $address = "~0.0.0.0"; -} -$messenger->subscribe($address); - -$messenger->start(); - -while (true) { - $messenger->recv(); - $messenger->get($message); - print "Got: $message\n"; - $messenger->accept(); -} - -$messenger->stop(); - -?> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c9bb9ff/tests/smoke/recv.pl ---------------------------------------------------------------------- diff --git a/tests/smoke/recv.pl b/tests/smoke/recv.pl deleted file mode 100755 index 796bb7e..0000000 --- a/tests/smoke/recv.pl +++ /dev/null @@ -1,23 +0,0 @@ -#!/usr/bin/env perl -require 'qpid_proton.pm'; - -my $messenger = qpid::proton::Messenger->new(); -$messenger->set_incoming_window(1); -my $message = qpid::proton::Message->new(); - -my $address = $ARGV[0]; -$address = "~0.0.0.0" if !defined $address; -$messenger->subscribe($address); - -$messenger->start(); - -while (true) { - my $err = $messenger->receive(); - # XXX: no exceptions! - die $messenger->get_error() if $err < 0; - $messenger->get($message); - print "Got: $message\n"; - $messenger->accept(); -} - -$messenger->stop(); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c9bb9ff/tests/smoke/recv.py ---------------------------------------------------------------------- diff --git a/tests/smoke/recv.py b/tests/smoke/recv.py deleted file mode 100755 index e6aa2b6..0000000 --- a/tests/smoke/recv.py +++ /dev/null @@ -1,23 +0,0 @@ -#!/usr/bin/env python -from __future__ import print_function -import sys -from proton import * - -messenger = Messenger() -messenger.incoming_window = 1 -message = Message() - -address = "~0.0.0.0" -if len(sys.argv) > 1: - address = sys.argv[1] -messenger.subscribe(address) - -messenger.start() - -while True: - messenger.recv() - messenger.get(message) - print("Got: %s" % message) - messenger.accept() - -messenger.stop() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c9bb9ff/tests/smoke/recv.rb ---------------------------------------------------------------------- diff --git a/tests/smoke/recv.rb b/tests/smoke/recv.rb deleted file mode 100755 index 29df6cf..0000000 --- a/tests/smoke/recv.rb +++ /dev/null @@ -1,24 +0,0 @@ -#!/usr/bin/env ruby - -require 'qpid_proton.rb' - -messenger = Qpid::Proton::Messenger.new() -messenger.incoming_window = 1 -message = Qpid::Proton::Message.new() - -address = ARGV[0] -if not address then - address = "~0.0.0.0" -end -messenger.subscribe(address) - -messenger.start() - -while (true) do - messenger.receive() - messenger.get(message) - print "Got: #{message}\n" - messenger.accept() -end - -messenger.stop() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c9bb9ff/tests/smoke/send.php ---------------------------------------------------------------------- diff --git a/tests/smoke/send.php b/tests/smoke/send.php deleted file mode 100755 index e77d89d..0000000 --- a/tests/smoke/send.php +++ /dev/null @@ -1,27 +0,0 @@ -#!/usr/bin/env php -<?php - -include("proton.php"); - -$messenger = new Messenger(); -$messenger->outgoing_window = 10; -$message = new Message(); - -$address = $argv[1]; -if (!$address) { - $address = "0.0.0.0"; -} - -$message->address = $address; -$message->properties = Array("binding" => "php", - "version" => phpversion()); -$message->body = "Hello World!"; - -$messenger->start(); -$tracker = $messenger->put($message); -print "Put: $message\n"; -$messenger->send(); -print "Status: " . $messenger->status($tracker) . "\n"; -$messenger->stop(); - -?> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c9bb9ff/tests/smoke/send.pl ---------------------------------------------------------------------- diff --git a/tests/smoke/send.pl b/tests/smoke/send.pl deleted file mode 100755 index 11c5a04..0000000 --- a/tests/smoke/send.pl +++ /dev/null @@ -1,17 +0,0 @@ -#!/usr/bin/env perl -require 'qpid_proton.pm'; -my $messenger = qpid::proton::Messenger->new(); -$messenger->set_outgoing_window(10); -my $message = qpid::proton::Message->new(); - -my $address = $ARGV[0]; -$address = "0.0.0.0" if !defined $address; -$message->set_address($address); -# how do we set properties and body? - -$messenger->start(); -my $tracker = $messenger->put($message); -print "Put: $message\n"; -$messenger->send(); -print "Status: ", $messenger->status($tracker), "\n"; -$messenger->stop(); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c9bb9ff/tests/smoke/send.py ---------------------------------------------------------------------- diff --git a/tests/smoke/send.py b/tests/smoke/send.py deleted file mode 100755 index 7daee01..0000000 --- a/tests/smoke/send.py +++ /dev/null @@ -1,24 +0,0 @@ -#!/usr/bin/env python -from __future__ import print_function -import sys -from proton import * - -messenger = Messenger() -messenger.outgoing_window = 10 -message = Message() - -address = "0.0.0.0" -if len(sys.argv) > 1: - address = sys.argv[1] - -message.address = address -message.properties = {u"binding": u"python", - u"version": sys.version} -message.body = u"Hello World!" - -messenger.start() -tracker = messenger.put(message) -print("Put: %s" % message) -messenger.send() -print("Status: %s" % messenger.status(tracker)) -messenger.stop() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c9bb9ff/tests/smoke/send.rb ---------------------------------------------------------------------- diff --git a/tests/smoke/send.rb b/tests/smoke/send.rb deleted file mode 100755 index c6865af..0000000 --- a/tests/smoke/send.rb +++ /dev/null @@ -1,24 +0,0 @@ -#!/usr/bin/env ruby - -require 'qpid_proton.rb' - -messenger = Qpid::Proton::Messenger.new() -messenger.outgoing_window = 10 -message = Qpid::Proton::Message.new() - -address = ARGV[0] -if not address then - address = "0.0.0.0" -end - -message.address = address -message.properties = {"binding" => "ruby", - "version" => "#{RUBY_VERSION} #{RUBY_PLATFORM}"} -message.body = "Hello World!" - -messenger.start() -tracker = messenger.put(message) -print "Put: #{message}\n" -messenger.send() -print "Status: ", messenger.status(tracker), "\n" -messenger.stop() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c9bb9ff/tests/tools/apps/python/msgr-recv.py ---------------------------------------------------------------------- diff --git a/tests/tools/apps/python/msgr-recv.py b/tests/tools/apps/python/msgr-recv.py deleted file mode 100755 index 757b19c..0000000 --- a/tests/tools/apps/python/msgr-recv.py +++ /dev/null @@ -1,206 +0,0 @@ -#!/usr/bin/env python - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from __future__ import print_function -import sys, optparse, time -import logging -from proton import * - -# Hi python3! -try: - long() -except: - long = int - - -usage = """ -Usage: msgr-recv [OPTIONS] - -a <addr>[,<addr>]* \tAddresses to listen on [amqp://~0.0.0.0] - -c # \tNumber of messages to receive before exiting [0=forever] - -b # \tArgument to Messenger::recv(n) [2048] - -w # \tSize for incoming window [0] - -t # \tInactivity timeout in seconds, -1 = no timeout [-1] - -e # \t# seconds to report statistics, 0 = end of test [0] *TBD* - -R \tSend reply if 'reply-to' present - -W # \t# outgoing window size [0] - -F <addr>[,<addr>]* \tAddresses used for forwarding received messages - -N <name> \tSet the container name to <name> - -X <text> \tPrint '<text>\\n' to stdout after all subscriptions are created - -V \tEnable debug logging""" - - -def parse_options( argv ): - parser = optparse.OptionParser(usage=usage) - parser.add_option("-a", dest="subscriptions", action="append", type="string") - parser.add_option("-c", dest="msg_count", type="int", default=0) - parser.add_option("-b", dest="recv_count", type="int", default=-1) - parser.add_option("-w", dest="incoming_window", type="int") - parser.add_option("-t", dest="timeout", type="int", default=-1) - parser.add_option("-e", dest="report_interval", type="int", default=0) - parser.add_option("-R", dest="reply", action="store_true") - parser.add_option("-W", dest="outgoing_window", type="int") - parser.add_option("-F", dest="forwarding_targets", action="append", type="string") - parser.add_option("-N", dest="name", type="string") - parser.add_option("-X", dest="ready_text", type="string") - parser.add_option("-V", dest="verbose", action="store_true") - - return parser.parse_args(args=argv) - - -class Statistics(object): - def __init__(self): - self.start_time = 0.0 - self.latency_samples = 0 - self.latency_total = 0.0 - self.latency_min = None - self.latency_max = None - - def start(self): - self.start_time = time.time() - - def msg_received(self, msg): - ts = msg.creation_time - if ts: - l = long(time.time() * 1000) - ts - if l > 0.0: - self.latency_total += l - self.latency_samples += 1 - if self.latency_samples == 1: - self.latency_min = self.latency_max = l - else: - if self.latency_min > l: - self.latency_min = l - if self.latency_max < l: - self.latency_max = l - - def report(self, sent, received): - secs = time.time() - self.start_time - print("Messages sent: %d recv: %d" % (sent, received) ) - print("Total time: %f sec" % secs ) - if secs: - print("Throughput: %f msgs/sec" % (sent/secs) ) - if self.latency_samples: - print("Latency (sec): %f min %f max %f avg" % (self.latency_min/1000.0, - self.latency_max/1000.0, - (self.latency_total/self.latency_samples)/1000.0)) - - -def main(argv=None): - opts = parse_options(argv)[0] - if opts.subscriptions is None: - opts.subscriptions = ["amqp://~0.0.0.0"] - stats = Statistics() - sent = 0 - received = 0 - forwarding_index = 0 - - log = logging.getLogger("msgr-recv") - log.addHandler(logging.StreamHandler()) - if opts.verbose: - log.setLevel(logging.DEBUG) - else: - log.setLevel(logging.INFO) - - message = Message() - messenger = Messenger( opts.name ) - - if opts.incoming_window is not None: - messenger.incoming_window = opts.incoming_window - if opts.timeout > 0: - opts.timeout *= 1000 - messenger.timeout = opts.timeout - - messenger.start() - - # unpack addresses that were specified using comma-separated list - - for x in opts.subscriptions: - z = x.split(",") - for y in z: - if y: - log.debug("Subscribing to %s", y) - messenger.subscribe(y) - - forwarding_targets = [] - if opts.forwarding_targets: - for x in opts.forwarding_targets: - z = x.split(",") - for y in z: - if y: - forwarding_targets.append(y) - - # hack to let test scripts know when the receivers are ready (so that the - # senders may be started) - if opts.ready_text: - print("%s" % opts.ready_text) - sys.stdout.flush() - - while opts.msg_count == 0 or received < opts.msg_count: - - log.debug("Calling pn_messenger_recv(%d)", opts.recv_count) - rc = messenger.recv(opts.recv_count) - - # start the timer only after receiving the first msg - if received == 0: - stats.start() - - log.debug("Messages on incoming queue: %d", messenger.incoming) - while messenger.incoming > 0: - messenger.get(message) - received += 1 - # TODO: header decoding? - # uint64_t id = pn_message_get_correlation_id( message ).u.as_ulong; - stats.msg_received( message ) - - if opts.reply: - if message.reply_to: - log.debug("Replying to: %s", message.reply_to ) - message.address = message.reply_to - message.creation_time = long(time.time() * 1000) - messenger.put( message ) - sent += 1 - - if forwarding_targets: - forward_addr = forwarding_targets[forwarding_index] - forwarding_index += 1 - if forwarding_index == len(forwarding_targets): - forwarding_index = 0 - log.debug("Forwarding to: %s", forward_addr ) - message.address = forward_addr - message.reply_to = None - message.creation_time = long(time.time() * 1000) - messenger.put( message ) - sent += 1 - - log.debug("Messages received=%lu sent=%lu", received, sent) - - # this will flush any pending sends - if messenger.outgoing > 0: - log.debug("Calling pn_messenger_send()") - messenger.send() - - messenger.stop() - - stats.report( sent, received ) - return 0 - - -if __name__ == "__main__": - sys.exit(main()) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c9bb9ff/tests/tools/apps/python/msgr-send.py ---------------------------------------------------------------------- diff --git a/tests/tools/apps/python/msgr-send.py b/tests/tools/apps/python/msgr-send.py deleted file mode 100755 index 2e2583f..0000000 --- a/tests/tools/apps/python/msgr-send.py +++ /dev/null @@ -1,205 +0,0 @@ -#!/usr/bin/env python - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from __future__ import print_function -import sys, optparse, time -import logging -from proton import * - -# Hi python3! -try: - long() -except: - long = int - - -usage = """ -Usage: msgr-send [OPTIONS] - -a <addr>[,<addr>]* \tThe target address [amqp[s]://domain[/name]] - -c # \tNumber of messages to send before exiting [0=forever] - -b # \tSize of message body in bytes [1024] - -p # \tSend batches of # messages (wait for replies before sending next batch if -R) [1024] - -w # \t# outgoing window size [0] - -e # \t# seconds to report statistics, 0 = end of test [0] - -R \tWait for a reply to each sent message - -t # \tInactivity timeout in seconds, -1 = no timeout [-1] - -W # \tIncoming window size [0] - -B # \tArgument to Messenger::recv(n) [-1] - -N <name> \tSet the container name to <name> - -V \tEnable debug logging""" - - -def parse_options( argv ): - parser = optparse.OptionParser(usage=usage) - parser.add_option("-a", dest="targets", action="append", type="string") - parser.add_option("-c", dest="msg_count", type="int", default=0) - parser.add_option("-b", dest="msg_size", type="int", default=1024) - parser.add_option("-p", dest="send_batch", type="int", default=1024) - parser.add_option("-w", dest="outgoing_window", type="int") - parser.add_option("-e", dest="report_interval", type="int", default=0) - parser.add_option("-R", dest="get_replies", action="store_true") - parser.add_option("-t", dest="timeout", type="int", default=-1) - parser.add_option("-W", dest="incoming_window", type="int") - parser.add_option("-B", dest="recv_count", type="int", default=-1) - parser.add_option("-N", dest="name", type="string") - parser.add_option("-V", dest="verbose", action="store_true") - - return parser.parse_args(args=argv) - - -class Statistics(object): - def __init__(self): - self.start_time = 0.0 - self.latency_samples = 0 - self.latency_total = 0.0 - self.latency_min = None - self.latency_max = None - - def start(self): - self.start_time = time.time() - - def msg_received(self, msg): - ts = msg.creation_time - if ts: - l = long(time.time() * 1000) - ts - if l > 0.0: - self.latency_total += l - self.latency_samples += 1 - if self.latency_samples == 1: - self.latency_min = self.latency_max = l - else: - if self.latency_min > l: - self.latency_min = l - if self.latency_max < l: - self.latency_max = l - - def report(self, sent, received): - secs = time.time() - self.start_time - print("Messages sent: %d recv: %d" % (sent, received) ) - print("Total time: %f sec" % secs ) - if secs: - print("Throughput: %f msgs/sec" % (sent/secs) ) - if self.latency_samples: - print("Latency (sec): %f min %f max %f avg" % (self.latency_min/1000.0, - self.latency_max/1000.0, - (self.latency_total/self.latency_samples)/1000.0)) - - - -def process_replies( messenger, message, stats, max_count, log): - """ - Return the # of reply messages received - """ - received = 0 - log.debug("Calling pn_messenger_recv(%d)", max_count) - messenger.recv( max_count ) - log.debug("Messages on incoming queue: %d", messenger.incoming) - while messenger.incoming > 0: - messenger.get( message ) - received += 1 - # TODO: header decoding? - stats.msg_received( message ) - # uint64_t id = pn_message_get_correlation_id( message ).u.as_ulong; - return received - -def main(argv=None): - opts = parse_options(argv)[0] - if opts.targets is None: - opts.targets = ["amqp://0.0.0.0"] - stats = Statistics() - sent = 0 - received = 0 - target_index = 0 - - log = logging.getLogger("msgr-send") - log.addHandler(logging.StreamHandler()) - if opts.verbose: - log.setLevel(logging.DEBUG) - else: - log.setLevel(logging.INFO) - - - message = Message() - message.reply_to = "~" - message.body = "X" * opts.msg_size - reply_message = Message() - messenger = Messenger( opts.name ) - - if opts.outgoing_window is not None: - messenger.outgoing_window = opts.outgoing_window - if opts.timeout > 0: - opts.timeout *= 1000 - messenger.timeout = opts.timeout - - messenger.start() - - # unpack targets that were specified using comma-separated list - # - targets = [] - for x in opts.targets: - z = x.split(",") - for y in z: - if y: - targets.append(y) - - stats.start() - while opts.msg_count == 0 or sent < opts.msg_count: - # send a message - message.address = targets[target_index] - if target_index == len(targets) - 1: - target_index = 0 - else: - target_index += 1 - message.correlation_id = sent - message.creation_time = long(time.time() * 1000) - messenger.put( message ) - sent += 1 - - if opts.send_batch and (messenger.outgoing >= opts.send_batch): - if opts.get_replies: - while received < sent: - # this will also transmit any pending sent messages - received += process_replies( messenger, reply_message, - stats, opts.recv_count, log ) - else: - log.debug("Calling pn_messenger_send()") - messenger.send() - - log.debug("Messages received=%d sent=%d", received, sent) - - if opts.get_replies: - # wait for the last of the replies - while received < sent: - count = process_replies( messenger, reply_message, stats, - opts.recv_count, log ) - received += count - log.debug("Messages received=%d sent=%d", received, sent) - - elif messenger.outgoing > 0: - log.debug("Calling pn_messenger_send()") - messenger.send() - - messenger.stop() - - stats.report( sent, received ) - return 0 - -if __name__ == "__main__": - sys.exit(main()) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c9bb9ff/tools/cmake/Modules/FindEmscripten.cmake ---------------------------------------------------------------------- diff --git a/tools/cmake/Modules/FindEmscripten.cmake b/tools/cmake/Modules/FindEmscripten.cmake deleted file mode 100644 index 7289731..0000000 --- a/tools/cmake/Modules/FindEmscripten.cmake +++ /dev/null @@ -1,40 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# FindEmscripten -# This module checks if Emscripten and its prerequisites are installed and if so -# sets EMSCRIPTEN_FOUND Emscripten (https://github.com/kripken/emscripten) is a -# C/C++ to JavaScript cross-compiler used to generate the JavaScript bindings. - -if (NOT EMSCRIPTEN_FOUND) - # First check that Node.js is installed as that is needed by Emscripten. - find_program(NODE node) - if (NOT NODE) - message(STATUS "Node.js (http://nodejs.org) is not installed: can't build JavaScript binding") - else (NOT NODE) - # Check that the Emscripten C/C++ to JavaScript cross-compiler is installed. - find_program(EMCC emcc) - if (NOT EMCC) - message(STATUS "Emscripten (https://github.com/kripken/emscripten) is not installed: can't build JavaScript binding") - else (NOT EMCC) - set(EMSCRIPTEN_FOUND ON) - endif (NOT EMCC) - endif (NOT NODE) -endif (NOT EMSCRIPTEN_FOUND) - http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c9bb9ff/tools/cmake/Modules/FindNodePackages.cmake ---------------------------------------------------------------------- diff --git a/tools/cmake/Modules/FindNodePackages.cmake b/tools/cmake/Modules/FindNodePackages.cmake deleted file mode 100644 index f6c0e49..0000000 --- a/tools/cmake/Modules/FindNodePackages.cmake +++ /dev/null @@ -1,72 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# FindNodePackages -# This module finds and installs (using npm) node.js packages that are used by -# the JavaScript binding. The binding should still compile if these packages -# cannot be installed but certain features might not work as described below. -# -# * The ws package is the WebSocket library used by emscripten when the target is -# node.js, it isn't needed for applications hosted on a browser (where native -# WebSockets will be used), but without it it won't work with node.js. -# -# * The jsdoc package is a JavaScript API document generator analogous to Doxygen -# or JavaDoc, it is used by the docs target in the JavaScript binding. - -if (NOT NODE_PACKAGES_FOUND) - # Check if the specified node.js package is already installed, if not install it. - macro(InstallPackage varname name) - execute_process( - COMMAND npm list --local ${name} - OUTPUT_VARIABLE check_package - ) - - set(${varname} OFF) - - if (check_package MATCHES "${name}@.") - message(STATUS "Found node.js package: ${name}") - set(${varname} ON) - else() - message(STATUS "Installing node.js package: ${name}") - - execute_process( - COMMAND npm install ${name} - WORKING_DIRECTORY ${PROJECT_SOURCE_DIR} - OUTPUT_VARIABLE var - ) - - if (var) - message(STATUS "Installed node.js package: ${name}") - set(${varname} ON) - endif (var) - endif() - endmacro() - - # Check if ws WebSocket library https://github.com/websockets/ws is installed - # N.B. something changed between ws 0.5.0 and 0.6.0 that breaks proton js - # so explicitly pulling version 0.5.0 - # TODO update javascript binding/emscripten/both to work with latest ws. - InstallPackage("NODE_WS_FOUND" "[email protected]") - - # Check if jsdoc3 API documentation generator https://github.com/jsdoc3/jsdoc is installed - InstallPackage("NODE_JSDOC_FOUND" "jsdoc") - - set(NODE_PACKAGES_FOUND ON) -endif (NOT NODE_PACKAGES_FOUND) - http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c9bb9ff/tools/cmake/Modules/ProtonFindPerl.cmake ---------------------------------------------------------------------- diff --git a/tools/cmake/Modules/ProtonFindPerl.cmake b/tools/cmake/Modules/ProtonFindPerl.cmake deleted file mode 100644 index e2f3fef..0000000 --- a/tools/cmake/Modules/ProtonFindPerl.cmake +++ /dev/null @@ -1,81 +0,0 @@ -# - Find Perl Libraries -# This module searches for Perl libraries in the event that those files aren't -# found by the default Cmake module. - -# include(${CMAKE_CURRENT_LIST_DIR}/FindPerlLibs.cmake) - -include(FindPerl) -include(FindPerlLibs) - -if(NOT PERLLIBS_FOUND) - MESSAGE ( STATUS "Trying alternative search for Perl" ) - - # taken from Cmake 2.8 FindPerlLibs.cmake - EXECUTE_PROCESS ( COMMAND ${PERL_EXECUTABLE} - -V:installarchlib - OUTPUT_VARIABLE PERL_ARCHLIB_OUTPUT_VARIABLE - RESULT_VARIABLE PERL_ARCHLIB_RESULT_VARIABLE ) - - if (NOT PERL_ARCHLIB_RESULT_VARIABLE) - string(REGEX REPLACE "install[a-z]+='([^']+)'.*" "\\1" PERL_ARCHLIB ${PERL_ARCHLIB_OUTPUT_VARIABLE}) - file(TO_CMAKE_PATH "${PERL_ARCHLIB}" PERL_ARCHLIB) - endif ( NOT PERL_ARCHLIB_RESULT_VARIABLE ) - - EXECUTE_PROCESS ( COMMAND ${PERL_EXECUTABLE} - -MConfig -e "print \$Config{archlibexp}" - OUTPUT_VARIABLE PERL_OUTPUT - RESULT_VARIABLE PERL_RETURN_VALUE ) - - IF ( NOT PERL_RETURN_VALUE ) - FIND_PATH ( PERL_INCLUDE_PATH perl.h ${PERL_OUTPUT}/CORE ) - - IF (PERL_INCLUDE_PATH MATCHES .*-NOTFOUND OR NOT PERL_INCLUDE_PATH) - MESSAGE(STATUS "Could not find perl.h") - ENDIF () - - ENDIF ( NOT PERL_RETURN_VALUE ) - - # if either the library path is not found not set at all - # then do our own search - if ( NOT PERL_LIBRARY ) - EXECUTE_PROCESS( COMMAND ${PERL_EXECUTABLE} -V:libperl - OUTPUT_VARIABLE PERL_LIBRARY_OUTPUT - RESULT_VARIABLE PERL_LIBRARY_RESULT ) - - IF ( NOT PERL_LIBRARY_RESULT ) - string(REGEX REPLACE "libperl='([^']+)'.*" "\\1" PERL_POSSIBLE_LIBRARIES ${PERL_LIBRARY_OUTPUT}) - ENDIF ( NOT PERL_LIBRARY_RESULT ) - - MESSAGE ( STATUS "Looking for ${PERL_POSSIBLE_LIBRARIES}" ) - - find_file(PERL_LIBRARY - NAMES ${PERL_POSSIBLE_LIBRARIES} - PATHS /usr/lib - ${PERL_ARCHLIB}/CORE - ) - - endif ( NOT PERL_LIBRARY ) - - IF ( PERL_LIBRARY MATCHES .*-NOTFOUND OR NOT PERL_LIBRARY ) - EXECUTE_PROCESS ( COMMAND ${PERL_EXECUTABLE} - -MConfig -e "print \$Config{libperl}" - OUTPUT_VARIABLE PERL_OUTPUT - RESULT_VARIABLE PERL_RETURN_VALUE ) - - IF ( NOT PERL_RETURN_VALUE ) - FIND_LIBRARY ( PERL_LIBRARY NAMES ${PERL_OUTPUT} - PATHS ${PERL_INCLUDE_PATH} ) - - ENDIF ( NOT PERL_RETURN_VALUE ) - ENDIF ( PERL_LIBRARY MATCHES .*-NOTFOUND OR NOT PERL_LIBRARY ) - - IF(PERL_LIBRARY MATCHES .*-NOTFOUND OR NOT PERL_LIBRARY OR - PERL_INCLUDE_PATH MATCHES .*-NOTFOUND OR NOT PERL_INCLUDE_PATH) - MESSAGE (STATUS "No Perl devel environment found - skipping Perl bindings") - SET (DEFAULT_PERL OFF) - ELSE() - MESSAGE ( STATUS "Found PerlLibs: ${PERL_LIBRARY}" ) - SET (DEFAULT_PERL ON) - ENDIF() - -endif(NOT PERLLIBS_FOUND) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
