Author: rhs Date: Thu Sep 18 20:31:08 2014 New Revision: 1626080 URL: http://svn.apache.org/r1626080 Log: PROTON-669: applied patch from dominic for fail fast checking of messenger routes
Modified:
    qpid/proton/trunk/proton-c/include/proton/messenger.h
    qpid/proton/trunk/proton-c/src/messenger/messenger.c
    qpid/proton/trunk/proton-c/src/messenger/transform.c
    qpid/proton/trunk/proton-c/src/messenger/transform.h

Modified: qpid/proton/trunk/proton-c/include/proton/messenger.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/messenger.h?rev=1626080&r1=1626079&r2=1626080&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/messenger.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/messenger.h Thu Sep 18 20:31:08 2014
@@ -900,6 +900,28 @@ PN_EXTERN pn_timestamp_t pn_messenger_de
  * @}
  */
 
+#define PN_FLAGS_CHECK_ROUTES                                                  \
+  (0x1) /** Messenger flag to indicate that a call                            \
+            to pn_messenger_start should check that                           \
+            any defined routes are valid */
+
+/** Sets control flags to enable additional function for the Messenger.
+ *
+ * @param[in] messenger the messenger
+ * @param[in] flags 0 or PN_FLAGS_CHECK_ROUTES
+ *
+ * @return an error code of zero if there is no error
+ */
+PN_EXTERN int pn_messenger_set_flags(pn_messenger_t *messenger,
+                                     const int flags);
+
+/** Gets the flags for a Messenger.
+ *
+ * @param[in] messenger the messenger
+ * @return The flags set for the messenger
+ */
+PN_EXTERN int pn_messenger_get_flags(pn_messenger_t *messenger);
+
 #ifdef __cplusplus
 }
 #endif

Modified: qpid/proton/trunk/proton-c/src/messenger/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/messenger.c?rev=1626080&r1=1626079&r2=1626080&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/messenger.c Thu Sep 18 20:31:08 2014
@@ -100,6 +100,7 @@ struct pn_messenger_t {
   int receivers;     // # receiver links
   int draining;      // # links in drain state
   int connection_error;
+  int flags;
   bool blocking;
   bool passive;
   bool interrupted;
@@ -372,6 +373,10 @@ static pn_listener_ctx_t *pn_listener_ct
   pn_socket_t socket = pn_listen(messenger->io, host, port ? port : default_port(scheme));
   if (socket == PN_INVALID_SOCKET) {
     pn_error_copy(messenger->error, pn_io_error(messenger->io));
+    pn_error_format(messenger->error, PN_ERR, "CONNECTION ERROR (%s:%s): %s\n",
+                    messenger->address.host, messenger->address.port,
+                    pn_error_text(messenger->error));
+
     return NULL;
   }
 
@@ -639,6 +644,7 @@ pn_messenger_t *pn_messenger(const char
     m->rewritten = pn_string(NULL);
     m->domain = pn_string(NULL);
     m->connection_error = 0;
+    m->flags = 0;
   }
 
   return m;
@@ -1430,11 +1436,85 @@ int pn_messenger_sync(pn_messenger_t *me
   }
 }
 
+static void pni_parse(pn_address_t *address);
+pn_connection_t *pn_messenger_resolve(pn_messenger_t *messenger,
+                                      const char *address, char **name);
+int pn_messenger_work(pn_messenger_t *messenger, int timeout);
+
 int pn_messenger_start(pn_messenger_t *messenger)
 {
   if (!messenger) return PN_ARG_ERR;
-  // right now this is a noop
-  return 0;
+
+  int error = 0;
+
+  // When checking of routes is required we attempt to resolve each route
+  // with a substitution that has a defined scheme, address and port. If
+  // any of these routes is invalid an appropriate error code will be
+  // returned. Currently no attempt is made to check the name part of the
+  // address, as the intent here is to fail fast if the addressed host
+  // is invalid or unavailable.
+  if (messenger->flags & PN_FLAGS_CHECK_ROUTES) {
+    pn_list_t *substitutions = pn_list(PN_WEAKREF, 0);
+    pn_transform_get_substitutions(messenger->routes, substitutions);
+    for (size_t i = 0; i < pn_list_size(substitutions) && error == 0; i++) {
+      pn_string_t *substitution = (pn_string_t *)pn_list_get(substitutions, i);
+      if (substitution) {
+        pn_address_t addr;
+        addr.text = pn_string(NULL);
+        error = pn_string_copy(addr.text, substitution);
+        if (!error) {
+          pni_parse(&addr);
+          if (addr.scheme && strlen(addr.scheme) > 0 &&
+              !strstr(addr.scheme, "$") && addr.host && strlen(addr.host) > 0 &&
+              !strstr(addr.host, "$") && addr.port && strlen(addr.port) > 0 &&
+              !strstr(addr.port, "$")) {
+            pn_string_t *check_addr = pn_string(NULL);
+            // ipv6 hosts need to be wrapped in [] within a URI
+            if (strstr(addr.host, ":")) {
+              pn_string_format(check_addr, "%s://[%s]:%s/", addr.scheme,
+                               addr.host, addr.port);
+            } else {
+              pn_string_format(check_addr, "%s://%s:%s/", addr.scheme,
+                               addr.host, addr.port);
+            }
+            char *name = NULL;
+            pn_connection_t *connection = pn_messenger_resolve(
+                messenger, pn_string_get(check_addr), &name);
+            pn_free(check_addr);
+            if (!connection) {
+              if (pn_error_code(messenger->error) == 0)
+                pn_error_copy(messenger->error, pn_io_error(messenger->io));
+              pn_error_format(messenger->error, PN_ERR,
+                              "CONNECTION ERROR (%s:%s): %s\n",
+                              messenger->address.host, messenger->address.port,
+                              pn_error_text(messenger->error));
+              error = pn_error_code(messenger->error);
+            } else {
+              // Send and receive outstanding messages until connection
+              // completes or an error occurs
+              int work = pn_messenger_work(messenger, -1);
+              pn_connection_ctx_t *cctx =
+                  (pn_connection_ctx_t *)pn_connection_get_context(connection);
+              while ((work > 0 ||
+                      (pn_connection_state(connection) & PN_REMOTE_UNINIT) ||
+                      pni_connection_pending(cctx->selectable) != (ssize_t)0) &&
+                     pn_error_code(messenger->error) == 0)
+                work = pn_messenger_work(messenger, 0);
+              if (work < 0 && work != PN_TIMEOUT) {
+                error = work;
+              } else {
+                error = pn_error_code(messenger->error);
+              }
+            }
+          }
+        }
+        pn_free(addr.text);
+      }
+    }
+    pn_free(substitutions);
+  }
+
+  return error;
 }
 
 bool pn_messenger_stopped(pn_messenger_t *messenger)
@@ -2154,3 +2234,18 @@ int pn_messenger_rewrite(pn_messenger_t
   pn_transform_rule(messenger->rewrites, pattern, address);
   return 0;
 }
+
+PN_EXTERN int pn_messenger_set_flags(pn_messenger_t *messenger, const int flags)
+{
+  if (!messenger)
+    return PN_ARG_ERR;
+  if (flags != 0 && (flags ^ PN_FLAGS_CHECK_ROUTES) != 0)
+    return PN_ARG_ERR;
+  messenger->flags = flags;
+  return 0;
+}
+
+PN_EXTERN int pn_messenger_get_flags(pn_messenger_t *messenger)
+{
+  return messenger ? messenger->flags : 0;
+}

Modified: qpid/proton/trunk/proton-c/src/messenger/transform.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/transform.c?rev=1626080&r1=1626079&r2=1626080&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/transform.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/transform.c Thu Sep 18 20:31:08 2014
@@ -19,7 +19,6 @@
  *
  */
 
-#include <proton/object.h>
 #include <string.h>
 #include <assert.h>
 #include <ctype.h>
@@ -241,3 +240,15 @@ bool pn_transform_matched(pn_transform_t
 {
   return transform->matched;
 }
+
+int pn_transform_get_substitutions(pn_transform_t *transform,
+                                   pn_list_t *substitutions)
+{
+  int size = pn_list_size(transform->rules);
+  for (size_t i = 0; i < (size_t)size; i++) {
+    pn_rule_t *rule = (pn_rule_t *)pn_list_get(transform->rules, i);
+    pn_list_add(substitutions, rule->substitution);
+  }
+
+  return size;
+}

Modified: qpid/proton/trunk/proton-c/src/messenger/transform.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/transform.h?rev=1626080&r1=1626079&r2=1626080&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/transform.h (original)
+++ qpid/proton/trunk/proton-c/src/messenger/transform.h Thu Sep 18 20:31:08 2014
@@ -22,6 +22,7 @@
  *
  */
 
+#include <proton/object.h>
 #include <proton/buffer.h>
 
 typedef struct pn_transform_t pn_transform_t;
@@ -32,6 +33,7 @@ void pn_transform_rule(pn_transform_t *t
 int pn_transform_apply(pn_transform_t *transform, const char *src,
                        pn_string_t *dest);
 bool pn_transform_matched(pn_transform_t *transform);
-
+int pn_transform_get_substitutions(pn_transform_t *transform,
+                                   pn_list_t *substitutions);
 
 #endif /* transform.h */

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org