PROTON-881: Tidy up proton-j to proton-c reactor interop tests
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/2e6f5cdd Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/2e6f5cdd Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/2e6f5cdd Branch: refs/heads/proton-j-reactor Commit: 2e6f5cdd1754b266e81afbc49ae4333a75287d57 Parents: b6e18b5 Author: Adrian Preston <[email protected]> Authored: Thu Apr 30 13:58:28 2015 +0100 Committer: Adrian Preston <[email protected]> Committed: Wed May 6 23:24:11 2015 +0100 ---------------------------------------------------------------------- .../qpid/proton/example/reactor/Send.java | 2 +- .../org/apache/qpid/proton/ProtonJInterop.java | 78 ++++++++++---------- tests/python/proton_tests/reactor_interop.py | 57 ++++++++++---- 3 files changed, 81 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2e6f5cdd/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java ---------------------------------------------------------------------- diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java index 5cd5811..22da720 100644 --- a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java +++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java @@ -78,7 +78,7 @@ public class Send extends BaseHandler { @Override public void onLinkFlow(Event event) { Sender snd = (Sender)event.getLink(); - if (snd.getCredit() > 0 && message != null) { + if (snd.getCredit() > 0) { byte[] msgData = new byte[1024]; int length; while(true) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2e6f5cdd/tests/java/org/apache/qpid/proton/ProtonJInterop.java ---------------------------------------------------------------------- diff --git a/tests/java/org/apache/qpid/proton/ProtonJInterop.java b/tests/java/org/apache/qpid/proton/ProtonJInterop.java index 8b49508..678bfd2 100644 --- a/tests/java/org/apache/qpid/proton/ProtonJInterop.java +++ b/tests/java/org/apache/qpid/proton/ProtonJInterop.java @@ -29,6 +29,7 @@ import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.BaseHandler; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Event; import org.apache.qpid.proton.engine.Sender; import org.apache.qpid.proton.engine.Session; @@ -36,22 +37,18 @@ import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.reactor.Handshaker; import org.apache.qpid.proton.reactor.Reactor; -public class ProtonJInterop { // TODO: this doesn't return a useful RC - +public class ProtonJInterop { private static class SendHandler extends BaseHandler { private final String hostname; - private final Message message; - private int nextTag = 0; - private int result = 1; + private int numMsgs; + private int count = 0; + private boolean result = false; - private SendHandler(String hostname, Message message) { + private SendHandler(String hostname, int numMsgs) { this.hostname = hostname; - this.message = message; - - // Add a child handler that performs some default handshaking - // behaviour. + this.numMsgs = numMsgs; add(new Handshaker()); } @@ -59,16 +56,7 @@ public class ProtonJInterop { // TODO: this doesn't return a useful RC public void onConnectionInit(Event event) { Connection conn = event.getConnection(); conn.setHostname(hostname); - - // Every session or link could have their own handler(s) if we - // wanted simply by adding the handler to the given session - // or link Session ssn = conn.session(); - - // If a link doesn't have an event handler, the events go to - // its parent session. If the session doesn't have a handler - // the events go to its parent connection. If the connection - // doesn't have a handler, the events go to the reactor. Sender snd = ssn.sender("sender"); conn.open(); ssn.open(); @@ -78,7 +66,10 @@ public class ProtonJInterop { // TODO: this doesn't return a useful RC @Override public void onLinkFlow(Event event) { Sender snd = (Sender)event.getLink(); - if (snd.getCredit() > 0 && message != null) { + if (snd.getCredit() > 0 && snd.getLocalState() != EndpointState.CLOSED) { + Message message = Proton.message(); + ++count; + message.setBody(new AmqpValue("message-"+count)); byte[] msgData = new byte[1024]; int length; while(true) { @@ -89,20 +80,23 @@ public class ProtonJInterop { // TODO: this doesn't return a useful RC msgData = new byte[msgData.length * 2]; } } - byte[] tag = String.valueOf(nextTag++).getBytes(); + byte[] tag = String.valueOf(count).getBytes(); Delivery dlv = snd.delivery(tag); snd.send(msgData, 0, length); dlv.settle(); snd.advance(); - snd.close(); - snd.getSession().close(); - snd.getSession().getConnection().close(); - result = 0; + if (count == numMsgs) { + snd.close(); + snd.getSession().close(); + snd.getSession().getConnection().close(); + result = true; + } } } @Override public void onTransportError(Event event) { + result = false; ErrorCondition condition = event.getTransport().getCondition(); if (condition != null) { System.err.println("Error: " + condition.getDescription()); @@ -113,33 +107,37 @@ public class ProtonJInterop { // TODO: this doesn't return a useful RC } private static class Send extends BaseHandler { - private final String hostname; - private final Message message; + private final SendHandler sendHandler; - private Send(String hostname, String content) { - this.hostname = hostname; - message = Proton.message(); - message.setBody(new AmqpValue(content)); + private Send(String hostname, int numMsgs) { + sendHandler = new SendHandler(hostname, numMsgs); } @Override public void onReactorInit(Event event) { - // You can use the connection method to create AMQP connections. + event.getReactor().connection(sendHandler); + } - // This connection's handler is the SendHandler object. All the events - // for this connection will go to the SendHandler object instead of - // going to the reactor. If you were to omit the SendHandler object, - // all the events would go to the reactor. - event.getReactor().connection(new SendHandler(hostname, message)); + public boolean getResult() { + return sendHandler.result; } } - private static void sendTest() throws IOException { - Reactor r = Proton.reactor(new Send("localhost:56789", "test1")); + private static boolean sendTest(String[] args) throws IOException { + int port = Integer.valueOf(args[0]); + int numMsgs = Integer.valueOf(args[1]); + Send send = new Send("localhost:" + port, numMsgs); + Reactor r = Proton.reactor(send); r.run(); + return send.getResult(); } public static void main(String[] args) throws IOException { - sendTest(); + try { + System.exit(sendTest(args) ? 0 : 1); + } catch(Throwable t) { + t.printStackTrace(); + System.exit(1); + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2e6f5cdd/tests/python/proton_tests/reactor_interop.py ---------------------------------------------------------------------- diff --git a/tests/python/proton_tests/reactor_interop.py b/tests/python/proton_tests/reactor_interop.py index 1fdfcd7..9c58d03 100644 --- a/tests/python/proton_tests/reactor_interop.py +++ b/tests/python/proton_tests/reactor_interop.py @@ -18,7 +18,7 @@ # under the License. # -from common import Test +from common import Test, free_tcp_port from proton import Message from proton.reactor import Reactor from proton.handlers import CHandshaker, CFlowController @@ -28,33 +28,60 @@ import os from threading import Thread class JavaSendThread(Thread): - def __init__(self): + def __init__(self, port, count): Thread.__init__(self) + self.port = str(port) + self.count = str(count) + self.result = 1 def run(self): - subprocess.check_output(['java', 'org.apache.qpid.proton.ProtonJInterop']) + self.result = subprocess.call(['java', + 'org.apache.qpid.proton.ProtonJInterop', + self.port, self.count]) - -class Receive: - def __init__(self): +class ReceiveHandler: + def __init__(self, count): + self.count = count self.handlers = [CHandshaker(), CFlowController()] - self.message = Message() + self.messages = [] def on_reactor_init(self, event): - self.acceptor = event.reactor.acceptor("localhost", 56789) - JavaSendThread().start() + port = free_tcp_port() + self.acceptor = event.reactor.acceptor("localhost", port) + self.java_thread = JavaSendThread(port, self.count) + self.java_thread.start() def on_delivery(self, event): rcv = event.receiver - if rcv and self.message.recv(rcv): + msg = Message() + if rcv and msg.recv(rcv): event.delivery.settle() - self.acceptor.close() + self.messages += [msg.body] + self.count -= 1 + if (self.count == 0): + self.acceptor.close() class ReactorInteropTest(Test): - def test_protonj_to_protonc(self): - rcv = Receive() - r = Reactor(rcv) + def protonj_to_protonc(self, count): + rh = ReceiveHandler(count) + r = Reactor(rh) r.run() - assert(rcv.message.body == "test1") + rh.java_thread.join() + assert(rh.java_thread.result == 0) + + for i in range(1, count): + assert(rh.messages[i-1] == ("message-" + str(i))) + + def test_protonj_to_protonc_1(self): + self.protonj_to_protonc(1) + + def test_protonj_to_protonc_5(self): + self.protonj_to_protonc(5) + + def test_protonj_to_protonc_500(self): + self.protonj_to_protonc(500) + + def test_protonj_to_protonc_5000(self): + self.protonj_to_protonc(5000) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
