Repository: qpid-proton Updated Branches: refs/heads/master 6ea002d27 -> 619c5c7ff
PROTON-1133: Reactor should not use the connection hostname as a transport address. This patch introduces a new reactor API for setting the host address for a connection created via the reactor. This API is intended to replace the existing semantics where the connection's host address is derived via the connection's hostname setting. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/619c5c7f Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/619c5c7f Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/619c5c7f Branch: refs/heads/master Commit: 619c5c7ff6f155468f472b66a60021db72af8ecf Parents: 6ea002d Author: Ken Giusti <[email protected]> Authored: Sat Mar 26 20:13:59 2016 -0400 Committer: Ken Giusti <[email protected]> Committed: Tue Apr 12 10:54:28 2016 -0400 ---------------------------------------------------------------------- examples/c/reactor/receiver.c | 32 +++++- examples/c/reactor/sender.c | 33 +++++- .../qpid/proton/example/reactor/Send.java | 27 +++-- examples/python/reactor/send.py | 20 ++-- examples/python/reactor/tornado-send.py | 20 ++-- proton-c/bindings/python/proton/__init__.py | 9 +- proton-c/bindings/python/proton/reactor.py | 47 +++++++-- proton-c/include/proton/connection.h | 11 +- proton-c/include/proton/reactor.h | 72 ++++++++++++- proton-c/src/reactor/connection.c | 100 ++++++++++++++++--- proton-c/src/tests/reactor.c | 33 +++++- .../apache/qpid/proton/engine/Connection.java | 11 ++ .../org/apache/qpid/proton/reactor/Reactor.java | 57 +++++++++-- .../qpid/proton/reactor/impl/IOHandler.java | 33 ++++-- .../qpid/proton/reactor/impl/ReactorImpl.java | 36 +++++++ .../apache/qpid/proton/reactor/ReactorTest.java | 21 ++-- .../org/apache/qpid/proton/ProtonJInterop.java | 20 ++-- tests/java/shim/creactor.py | 7 ++ 18 files changed, 493 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/examples/c/reactor/receiver.c ---------------------------------------------------------------------- diff --git a/examples/c/reactor/receiver.c b/examples/c/reactor/receiver.c index dd6a9e3..0948569 100644 --- a/examples/c/reactor/receiver.c +++ b/examples/c/reactor/receiver.c @@ -32,6 +32,8 @@ #include "proton/delivery.h" #include "proton/event.h" #include "proton/handlers.h" +#include "proton/transport.h" +#include "proton/url.h" static int quiet = 0; @@ -148,6 +150,23 @@ static void event_handler(pn_handler_t *handler, } } break; + case PN_TRANSPORT_ERROR: { + // The connection to the peer failed. + // + pn_transport_t *tport = pn_event_transport(event); + pn_condition_t *cond = pn_transport_condition(tport); + fprintf(stderr, "Network transport failed!\n"); + if (pn_condition_is_set(cond)) { + const char *name = pn_condition_get_name(cond); + const char *desc = pn_condition_get_description(cond); + fprintf(stderr, " Error: %s Description: %s\n", + (name) ? name : "<error name not provided>", + (desc) ? desc : "<no description provided>"); + } + // pn_reactor_process() will exit with a false return value, stopping + // the main loop. + } break; + default: break; } @@ -211,11 +230,20 @@ int main(int argc, char** argv) } pn_reactor_t *reactor = pn_reactor(); - pn_connection_t *conn = pn_reactor_connection(reactor, handler); + + pn_url_t *url = pn_url_parse(address); + if (url == NULL) { + fprintf(stderr, "Invalid host address %s\n", address); + exit(1); + } + pn_connection_t *conn = pn_reactor_connection_to_host(reactor, + pn_url_get_host(url), + pn_url_get_port(url), + handler); + pn_decref(url); // the container name should be unique for each client pn_connection_set_container(conn, container); - pn_connection_set_hostname(conn, address); // FIXME // wait up to 5 seconds for activity before returning from // pn_reactor_process() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/examples/c/reactor/sender.c ---------------------------------------------------------------------- diff --git a/examples/c/reactor/sender.c b/examples/c/reactor/sender.c index 9a07c9e..e1f73dc 100644 --- a/examples/c/reactor/sender.c +++ b/examples/c/reactor/sender.c @@ -32,6 +32,9 @@ #include "proton/delivery.h" #include "proton/event.h" #include "proton/handlers.h" +#include "proton/transport.h" +#include "proton/url.h" + static int quiet = 0; @@ -157,6 +160,23 @@ static void event_handler(pn_handler_t *handler, } } break; + case PN_TRANSPORT_ERROR: { + // The connection to the peer failed. + // + pn_transport_t *tport = pn_event_transport(event); + pn_condition_t *cond = pn_transport_condition(tport); + fprintf(stderr, "Network transport failed!\n"); + if (pn_condition_is_set(cond)) { + const char *name = pn_condition_get_name(cond); + const char *desc = pn_condition_get_description(cond); + fprintf(stderr, " Error: %s Description: %s\n", + (name) ? name : "<error name not provided>", + (desc) ? desc : "<no description provided>"); + } + // pn_reactor_process() will exit with a false return value, stopping + // the main loop. + } break; + default: break; } @@ -260,11 +280,20 @@ int main(int argc, char** argv) pn_decref(message); // message no longer needed pn_reactor_t *reactor = pn_reactor(); - pn_connection_t *conn = pn_reactor_connection(reactor, handler); + + pn_url_t *url = pn_url_parse(address); + if (url == NULL) { + fprintf(stderr, "Invalid host address %s\n", address); + exit(1); + } + pn_connection_t *conn = pn_reactor_connection_to_host(reactor, + pn_url_get_host(url), + pn_url_get_port(url), + handler); + pn_decref(url); // the container name should be unique for each client pn_connection_set_container(conn, container); - pn_connection_set_hostname(conn, address); // FIXME // wait up to 5 seconds for activity before returning from // pn_reactor_process() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java ---------------------------------------------------------------------- diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java index 22da720..5978c45 100644 --- a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java +++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java @@ -42,12 +42,10 @@ public class Send extends BaseHandler { private class SendHandler extends BaseHandler { - private final String hostname; private final Message message; private int nextTag = 0; - private SendHandler(String hostname, Message message) { - this.hostname = hostname; + private SendHandler(Message message) { this.message = message; // Add a child handler that performs some default handshaking @@ -58,7 +56,6 @@ public class Send extends BaseHandler { @Override public void onConnectionInit(Event event) { Connection conn = event.getConnection(); - conn.setHostname(hostname); // Every session or link could have their own handler(s) if we // wanted simply by adding the handler to the given session @@ -111,11 +108,13 @@ public class Send extends BaseHandler { } } - private final String hostname; + private final String host; + private final int port; private final Message message; - private Send(String hostname, String content) { - this.hostname = hostname; + private Send(String host, int port, String content) { + this.host = host; + this.port = port; message = Proton.message(); message.setBody(new AmqpValue(content)); } @@ -128,14 +127,22 @@ public class Send extends BaseHandler { // for this connection will go to the SendHandler object instead of // going to the reactor. If you were to omit the SendHandler object, // all the events would go to the reactor. - event.getReactor().connection(new SendHandler(hostname, message)); + event.getReactor().connectionToHost(host, port, new SendHandler(message)); } public static void main(String[] args) throws IOException { - String hostname = args.length > 0 ? args[0] : "localhost"; + int port = 5672; + String host = "localhost"; + if (args.length > 0) { + String[] parts = args[0].split(":", 2); + host = parts[0]; + if (parts.length > 1) { + port = Integer.parseInt(parts[1]); + } + } String content = args.length > 1 ? args[1] : "Hello World!"; - Reactor r = Proton.reactor(new Send(hostname, content)); + Reactor r = Proton.reactor(new Send(host, port, content)); r.run(); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/examples/python/reactor/send.py ---------------------------------------------------------------------- diff --git a/examples/python/reactor/send.py b/examples/python/reactor/send.py index c718780..4356da1 100755 --- a/examples/python/reactor/send.py +++ b/examples/python/reactor/send.py @@ -19,7 +19,7 @@ # import sys -from proton import Message +from proton import Message, Url from proton.reactor import Reactor from proton.handlers import CHandshaker @@ -30,16 +30,15 @@ from proton.handlers import CHandshaker class Send: - def __init__(self, host, message): - self.host = host + def __init__(self, message, target): self.message = message + self.target = target if target is not None else "examples" # Use the handlers property to add some default handshaking # behaviour. self.handlers = [CHandshaker()] def on_connection_init(self, event): conn = event.connection - conn.hostname = self.host # Every session or link could have their own handler(s) if we # wanted simply by setting the "handler" slot on the @@ -51,6 +50,7 @@ class Send: # the events go to its parent connection. If the connection # doesn't have a handler, the events go to the reactor. snd = ssn.sender("sender") + snd.target.address = self.target conn.open() ssn.open() snd.open() @@ -69,8 +69,8 @@ class Send: class Program: - def __init__(self, hostname, content): - self.hostname = hostname + def __init__(self, url, content): + self.url = url self.content = content def on_reactor_init(self, event): @@ -80,11 +80,13 @@ class Program: # for this connection will go to the Send object instead of # going to the reactor. If you were to omit the Send object, # all the events would go to the reactor. - event.reactor.connection(Send(self.hostname, Message(self.content))) + event.reactor.connection_to_host(self.url.host, self.url.port, + Send(Message(self.content), + self.url.path)) args = sys.argv[1:] -hostname = args.pop() if args else "localhost" +url = Url(args.pop() if args else "localhost:5672/examples") content = args.pop() if args else "Hello World!" -r = Reactor(Program(hostname, content)) +r = Reactor(Program(url, content)) r.run() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/examples/python/reactor/tornado-send.py ---------------------------------------------------------------------- diff --git a/examples/python/reactor/tornado-send.py b/examples/python/reactor/tornado-send.py index 54b8618..c69876a 100755 --- a/examples/python/reactor/tornado-send.py +++ b/examples/python/reactor/tornado-send.py @@ -20,21 +20,20 @@ import sys, tornado.ioloop from tornado_app import TornadoApp -from proton import Message +from proton import Message, Url from proton.handlers import CHandshaker class Send: - def __init__(self, host, message): - self.host = host + def __init__(self, message, target): self.message = message + self.target = target if target is not None else "examples" # Use the handlers property to add some default handshaking # behaviour. self.handlers = [CHandshaker()] def on_connection_init(self, event): conn = event.connection - conn.hostname = self.host # Every session or link could have their own handler(s) if we # wanted simply by setting the "handler" slot on the @@ -46,6 +45,7 @@ class Send: # the events go to its parent connection. If the connection # doesn't have a handler, the events go to the reactor. snd = ssn.sender("sender") + snd.target.address = self.target conn.open() ssn.open() snd.open() @@ -61,8 +61,8 @@ class Send: class Program: - def __init__(self, hostname, content): - self.hostname = hostname + def __init__(self, url, content): + self.url = url self.content = content def on_reactor_init(self, event): @@ -72,11 +72,13 @@ class Program: # for this connection will go to the Send object instead of # going to the reactor. If you were to omit the Send object, # all the events would go to the reactor. - event.reactor.connection(Send(self.hostname, Message(self.content))) + event.reactor.connection_to_host(self.url.host, self.url.port, + Send(Message(self.content), + self.url.path)) args = sys.argv[1:] -hostname = args.pop() if args else "localhost" +url = Url(args.pop() if args else "localhost:5672/examples") content = args.pop() if args else "Hello World!" -TornadoApp(Program(hostname, content)) +TornadoApp(Program(url, content)) tornado.ioloop.IOLoop.instance().start() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/proton-c/bindings/python/proton/__init__.py ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/proton/__init__.py b/proton-c/bindings/python/proton/__init__.py index 5ffede8..d7db20b 100644 --- a/proton-c/bindings/python/proton/__init__.py +++ b/proton-c/bindings/python/proton/__init__.py @@ -2496,7 +2496,14 @@ class Connection(Wrapper, Endpoint): def _set_hostname(self, name): return pn_connection_set_hostname(self._impl, unicode2utf8(name)) - hostname = property(_get_hostname, _set_hostname) + hostname = property(_get_hostname, _set_hostname, + doc=""" +Set the name of the host (either fully qualified or relative) to which this +connection is connecting to. This information may be used by the remote +peer to determine the correct back-end service to connect the client to. +This value will be sent in the Open performative, and will be used by SSL +and SASL layers to identify the peer. +""") def _get_user(self): return utf82unicode(pn_connection_get_user(self._impl)) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/proton-c/bindings/python/proton/reactor.py ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/proton/reactor.py b/proton-c/bindings/python/proton/reactor.py index cda6248..df1e039 100644 --- a/proton-c/bindings/python/proton/reactor.py +++ b/proton-c/bindings/python/proton/reactor.py @@ -178,11 +178,42 @@ class Reactor(Wrapper): raise IOError("%s (%s:%s)" % (pn_error_text(pn_io_error(pn_reactor_io(self._impl))), host, port)) def connection(self, handler=None): + """Deprecated: use connection_to_host() instead + """ impl = _chandler(handler, self.on_error) result = Connection.wrap(pn_reactor_connection(self._impl, impl)) - pn_decref(impl) + if impl: pn_decref(impl) return result + def connection_to_host(self, host, port, handler=None): + """Create an outgoing Connection that will be managed by the reactor. + The reator's pn_iohandler will create a socket connection to the host + once the connection is opened. + """ + conn = self.connection(handler) + self.set_connection_host(conn, host, port) + return conn + + def set_connection_host(self, connection, host, port): + """Change the address used by the connection. The address is + used by the reactor's iohandler to create an outgoing socket + connection. This must be set prior to opening the connection. + """ + pn_reactor_set_connection_host(self._impl, + connection._impl, + unicode2utf8(str(host)), + unicode2utf8(str(port))) + + def get_connection_address(self, connection): + """This may be used to retrieve the host address used by the reactor to + establish the outgoing socket connection. + @return: string containing the address in URL format or None if no + address assigned. Use the proton.Url class to create a Url object from + the returned value. + """ + _url = pn_reactor_get_connection_address(self._impl, connection._impl) + return utf82unicode(_url) + def selectable(self, handler=None): impl = _chandler(handler, self.on_error) result = Selectable.wrap(pn_reactor_selectable(self._impl)) @@ -503,11 +534,11 @@ class Connector(Handler): self.user = None self.password = None - def _connect(self, connection): + def _connect(self, connection, reactor): + assert(reactor is not None) url = self.address.next() - # IoHandler uses the hostname to determine where to try to connect to - connection.hostname = "%s:%s" % (url.host, url.port) - logging.info("connecting to %s..." % connection.hostname) + reactor.set_connection_host(connection, url.host, str(url.port)) + logging.info("connecting to %s..." % url) transport = Transport() if self.sasl_enabled: @@ -533,7 +564,7 @@ class Connector(Handler): self.ssl.peer_hostname = url.host def on_connection_local_open(self, event): - self._connect(event.connection) + self._connect(event.connection, event.reactor) def on_connection_remote_open(self, event): logging.info("connected to %s" % event.connection.hostname) @@ -551,7 +582,7 @@ class Connector(Handler): delay = self.reconnect.next() if delay == 0: logging.info("Disconnected, reconnecting...") - self._connect(self.connection) + self._connect(self.connection, event.reactor) else: logging.info("Disconnected will try to reconnect after %s seconds" % delay) event.reactor.schedule(delay, self) @@ -560,7 +591,7 @@ class Connector(Handler): self.connection = None def on_timer_task(self, event): - self._connect(self.connection) + self._connect(self.connection, event.reactor) def on_connection_remote_close(self, event): self.connection = None http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/proton-c/include/proton/connection.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/connection.h b/proton-c/include/proton/connection.h index f8a688c..da20f94 100644 --- a/proton-c/include/proton/connection.h +++ b/proton-c/include/proton/connection.h @@ -326,10 +326,17 @@ PN_EXTERN const char *pn_connection_get_user(pn_connection_t *connection); PN_EXTERN const char *pn_connection_get_hostname(pn_connection_t *connection); /** - * Set the value of the AMQP Hostname used by a connection object. + * Set the name of the host (either fully qualified or relative) to which this + * connection is connecting to. This information may be used by the remote + * peer to determine the correct back-end service to connect the client to. + * This value will be sent in the Open performative, and will be used by SSL + * and SASL layers to identify the peer. + * + * @note Note that it is illegal to set the hostname to a numeric IP address or + * include a port number. * * @param[in] connection the connection object - * @param[in] hostname the hostname + * @param[in] hostname the RFC1035 compliant host name */ PN_EXTERN void pn_connection_set_hostname(pn_connection_t *connection, const char *hostname); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/proton-c/include/proton/reactor.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/reactor.h b/proton-c/include/proton/reactor.h index e91b169..be642a9 100644 --- a/proton-c/include/proton/reactor.h +++ b/proton-c/include/proton/reactor.h @@ -74,7 +74,77 @@ PN_EXTERN pn_selectable_t *pn_reactor_selectable(pn_reactor_t *reactor); PN_EXTERN void pn_reactor_update(pn_reactor_t *reactor, pn_selectable_t *selectable); PN_EXTERN pn_acceptor_t *pn_reactor_acceptor(pn_reactor_t *reactor, const char *host, const char *port, pn_handler_t *handler); -PN_EXTERN pn_connection_t *pn_reactor_connection(pn_reactor_t *reactor, pn_handler_t *handler); + + +/** + * Create an outgoing connection that will be managed by the reactor. + * + * The reator's pn_iohandler will create a socket connection to the host + * once the connection is opened. + * + * @param[in] reactor the reactor that will own the connection. + * @param[in] host the address of the remote host. e.g. "localhost" + * @param[in] port the port to connect to. e.g. "5672" + * @param[in] handler the handler that will process all events generated by + * this connection. + * @return a connection object + */ +PN_EXTERN pn_connection_t *pn_reactor_connection_to_host(pn_reactor_t *reactor, + const char *host, + const char *port, + pn_handler_t *handler); + +/** + * Create an outgoing connection that will be managed by the reactor. + * + * The host address for the connection must be set via + * ::pn_reactor_set_connection_host() prior to opening the connection. + * Typically this can be done by the handler when processing the + * ::PN_CONNECTION_INIT event. + * + * @param[in] reactor the reactor that will own the connection. + * @param[in] handler the handler that will process all events generated by + * this connection. + * @return a connection object + * @deprecated Use ::pn_reactor_connection_to_host() instead. + */ +PN_EXTERN pn_connection_t *pn_reactor_connection(pn_reactor_t *reactor, + pn_handler_t *handler); + +/** + * Change the host address used by the connection. + * + * The address is used by the reactor's iohandler to create an outgoing socket + * connection. This must be set prior to opening the connection. + * + * @param[in] reactor the reactor that owns the connection. + * @param[in] connection the connection created by the reactor. + * @param[in] host the network address or DNS name of the host to connect to. + * @param[in] port the network port to use. Optional - default is "5672" + */ +PN_EXTERN void pn_reactor_set_connection_host(pn_reactor_t *reactor, + pn_connection_t *connection, + const char *host, + const char *port); +/** + * Retrieve the host address assigned to a reactor connection. + * + * This may be used to retrieve the host address used by the reactor to + * establish the outgoing socket connection. + * + * The pointer returned by this operation is valid until either the address is + * changed via ::pn_reactor_set_connection_host() or the connection object + * is freed. + * + * @param[in] reactor the reactor that owns the connection. + * @param[in] connection the connection created by ::pn_reactor_connection() + * @return a C string containing the address in URL format or NULL if no + * address assigned. ::pn_url_parse() may be used to create a Proton pn_url_t + * instance from the returned value. + */ +PN_EXTERN const char *pn_reactor_get_connection_address(pn_reactor_t *reactor, + pn_connection_t *connection); + PN_EXTERN int pn_reactor_wakeup(pn_reactor_t *reactor); PN_EXTERN void pn_reactor_start(pn_reactor_t *reactor); PN_EXTERN bool pn_reactor_quiesced(pn_reactor_t *reactor); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/proton-c/src/reactor/connection.c ---------------------------------------------------------------------- diff --git a/proton-c/src/reactor/connection.c b/proton-c/src/reactor/connection.c index 4a57bfd..5cc7099 100644 --- a/proton-c/src/reactor/connection.c +++ b/proton-c/src/reactor/connection.c @@ -24,6 +24,7 @@ #include <proton/sasl.h> #include <proton/ssl.h> #include <proton/transport.h> +#include <proton/url.h> #include <assert.h> #include <stdio.h> #include <string.h> @@ -32,6 +33,7 @@ // XXX: overloaded for both directions PN_HANDLE(PN_TRANCTX) +PN_HANDLE(PNI_CONN_URL) static pn_transport_t *pni_transport(pn_selectable_t *sel) { pn_record_t *record = pn_selectable_attachments(sel); @@ -110,20 +112,55 @@ void pni_handle_bound(pn_reactor_t *reactor, pn_event_t *event) { assert(event); pn_connection_t *conn = pn_event_connection(event); - const char *hostname = pn_connection_get_hostname(conn); - if (!hostname) { - return; - } - pn_string_t *str = pn_string(hostname); - char *host = pn_string_buffer(str); + pn_record_t *record = pn_connection_attachments(conn); + pn_url_t *url = (pn_url_t *)pn_record_get(record, PNI_CONN_URL); + const char *host = NULL; const char *port = "5672"; - char *colon = strrchr(host, ':'); - if (colon) { - port = colon + 1; - colon[0] = '\0'; + pn_string_t *str = NULL; + + if (url) { + host = pn_url_get_host(url); + const char *uport = pn_url_get_port(url); + if (uport) { + port = uport; + } else { + const char *scheme = pn_url_get_scheme(url); + if (scheme && strcmp(scheme, "amqps") == 0) { + port = "5671"; + } + } + if (!pn_connection_get_user(conn)) { + // user did not manually set auth info + const char *user = pn_url_get_username(url); + if (user) pn_connection_set_user(conn, user); + const char *passwd = pn_url_get_password(url); + if (passwd) pn_connection_set_password(conn, passwd); + } + } else { + // for backward compatibility, see if the connection's hostname can be + // used for the remote address. See JIRA PROTON-1133 + const char *hostname = pn_connection_get_hostname(conn); + if (hostname) { + str = pn_string(hostname); + host = pn_string_buffer(str); + // see if a port has been included in the hostname. This is not + // allowed by the spec, but the old reactor interface allowed it. + char *colon = strrchr(host, ':'); + if (colon) { + port = colon + 1; + colon[0] = '\0'; + } + } } - pn_socket_t sock = pn_connect(pn_reactor_io(reactor), host, port); + + // host will be NULL if this connection was created via the acceptor, which + // creates its own transport/socket when created. + if (!host) { + return; + } + pn_transport_t *transport = pn_event_transport(event); + pn_socket_t sock = pn_connect(pn_reactor_io(reactor), host, port); // invalid sockets are ignored by poll, so we need to do this manualy if (sock == PN_INVALID_SOCKET) { pn_condition_t *cond = pn_transport_condition(transport); @@ -132,7 +169,7 @@ void pni_handle_bound(pn_reactor_t *reactor, pn_event_t *event) { pn_transport_close_tail(transport); pn_transport_close_head(transport); } - pn_free(str); + if (str) pn_free(str); pn_reactor_selectable_transport(reactor, sock, transport); } @@ -264,3 +301,42 @@ pn_connection_t *pn_reactor_connection(pn_reactor_t *reactor, pn_handler_t *hand pn_decref(connection); return connection; } + +pn_connection_t *pn_reactor_connection_to_host(pn_reactor_t *reactor, + const char *host, + const char *port, + pn_handler_t *handler) { + pn_connection_t *connection = pn_reactor_connection(reactor, handler); + pn_reactor_set_connection_host(reactor, connection, host, port); + return connection; +} + + +void pn_reactor_set_connection_host(pn_reactor_t *reactor, + pn_connection_t *connection, + const char *host, + const char *port) +{ + (void)reactor; // ignored + pn_url_t *url = pn_url(); + pn_url_set_host(url, host); + pn_url_set_port(url, port); + pn_record_t *record = pn_connection_attachments(connection); + if (!pn_record_has(record, PNI_CONN_URL)) { + pn_record_def(record, PNI_CONN_URL, PN_OBJECT); + } + pn_record_set(record, PNI_CONN_URL, url); + pn_decref(url); +} + + +const char *pn_reactor_get_connection_address(pn_reactor_t *reactor, + pn_connection_t *connection) +{ + (void)reactor; // ignored + if (!connection) return NULL; + pn_record_t *record = pn_connection_attachments(connection); + pn_url_t *url = (pn_url_t *)pn_record_get(record, PNI_CONN_URL); + if (!url) return NULL; + return pn_url_str(url); +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/proton-c/src/tests/reactor.c ---------------------------------------------------------------------- diff --git a/proton-c/src/tests/reactor.c b/proton-c/src/tests/reactor.c index 1e706e2..9564569 100644 --- a/proton-c/src/tests/reactor.c +++ b/proton-c/src/tests/reactor.c @@ -26,7 +26,9 @@ #include <proton/session.h> #include <proton/link.h> #include <proton/delivery.h> +#include <proton/url.h> #include <stdlib.h> +#include <string.h> #define assert(E) ((E) ? 0 : (abort(), 0)) @@ -193,6 +195,11 @@ static void test_reactor_connection(void) { pn_handler_t *tch = test_handler(reactor, cevents); pn_connection_t *connection = pn_reactor_connection(reactor, tch); assert(connection); + pn_reactor_set_connection_host(reactor, connection, "127.0.0.1", "5672"); + pn_url_t *url = pn_url_parse(pn_reactor_get_connection_address(reactor, connection)); + assert(strcmp(pn_url_get_host(url), "127.0.0.1") == 0); + assert(strcmp(pn_url_get_port(url), "5672") == 0); + pn_decref(url); pn_handler_t *root = pn_reactor_get_handler(reactor); pn_list_t *revents = pn_list(PN_VOID, 0); pn_handler_add(root, test_handler(reactor, revents)); @@ -290,7 +297,7 @@ static void client_dispatch(pn_handler_t *handler, pn_event_t *event, pn_event_t pn_connection_t *conn = pn_event_connection(event); switch (pn_event_type(event)) { case PN_CONNECTION_INIT: - pn_connection_set_hostname(conn, "127.0.0.1:5678"); + pn_connection_set_hostname(conn, "some.org"); pn_connection_open(conn); break; case PN_CONNECTION_REMOTE_OPEN: @@ -315,7 +322,15 @@ static void test_reactor_connect(void) { pn_handler_t *ch = pn_handler_new(client_dispatch, sizeof(client_t), NULL); client_t *cli = cmem(ch); cli->events = pn_list(PN_VOID, 0); - pn_reactor_connection(reactor, ch); + pn_connection_t *conn = pn_reactor_connection_to_host(reactor, + "127.0.0.1", + "5678", + ch); + assert(conn); + pn_url_t *url = pn_url_parse(pn_reactor_get_connection_address(reactor, conn)); + assert(strcmp(pn_url_get_host(url), "127.0.0.1") == 0); + assert(strcmp(pn_url_get_port(url), "5678") == 0); + pn_decref(url); pn_reactor_run(reactor); expect(srv->events, PN_CONNECTION_INIT, PN_CONNECTION_BOUND, PN_CONNECTION_REMOTE_OPEN, @@ -376,7 +391,6 @@ void source_dispatch(pn_handler_t *handler, pn_event_t *event, pn_event_type_t t switch (type) { case PN_CONNECTION_INIT: { - pn_connection_set_hostname(conn, "127.0.0.1:5678"); pn_session_t *ssn = pn_session(conn); pn_link_t *snd = pn_sender(ssn, "sender"); pn_connection_open(conn); @@ -427,7 +441,18 @@ static void test_reactor_transfer(int count, int window) { pn_handler_t *ch = pn_handler_new(source_dispatch, sizeof(source_t), NULL); source_t *src = source(ch); src->remaining = count; - pn_reactor_connection(reactor, ch); + pn_connection_t *conn = NULL; + // Using the connection's hostname to set the connection address is + // deprecated. Once support is dropped the conditional code can be removed: + #if 0 + conn = pn_reactor_connection(reactor, ch); + assert(conn); + pn_reactor_connection_set_address(reactor, conn, "127.0.0.1", "5678"); + #else + // This is deprecated: + conn = pn_reactor_connection(reactor, ch); + pn_connection_set_hostname(conn, "127.0.0.1:5678"); + #endif pn_reactor_run(reactor); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java index feff80b..fb4de11 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java @@ -92,6 +92,17 @@ public interface Connection extends Endpoint, ReactorChild public String getContainer(); + /** + * Set the name of the host (either fully qualified or relative) to which + * this connection is connecting to. This information may be used by the + * remote peer to determine the correct back-end service to connect the + * client to. This value will be sent in the Open performative. + * + * <b>Note that it is illegal to set the hostname to a numeric IP + * address or include a port number.</b> + * + * @param hostname the RFC1035 compliant host name. + */ public void setHostname(String hostname); public String getHostname(); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java index 9d67d49..a3307d2 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java @@ -130,15 +130,16 @@ public interface Reactor { void setHandler(Handler handler); /** - * @return a set containing the child objects associated wit this reactor. - * This will contain any active instances of: {@link Task} - created - * using the {@link #schedule(int, Handler)} method, + * @return a set containing the child objects associated with this reactor. + * This will contain any active instances of: {@link Task} - + * created using the {@link #schedule(int, Handler)} method, * {@link Connection} - created using the - * {@link #connection(Handler)} method, {@link Acceptor} - - * created using the {@link #acceptor(String, int)} method. + * {@link #connectionToHost(String, int, Handler)} method, + * {@link Acceptor} - created using the + * {@link #acceptor(String, int)} method, * {@link #acceptor(String, int, Handler)} method, or - * {@link Selectable} - created using the {@link #selectable()} - * method. + * {@link Selectable} - created using the + * {@link #selectable()} method. */ Set<ReactorChild> children(); @@ -245,12 +246,52 @@ public interface Reactor { * connection. Typically the host and port to connect to * would be supplied to the connection object inside the * logic which handles the {@link Type#CONNECTION_INIT} - * event. + * event via + * {@link #setConnectionHost(Connection, String, int)} * @return the newly created connection object. + * @deprecated Use {@link #connectionToHost(String, int, Handler)} instead. */ + @Deprecated Connection connection(Handler handler); /** + * Creates a new out-bound connection to the given host and port. + * <p> + * This method will cause Reactor to set up a network connection to the + * host and create a Connection for it. + * @param host the host to connect to (e.g. "localhost") + * @param port the port used for the connection. + * @param handler a handler that is notified when events occur for the + * connection. + * @return the newly created connection object. + */ + Connection connectionToHost(String host, int port, Handler handler); + + /** + * Set the host address used by the connection + * <p> + * This method will set/change the host address used by the Reactor to + * create an outbound network connection for the given Connection + * @param c the Connection to assign the address to + * @param host the address of the host to connect to (e.g. "localhost") + * @param port the port to use for the connection. + */ + void setConnectionHost(Connection c, String host, int port); + + /** + * Get the address used by the connection + * <p> + * This method will retrieve the Connection's address as set by + * {@link #setConnectionHost(Connection, String, int)}. + * @param c the Connection + * @return a string containing the address in the following format: + * <pre> + * host[:port] + * </pre> + */ + String getConnectionAddress(Connection c); + + /** * Creates a new acceptor. This is equivalent to calling: * <pre> * acceptor(host, port, null); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java index 40eddac..5a32824 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java @@ -42,6 +42,7 @@ import org.apache.qpid.proton.reactor.Reactor; import org.apache.qpid.proton.reactor.Selectable; import org.apache.qpid.proton.reactor.Selectable.Callback; import org.apache.qpid.proton.reactor.Selector; +import org.apache.qpid.proton.messenger.impl.Address; public class IOHandler extends BaseHandler { @@ -87,20 +88,34 @@ public class IOHandler extends BaseHandler { // pni_handle_bound(...) from connection.c private void handleBound(Reactor reactor, Event event) { Connection connection = event.getConnection(); + String url = reactor.getConnectionAddress(connection); String hostname = connection.getHostname(); - if (hostname == null || hostname.equals("")) { - return; - } - - int colonIndex = hostname.indexOf(':'); int port = 5672; - if (colonIndex >= 0) { + + if (url != null) { + Address address = new Address(url); + hostname = address.getHost(); try { - port = Integer.parseInt(hostname.substring(colonIndex+1)); + port = Integer.parseInt(address.getImpliedPort()); } catch(NumberFormatException nfe) { - throw new IllegalArgumentException("Not a valid host: " + hostname, nfe); + throw new IllegalArgumentException("Not a valid host: " + url, nfe); } - hostname = hostname.substring(0, colonIndex); + } else if (hostname != null && !hostname.equals("")) { + // Backward compatibility with old code that illegally overloaded + // the connection's hostname + int colonIndex = hostname.indexOf(':'); + if (colonIndex >= 0) { + try { + port = Integer.parseInt(hostname.substring(colonIndex+1)); + } catch(NumberFormatException nfe) { + throw new IllegalArgumentException("Not a valid host: " + hostname, nfe); + } + hostname = hostname.substring(0, colonIndex); + } + } else { + // The transport connection already exists (like Acceptor), so no + // socket needed. + return; } Transport transport = event.getConnection().getTransport(); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java index 0eb126a..d13cfbe 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java @@ -50,6 +50,7 @@ import org.apache.qpid.proton.reactor.Selectable; import org.apache.qpid.proton.reactor.Selectable.Callback; import org.apache.qpid.proton.reactor.Selector; import org.apache.qpid.proton.reactor.Task; +import org.apache.qpid.proton.messenger.impl.Address; public class ReactorImpl implements Reactor, Extendable { public static final ExtendableAccessor<Event, Handler> ROOT = new ExtendableAccessor<>(Handler.class); @@ -427,6 +428,41 @@ public class ReactorImpl implements Reactor, Extendable { } @Override + public Connection connectionToHost(String host, int port, Handler handler) { + Connection connection = connection(handler); + setConnectionHost(connection, host, port); + return connection; + } + + static final String CONNECTION_ADDRESS_KEY = "pn_reactor_address"; + + @Override + public String getConnectionAddress(Connection connection) { + Record r = connection.attachments(); + Address addr = r.get(CONNECTION_ADDRESS_KEY, Address.class); + if (addr != null) { + StringBuilder sb = new StringBuilder(addr.getHost()); + if (addr.getPort() != null) + sb.append(":" + addr.getPort()); + return sb.toString(); + } + return null; + } + + @Override + public void setConnectionHost(Connection connection, + String host, int port) { + Record r = connection.attachments(); + Address addr = new Address(); + addr.setHost(host); + if (port == 0) { + port = 5672; + } + addr.setPort(Integer.toString(port)); + r.set(CONNECTION_ADDRESS_KEY, Address.class, addr); + } + + @Override public Acceptor acceptor(String host, int port) throws IOException { return this.acceptor(host, port, null); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java b/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java index 10c591a..4e92cd9 100644 --- a/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java +++ b/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java @@ -165,6 +165,9 @@ public class ReactorTest { Connection connection = reactor.connection(connectionHandler); assertNotNull(connection); assertTrue("connection should be one of the reactor's children", reactor.children().contains(connection)); + reactor.setConnectionHost(connection, "127.0.0.1", 5672); + assertEquals("connection address configuration failed", + reactor.getConnectionAddress(connection), "127.0.0.1:5672"); TestHandler reactorHandler = new TestHandler(); reactor.getHandler().add(reactorHandler); reactor.run(); @@ -240,7 +243,9 @@ public class ReactorTest { @Override public void onConnectionInit(Event event) { super.onConnectionInit(event); - event.getConnection().setHostname("127.0.0.1:" + listeningPort); + event.getReactor().setConnectionHost(event.getConnection(), + "127.0.0.1", + listeningPort); event.getConnection().open(); } @Override @@ -300,17 +305,14 @@ public class ReactorTest { private static class SourceHandler extends BaseHandler { private int remaining; - private final int port; - protected SourceHandler(int count, int port) { + protected SourceHandler(int count) { remaining = count; - this.port = port; } @Override public void onConnectionInit(Event event) { Connection conn = event.getConnection(); - conn.setHostname("127.0.0.1:" + port); Session ssn = conn.session(); Sender snd = ssn.sender("sender"); conn.open(); @@ -352,9 +354,9 @@ public class ReactorTest { SinkHandler snk = new SinkHandler(); sh.add(snk); - SourceHandler src = new SourceHandler(count, ((AcceptorImpl)acceptor).getPortNumber()); - reactor.connection(src); - + SourceHandler src = new SourceHandler(count); + reactor.connectionToHost("127.0.0.1", ((AcceptorImpl)acceptor).getPortNumber(), + src); reactor.run(); reactor.free(); assertEquals("Did not receive the expected number of messages", count, snk.received); @@ -575,7 +577,6 @@ public class ReactorTest { public void onConnectionInit(Event event) { super.onConnectionInit(event); Connection connection = event.getConnection(); - connection.setHostname("127.0.0.1:" + serverSocket.getLocalPort()); connection.open(); try { serverSocket.close(); @@ -587,7 +588,7 @@ public class ReactorTest { } } TestHandler connectionHandler = new ConnectionHandler(); - reactor.connection(connectionHandler); + reactor.connectionToHost("127.0.0.1", serverSocket.getLocalPort(), connectionHandler); reactor.run(); reactor.free(); serverSocket.close(); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/tests/java/org/apache/qpid/proton/ProtonJInterop.java ---------------------------------------------------------------------- diff --git a/tests/java/org/apache/qpid/proton/ProtonJInterop.java b/tests/java/org/apache/qpid/proton/ProtonJInterop.java index 31306ef..58a9c31 100644 --- a/tests/java/org/apache/qpid/proton/ProtonJInterop.java +++ b/tests/java/org/apache/qpid/proton/ProtonJInterop.java @@ -44,13 +44,11 @@ public class ProtonJInterop { private static class SendHandler extends BaseHandler { - private final String hostname; private int numMsgs; private int count = 0; private boolean result = false; - private SendHandler(String hostname, int numMsgs) { - this.hostname = hostname; + private SendHandler(int numMsgs) { this.numMsgs = numMsgs; add(new Handshaker()); } @@ -58,7 +56,6 @@ public class ProtonJInterop { @Override public void onConnectionInit(Event event) { Connection conn = event.getConnection(); - conn.setHostname(hostname); Session ssn = conn.session(); Sender snd = ssn.sender("sender"); conn.open(); @@ -111,14 +108,19 @@ public class ProtonJInterop { private static class Send extends BaseHandler { private final SendHandler sendHandler; + private final String host; + private final int port; - private Send(String hostname, int numMsgs) { - sendHandler = new SendHandler(hostname, numMsgs); + private Send(String host, int port, int numMsgs) { + this.host = host; + this.port = port; + sendHandler = new SendHandler(numMsgs); } @Override public void onReactorInit(Event event) { - event.getReactor().connection(sendHandler); + Reactor r = event.getReactor(); + r.connectionToHost(host, port, sendHandler); } public boolean getResult() { @@ -185,7 +187,7 @@ public class ProtonJInterop { boolean result = false; if ("send".equalsIgnoreCase(args[0])) { - Send send = new Send("localhost:" + port, numMsgs); + Send send = new Send("localhost", port, numMsgs); Reactor r = Proton.reactor(send); r.run(); result = send.getResult(); @@ -200,4 +202,4 @@ public class ProtonJInterop { System.exit(1); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/619c5c7f/tests/java/shim/creactor.py ---------------------------------------------------------------------- diff --git a/tests/java/shim/creactor.py b/tests/java/shim/creactor.py index 95fd020..b61c1df 100644 --- a/tests/java/shim/creactor.py +++ b/tests/java/shim/creactor.py @@ -56,6 +56,13 @@ def pn_reactor_selectable(r): return r.selectable() def pn_reactor_connection(r, h): return wrap(r.connection(h), pn_connection_wrapper) +def pn_reactor_connection_to_host(r, host, port, h): + return wrap(r.connectionToHost(host, int(port), h), + pn_connection_wrapper) +def pn_reactor_get_connection_address(r, c): + return r.getConnectionAddress(c.impl) +def pn_reactor_set_connection_host(r, c, h, p): + r.setConnectionHost(c.impl, h, int(p)) def pn_reactor_acceptor(r, host, port, handler): return r.acceptor(host, int(port), handler) def pn_reactor_mark(r): --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
