PROTON-881: Add reactor interop tests that send messages from proton-c to proton-j
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/1eb41f60 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/1eb41f60 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/1eb41f60 Branch: refs/heads/proton-j-reactor Commit: 1eb41f603b0a4c5da9c686af1369837e7c6f2184 Parents: 7faa7e2 Author: Adrian Preston <[email protected]> Authored: Fri May 1 15:36:00 2015 +0100 Committer: Adrian Preston <[email protected]> Committed: Wed May 6 23:24:28 2015 +0100 ---------------------------------------------------------------------- .../org/apache/qpid/proton/ProtonJInterop.java | 76 +++++++++++++++++--- tests/python/proton_tests/reactor_interop.py | 76 ++++++++++++++++++-- 2 files changed, 138 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1eb41f60/tests/java/org/apache/qpid/proton/ProtonJInterop.java ---------------------------------------------------------------------- diff --git a/tests/java/org/apache/qpid/proton/ProtonJInterop.java b/tests/java/org/apache/qpid/proton/ProtonJInterop.java index 678bfd2..31306ef 100644 --- a/tests/java/org/apache/qpid/proton/ProtonJInterop.java +++ b/tests/java/org/apache/qpid/proton/ProtonJInterop.java @@ -31,9 +31,12 @@ import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.engine.Sender; import org.apache.qpid.proton.engine.Session; import org.apache.qpid.proton.message.Message; +import org.apache.qpid.proton.reactor.Acceptor; +import org.apache.qpid.proton.reactor.FlowController; import org.apache.qpid.proton.reactor.Handshaker; import org.apache.qpid.proton.reactor.Reactor; @@ -123,18 +126,75 @@ public class ProtonJInterop { } } - private static boolean sendTest(String[] args) throws IOException { - int port = Integer.valueOf(args[0]); - int numMsgs = Integer.valueOf(args[1]); - Send send = new Send("localhost:" + port, numMsgs); - Reactor r = Proton.reactor(send); - r.run(); - return send.getResult(); + private static class Recv extends BaseHandler { + private final int port; + private final int numMsgs; + private int count = 0; + private Acceptor acceptor = null; + + private Recv(int port, int numMsgs) { + this.port = port; + this.numMsgs = numMsgs; + add(new Handshaker()); + add(new FlowController()); + } + + @Override + public void onReactorInit(Event event) { + try { + acceptor = event.getReactor().acceptor("localhost", port); + } catch(IOException ioException) { + throw new RuntimeException(ioException); + } + } + + @Override + public void onDelivery(Event event) { + Receiver recv = (Receiver)event.getLink(); + Delivery delivery = recv.current(); + if (delivery.isReadable() && !delivery.isPartial()) { + int size = delivery.pending(); + byte[] buffer = new byte[size]; + int read = recv.recv(buffer, 0, buffer.length); + recv.advance(); + + Message msg = Proton.message(); + msg.decode(buffer, 0, read); + + ++count; + String msgBody = ((AmqpValue)msg.getBody()).getValue().toString(); + String expected = "message-" + count; + if (!expected.equals(msgBody)) { + throw new RuntimeException("Received message body '" + msgBody + "', expected: '" + expected + "'"); + } + + if (count == numMsgs) { + recv.close(); + recv.getSession().close(); + recv.getSession().getConnection().close(); + acceptor.close(); + } + } + } } public static void main(String[] args) throws IOException { try { - System.exit(sendTest(args) ? 0 : 1); + int port = Integer.valueOf(args[1]); + int numMsgs = Integer.valueOf(args[2]); + boolean result = false; + + if ("send".equalsIgnoreCase(args[0])) { + Send send = new Send("localhost:" + port, numMsgs); + Reactor r = Proton.reactor(send); + r.run(); + result = send.getResult(); + } else { + Reactor r = Proton.reactor(new Recv(port, numMsgs)); + r.run(); + result = true; + } + System.exit(result ? 0 : 1); } catch(Throwable t) { t.printStackTrace(); System.exit(1); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1eb41f60/tests/python/proton_tests/reactor_interop.py ---------------------------------------------------------------------- diff --git a/tests/python/proton_tests/reactor_interop.py b/tests/python/proton_tests/reactor_interop.py index b2604db..cbb75e6 100644 --- a/tests/python/proton_tests/reactor_interop.py +++ b/tests/python/proton_tests/reactor_interop.py @@ -20,16 +20,18 @@ from common import Test, free_tcp_port from proton import Message -from proton.reactor import Reactor from proton.handlers import CHandshaker, CFlowController +from proton.reactor import Reactor -import subprocess import os +import subprocess from threading import Thread +import time -class JavaSendThread(Thread): - def __init__(self, port, count): +class JavaThread(Thread): + def __init__(self, operation, port, count): Thread.__init__(self) + self.operation = operation self.port = str(port) self.count = str(count) self.result = 1 @@ -37,7 +39,7 @@ class JavaSendThread(Thread): def run(self): self.result = subprocess.call(['java', 'org.apache.qpid.proton.ProtonJInterop', - self.port, self.count]) + self.operation, self.port, self.count]) class ReceiveHandler: def __init__(self, count): @@ -48,7 +50,7 @@ class ReceiveHandler: def on_reactor_init(self, event): port = free_tcp_port() self.acceptor = event.reactor.acceptor("localhost", port) - self.java_thread = JavaSendThread(port, self.count) + self.java_thread = JavaThread("send", port, self.count) self.java_thread.start() def on_delivery(self, event): @@ -61,6 +63,37 @@ class ReceiveHandler: if (self.count == 0): self.acceptor.close() +class SendHandler: + def __init__(self, host, num_msgs): + self.host = host + self.num_msgs = num_msgs + self.count = 0 + self.handlers = [CHandshaker()] + + def on_connection_init(self, event): + conn = event.connection + conn.hostname = self.host + ssn = conn.session() + snd = ssn.sender("sender") + conn.open() + ssn.open() + snd.open() + + def on_link_flow(self, event): + snd = event.sender + if snd.credit > 0 and self.count < self.num_msgs: + self.count += 1 + msg = Message("message-" + str(self.count)) + dlv = snd.send(msg) + dlv.settle() + if (self.count == self.num_msgs): + snd.close() + snd.session.close() + snd.connection.close() + + def on_reactor_init(self, event): + event.reactor.connection(self) + class ReactorInteropTest(Test): def setup(self): @@ -72,6 +105,24 @@ class ReactorInteropTest(Test): for entry in entries: self.proton_j_available |= os.path.exists(entry) + def protonc_to_protonj(self, count): + if (not self.proton_j_available): + raise Skip() + + port = free_tcp_port() + java_thread = JavaThread("recv", port, count) + java_thread.start() + # Give the Java thread time to spin up a JVM and start listening + # XXX: would be better to parse the stdout output for a message + time.sleep(1) + + sh = SendHandler('localhost:' + str(port), count) + r = Reactor(sh) + r.run() + + java_thread.join() + assert(java_thread.result == 0) + def protonj_to_protonc(self, count): if (not self.proton_j_available): raise Skip() @@ -86,6 +137,18 @@ class ReactorInteropTest(Test): for i in range(1, count): assert(rh.messages[i-1] == ("message-" + str(i))) + def test_protonc_to_protonj_1(self): + self.protonc_to_protonj(1) + + def test_protonc_to_protonj_5(self): + self.protonc_to_protonj(5) + + def test_protonc_to_protonj_500(self): + self.protonc_to_protonj(500) + + def test_protonc_to_protonj_5000(self): + self.protonc_to_protonj(5000) + def test_protonj_to_protonc_1(self): self.protonj_to_protonc(1) @@ -97,3 +160,4 @@ class ReactorInteropTest(Test): def test_protonj_to_protonc_5000(self): self.protonj_to_protonc(5000) + --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
