PROTON-881: Rough first pass at a proton-j -> proton-c interop test. This adds a Python test case which starts both a Python (proton-c) reactor and also spawns a JVM running a (proton-j) reactor. The expected outcome is that the proton-j reactor is able to send a message to the proton-c reactor.
Right now, there are a lot of rough edges on both the Python and Java sides of this test case. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/b6e18b5a Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/b6e18b5a Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/b6e18b5a Branch: refs/heads/proton-j-reactor Commit: b6e18b5a35da5865fe0ae5fd9e161a04170e9750 Parents: 0ac98e7 Author: Adrian Preston <[email protected]> Authored: Wed Apr 29 23:57:47 2015 +0100 Committer: Adrian Preston <[email protected]> Committed: Wed May 6 23:24:03 2015 +0100 ---------------------------------------------------------------------- config.sh.in | 2 +- .../org/apache/qpid/proton/ProtonJInterop.java | 145 +++++++++++++++++++ tests/java/pythonTests.ignore | 1 + tests/python/proton_tests/__init__.py | 1 + tests/python/proton_tests/reactor_interop.py | 60 ++++++++ 5 files changed, 208 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b6e18b5a/config.sh.in ---------------------------------------------------------------------- diff --git a/config.sh.in b/config.sh.in index 4b60b2f..3d74916 100755 --- a/config.sh.in +++ b/config.sh.in @@ -33,7 +33,7 @@ PERL_BINDINGS=$PROTON_BINDINGS/perl COMMON_PYPATH=$PROTON_HOME/tests/python:$PROTON_HOME/proton-c/bindings/python export PYTHONPATH=$COMMON_PYPATH:$PYTHON_BINDINGS export JYTHONPATH=$COMMON_PYPATH:$PROTON_HOME/proton-j/src/main/resources:$PROTON_JARS -export CLASSPATH=$PROTON_JARS +export CLASSPATH=$PROTON_JARS:$PROTON_HOME/tests/target/test-classes # PHP if [ -d $PHP_BINDINGS ]; then http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b6e18b5a/tests/java/org/apache/qpid/proton/ProtonJInterop.java ---------------------------------------------------------------------- diff --git a/tests/java/org/apache/qpid/proton/ProtonJInterop.java 
b/tests/java/org/apache/qpid/proton/ProtonJInterop.java new file mode 100644 index 0000000..8b49508 --- /dev/null +++ b/tests/java/org/apache/qpid/proton/ProtonJInterop.java @@ -0,0 +1,145 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.qpid.proton; + +import java.io.IOException; +import java.nio.BufferOverflowException; + +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.Sender; +import org.apache.qpid.proton.engine.Session; +import org.apache.qpid.proton.message.Message; +import org.apache.qpid.proton.reactor.Handshaker; +import org.apache.qpid.proton.reactor.Reactor; + +public class ProtonJInterop { // TODO: this doesn't return a useful RC + + + private static class SendHandler extends BaseHandler { + + private final String hostname; + private final Message message; + private int nextTag = 0; + private int result = 1; + + private SendHandler(String hostname, Message message) { + this.hostname = hostname; + this.message = message; + + // Add a child handler that performs some default handshaking + // behaviour. + add(new Handshaker()); + } + + @Override + public void onConnectionInit(Event event) { + Connection conn = event.getConnection(); + conn.setHostname(hostname); + + // Every session or link could have their own handler(s) if we + // wanted simply by adding the handler to the given session + // or link + Session ssn = conn.session(); + + // If a link doesn't have an event handler, the events go to + // its parent session. If the session doesn't have a handler + // the events go to its parent connection. If the connection + // doesn't have a handler, the events go to the reactor. 
+ Sender snd = ssn.sender("sender"); + conn.open(); + ssn.open(); + snd.open(); + } + + @Override + public void onLinkFlow(Event event) { + Sender snd = (Sender)event.getLink(); + if (snd.getCredit() > 0 && message != null) { + byte[] msgData = new byte[1024]; + int length; + while(true) { + try { + length = message.encode(msgData, 0, msgData.length); + break; + } catch(BufferOverflowException e) { + msgData = new byte[msgData.length * 2]; + } + } + byte[] tag = String.valueOf(nextTag++).getBytes(); + Delivery dlv = snd.delivery(tag); + snd.send(msgData, 0, length); + dlv.settle(); + snd.advance(); + snd.close(); + snd.getSession().close(); + snd.getSession().getConnection().close(); + result = 0; + } + } + + @Override + public void onTransportError(Event event) { + ErrorCondition condition = event.getTransport().getCondition(); + if (condition != null) { + System.err.println("Error: " + condition.getDescription()); + } else { + System.err.println("Error (no description returned)."); + } + } + } + + private static class Send extends BaseHandler { + private final String hostname; + private final Message message; + + private Send(String hostname, String content) { + this.hostname = hostname; + message = Proton.message(); + message.setBody(new AmqpValue(content)); + } + + @Override + public void onReactorInit(Event event) { + // You can use the connection method to create AMQP connections. + + // This connection's handler is the SendHandler object. All the events + // for this connection will go to the SendHandler object instead of + // going to the reactor. If you were to omit the SendHandler object, + // all the events would go to the reactor. 
+ event.getReactor().connection(new SendHandler(hostname, message)); + } + } + + private static void sendTest() throws IOException { + Reactor r = Proton.reactor(new Send("localhost:56789", "test1")); + r.run(); + } + + public static void main(String[] args) throws IOException { + sendTest(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b6e18b5a/tests/java/pythonTests.ignore ---------------------------------------------------------------------- diff --git a/tests/java/pythonTests.ignore b/tests/java/pythonTests.ignore index a92d732..7911176 100644 --- a/tests/java/pythonTests.ignore +++ b/tests/java/pythonTests.ignore @@ -1,3 +1,4 @@ +proton_tests.reactor_interop.* proton_tests.soak.* proton_tests.ssl.SslTest.test_defaults_messenger_app proton_tests.ssl.SslTest.test_server_authentication_messenger_app http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b6e18b5a/tests/python/proton_tests/__init__.py ---------------------------------------------------------------------- diff --git a/tests/python/proton_tests/__init__.py b/tests/python/proton_tests/__init__.py index 8416ced..5f270cc 100644 --- a/tests/python/proton_tests/__init__.py +++ b/tests/python/proton_tests/__init__.py @@ -21,6 +21,7 @@ import proton_tests.codec import proton_tests.engine import proton_tests.message import proton_tests.reactor +import proton_tests.reactor_interop import proton_tests.messenger import proton_tests.sasl import proton_tests.transport http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b6e18b5a/tests/python/proton_tests/reactor_interop.py ---------------------------------------------------------------------- diff --git a/tests/python/proton_tests/reactor_interop.py b/tests/python/proton_tests/reactor_interop.py new file mode 100644 index 0000000..1fdfcd7 --- /dev/null +++ b/tests/python/proton_tests/reactor_interop.py @@ -0,0 +1,60 @@ +#!/usr/bin/python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more 
contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from common import Test +from proton import Message +from proton.reactor import Reactor +from proton.handlers import CHandshaker, CFlowController + +import subprocess +import os +from threading import Thread + +class JavaSendThread(Thread): + def __init__(self): + Thread.__init__(self) + + def run(self): + subprocess.check_output(['java', 'org.apache.qpid.proton.ProtonJInterop']) + + +class Receive: + def __init__(self): + self.handlers = [CHandshaker(), CFlowController()] + self.message = Message() + + def on_reactor_init(self, event): + self.acceptor = event.reactor.acceptor("localhost", 56789) + JavaSendThread().start() + + def on_delivery(self, event): + rcv = event.receiver + if rcv and self.message.recv(rcv): + event.delivery.settle() + self.acceptor.close() + +class ReactorInteropTest(Test): + + def test_protonj_to_protonc(self): + rcv = Receive() + r = Reactor(rcv) + r.run() + assert(rcv.message.body == "test1") + --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
