jiridanek commented on a change in pull request #744:
URL: https://github.com/apache/qpid-dispatch/pull/744#discussion_r432864894
##########
File path: src/server.c
##########
@@ -17,1657 +17,609 @@
* under the License.
*/
-#include "python_private.h" // must be first!
-#include "dispatch_private.h"
-#include <qpid/dispatch/python_embedded.h>
-
-#include <qpid/dispatch/ctools.h>
-#include <qpid/dispatch/threading.h>
-#include <qpid/dispatch/log.h>
-#include <qpid/dispatch/amqp.h>
-#include <qpid/dispatch/server.h>
-#include <qpid/dispatch/failoverlist.h>
-#include <qpid/dispatch/alloc.h>
-#include <qpid/dispatch/platform.h>
+// This must be first to work around a Python bug
+//
+// clang-format off
+#include "python_private.h"
+// clang-format on
+#include <errno.h>
+#include <inttypes.h>
+#include <proton/condition.h>
#include <proton/event.h>
#include <proton/listener.h>
#include <proton/netaddr.h>
#include <proton/proactor.h>
#include <proton/sasl.h>
+#include <qpid/dispatch/alloc.h>
+#include <qpid/dispatch/amqp.h>
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/failoverlist.h>
+#include <qpid/dispatch/log.h>
+#include <qpid/dispatch/platform.h>
+#include <qpid/dispatch/python_embedded.h>
+#include <qpid/dispatch/server.h>
+#include <qpid/dispatch/threading.h>
+#include <stdio.h>
+#include <string.h>
-
+#include "config.h"
+#include "connection.h"
+#include "connector.h"
+#include "dispatch_private.h"
#include "entity.h"
#include "entity_cache.h"
-#include "dispatch_private.h"
+#include "listener.h"
#include "policy.h"
+#include "remote_sasl.h"
#include "server_private.h"
#include "timer_private.h"
-#include "config.h"
-#include "remote_sasl.h"
-#include <stdio.h>
-#include <string.h>
-#include <errno.h>
-#include <inttypes.h>
-
-struct qd_server_t {
- qd_dispatch_t *qd;
- const int thread_count; /* Immutable */
- const char *container_name;
- const char *sasl_config_path;
- const char *sasl_config_name;
- pn_proactor_t *proactor;
- qd_container_t *container;
- qd_log_source_t *log_source;
- qd_log_source_t *protocol_log_source; // Log source for the
PROTOCOL module
- void *start_context;
- sys_cond_t *cond;
- sys_mutex_t *lock;
- qd_connection_list_t conn_list;
- int pause_requests;
- int threads_paused;
- int pause_next_sequence;
- int pause_now_serving;
- uint64_t next_connection_id;
- void *py_displayname_obj;
- qd_http_server_t *http;
- sys_mutex_t *conn_activation_lock;
-};
#define HEARTBEAT_INTERVAL 1000
-ALLOC_DEFINE(qd_listener_t);
-ALLOC_DEFINE(qd_connector_t);
ALLOC_DEFINE(qd_deferred_call_t);
-ALLOC_DEFINE(qd_connection_t);
-
-const char *MECH_EXTERNAL = "EXTERNAL";
-
-//Allowed uidFormat fields.
-const char CERT_COUNTRY_CODE = 'c';
-const char CERT_STATE = 's';
-const char CERT_CITY_LOCALITY = 'l';
-const char CERT_ORGANIZATION_NAME = 'o';
-const char CERT_ORGANIZATION_UNIT = 'u';
-const char CERT_COMMON_NAME = 'n';
-const char CERT_FINGERPRINT_SHA1 = '1';
-const char CERT_FINGERPRINT_SHA256 = '2';
-const char CERT_FINGERPRINT_SHA512 = '5';
-char *COMPONENT_SEPARATOR = ";";
-
-static const int BACKLOG = 50; /* Listening backlog */
-
-static void setup_ssl_sasl_and_open(qd_connection_t *ctx);
-static qd_failover_item_t *qd_connector_get_conn_info(qd_connector_t *ct);
-
-/**
- * This function is set as the pn_transport->tracer and is invoked when proton
tries to write the log message to pn_transport->tracer
- */
-void transport_tracer(pn_transport_t *transport, const char *message)
-{
- qd_connection_t *ctx = (qd_connection_t*)
pn_transport_get_context(transport);
- if (ctx) {
- // The PROTOCOL module is used exclusively for logging protocol
related tracing. The protocol could be AMQP, HTTP, TCP etc.
- qd_log(ctx->server->protocol_log_source, QD_LOG_TRACE,
"[%"PRIu64"]:%s", ctx->connection_id, message);
- }
-}
-
-void connection_transport_tracer(pn_transport_t *transport, const char
*message)
-{
- qd_connection_t *ctx = (qd_connection_t*)
pn_transport_get_context(transport);
- if (ctx) {
- // Unconditionally write the log at TRACE level to the log file.
- qd_log_impl_v1(ctx->server->protocol_log_source, QD_LOG_TRACE,
__FILE__, __LINE__, "[%"PRIu64"]:%s", ctx->connection_id, message);
- }
-}
-/**
- * Save displayNameService object instance and ImportModule address
- * Called with qd_python_lock held
- */
-qd_error_t qd_register_display_name_service(qd_dispatch_t *qd, void
*displaynameservice)
-{
+// Save displayNameService object instance and ImportModule address.
+// Called with qd_python_lock held.
+//
+// XXX But it doesn't use the _lh convention?
+qd_error_t qd_register_display_name_service(qd_dispatch_t* qd, void*
displaynameservice) {
if (displaynameservice) {
qd->server->py_displayname_obj = displaynameservice;
- Py_XINCREF((PyObject *)qd->server->py_displayname_obj);
+ Py_XINCREF((PyObject*) qd->server->py_displayname_obj);
return QD_ERROR_NONE;
- }
- else {
+ } else {
return qd_error(QD_ERROR_VALUE, "displaynameservice is not set");
}
}
-
-/**
- * Returns a char pointer to a user id which is constructed from components
specified in the config->ssl_uid_format.
- * Parses through each component and builds a semi-colon delimited string
which is returned as the user id.
- */
-static const char *transport_get_user(qd_connection_t *conn, pn_transport_t
*tport)
-{
- const qd_server_config_t *config =
- conn->connector ? &conn->connector->config :
&conn->listener->config;
-
- if (config->ssl_uid_format) {
- // The ssl_uid_format length cannot be greater that 7
- assert(strlen(config->ssl_uid_format) < 8);
-
- //
- // The tokens in the uidFormat strings are delimited by comma. Load
the individual components of the uidFormat
- // into the components[] array. The maximum number of components that
are allowed are 7 namely, c, s, l, o, u, n, (1 or 2 or 5)
- //
- char components[8];
-
- //The strcpy() function copies the string pointed to by src, including
the terminating null byte ('\0'), to the buffer pointed to by dest.
- strncpy(components, config->ssl_uid_format, 7);
-
- const char *country_code = 0;
- const char *state = 0;
- const char *locality_city = 0;
- const char *organization = 0;
- const char *org_unit = 0;
- const char *common_name = 0;
- //
- // SHA1 is 20 octets (40 hex characters); SHA256 is 32 octets (64 hex
characters).
- // SHA512 is 64 octets (128 hex characters)
- //
- char fingerprint[129] = "\0";
-
- int uid_length = 0;
- int semi_colon_count = -1;
-
- int component_count = strlen(components);
-
- for (int x = 0; x < component_count ; x++) {
- // accumulate the length into uid_length on each pass so we
definitively know the number of octets to malloc.
- if (components[x] == CERT_COUNTRY_CODE) {
- country_code =
pn_ssl_get_remote_subject_subfield(pn_ssl(tport),
PN_SSL_CERT_SUBJECT_COUNTRY_NAME);
- if (country_code) {
- uid_length += strlen((const char *)country_code);
- semi_colon_count++;
- }
- }
- else if (components[x] == CERT_STATE) {
- state = pn_ssl_get_remote_subject_subfield(pn_ssl(tport),
PN_SSL_CERT_SUBJECT_STATE_OR_PROVINCE);
- if (state) {
- uid_length += strlen((const char *)state);
- semi_colon_count++;
- }
- }
- else if (components[x] == CERT_CITY_LOCALITY) {
- locality_city =
pn_ssl_get_remote_subject_subfield(pn_ssl(tport),
PN_SSL_CERT_SUBJECT_CITY_OR_LOCALITY);
- if (locality_city) {
- uid_length += strlen((const char *)locality_city);
- semi_colon_count++;
- }
- }
- else if (components[x] == CERT_ORGANIZATION_NAME) {
- organization =
pn_ssl_get_remote_subject_subfield(pn_ssl(tport),
PN_SSL_CERT_SUBJECT_ORGANIZATION_NAME);
- if(organization) {
- uid_length += strlen((const char *)organization);
- semi_colon_count++;
- }
- }
- else if (components[x] == CERT_ORGANIZATION_UNIT) {
- org_unit = pn_ssl_get_remote_subject_subfield(pn_ssl(tport),
PN_SSL_CERT_SUBJECT_ORGANIZATION_UNIT);
- if(org_unit) {
- uid_length += strlen((const char *)org_unit);
- semi_colon_count++;
- }
- }
- else if (components[x] == CERT_COMMON_NAME) {
- common_name =
pn_ssl_get_remote_subject_subfield(pn_ssl(tport),
PN_SSL_CERT_SUBJECT_COMMON_NAME);
- if(common_name) {
- uid_length += strlen((const char *)common_name);
- semi_colon_count++;
- }
- }
- else if (components[x] == CERT_FINGERPRINT_SHA1 || components[x]
== CERT_FINGERPRINT_SHA256 || components[x] == CERT_FINGERPRINT_SHA512) {
- // Allocate the memory for message digest
- int out = 0;
-
- int fingerprint_length = 0;
- if(components[x] == CERT_FINGERPRINT_SHA1) {
- fingerprint_length = 40;
- out = pn_ssl_get_cert_fingerprint(pn_ssl(tport),
fingerprint, fingerprint_length + 1, PN_SSL_SHA1);
- }
- else if (components[x] == CERT_FINGERPRINT_SHA256) {
- fingerprint_length = 64;
- out = pn_ssl_get_cert_fingerprint(pn_ssl(tport),
fingerprint, fingerprint_length + 1, PN_SSL_SHA256);
- }
- else if (components[x] == CERT_FINGERPRINT_SHA512) {
- fingerprint_length = 128;
- out = pn_ssl_get_cert_fingerprint(pn_ssl(tport),
fingerprint, fingerprint_length + 1, PN_SSL_SHA512);
- }
-
- (void) out; // avoid 'out unused' compiler warnings if NDEBUG
undef'ed
- assert (out != PN_ERR);
-
- uid_length += fingerprint_length;
- semi_colon_count++;
- }
- else {
- // This is an unrecognized component. log a critical error
- qd_log(conn->server->log_source, QD_LOG_CRITICAL,
"Unrecognized component '%c' in uidFormat ", components[x]);
- return 0;
- }
- }
-
- if(uid_length > 0) {
- char *user_id = malloc((uid_length + semi_colon_count + 1) *
sizeof(char)); // the +1 is for the '\0' character
- //
- // We have allocated memory for user_id. We are responsible for
freeing this memory. Set conn->free_user_id
- // to true so that we know that we have to free the user_id
- //
- conn->free_user_id = true;
- memset(user_id, 0, uid_length + semi_colon_count + 1);
-
- // The components in the user id string must appear in the same
order as it appears in the component string. that is
- // why we have this loop
- for (int x=0; x < component_count ; x++) {
- if (components[x] == CERT_COUNTRY_CODE) {
- if (country_code) {
- if(*user_id != '\0')
- strcat(user_id, COMPONENT_SEPARATOR);
- strcat(user_id, (char *) country_code);
- }
- }
- else if (components[x] == CERT_STATE) {
- if (state) {
- if(*user_id != '\0')
- strcat(user_id, COMPONENT_SEPARATOR);
- strcat(user_id, (char *) state);
- }
- }
- else if (components[x] == CERT_CITY_LOCALITY) {
- if (locality_city) {
- if(*user_id != '\0')
- strcat(user_id, COMPONENT_SEPARATOR);
- strcat(user_id, (char *) locality_city);
- }
- }
- else if (components[x] == CERT_ORGANIZATION_NAME) {
- if (organization) {
- if(*user_id != '\0')
- strcat(user_id, COMPONENT_SEPARATOR);
- strcat(user_id, (char *) organization);
- }
- }
- else if (components[x] == CERT_ORGANIZATION_UNIT) {
- if (org_unit) {
- if(*user_id != '\0')
- strcat(user_id, COMPONENT_SEPARATOR);
- strcat(user_id, (char *) org_unit);
- }
- }
- else if (components[x] == CERT_COMMON_NAME) {
- if (common_name) {
- if(*user_id != '\0')
- strcat(user_id, COMPONENT_SEPARATOR);
- strcat(user_id, (char *) common_name);
- }
- }
- else if (components[x] == CERT_FINGERPRINT_SHA1 ||
components[x] == CERT_FINGERPRINT_SHA256 || components[x] ==
CERT_FINGERPRINT_SHA512) {
- if (strlen((char *) fingerprint) > 0) {
- if(*user_id != '\0')
- strcat(user_id, COMPONENT_SEPARATOR);
- strcat(user_id, (char *) fingerprint);
- }
- }
- }
- if (config->ssl_uid_name_mapping_file) {
- // Translate extracted id into display name
- qd_python_lock_state_t lock_state = qd_python_lock();
- PyObject *result = PyObject_CallMethod((PyObject
*)conn->server->py_displayname_obj, "query", "(ss)", config->ssl_profile,
user_id );
- if (result) {
- free(user_id);
- user_id = py_string_2_c(result);
- Py_XDECREF(result);
- } else {
- qd_log(conn->server->log_source, QD_LOG_DEBUG, "Internal:
failed to read displaynameservice query result");
- }
- qd_python_unlock(lock_state);
- }
- qd_log(conn->server->log_source, QD_LOG_DEBUG, "User id is '%s' ",
user_id);
- return user_id;
- }
- }
- else //config->ssl_uid_format not specified, just return the username
provided by the proton transport.
- return pn_transport_get_user(tport);
-
- return 0;
-}
-
-
-void qd_connection_set_user(qd_connection_t *conn)
-{
- pn_transport_t *tport = pn_connection_transport(conn->pn_conn);
- pn_sasl_t *sasl = pn_sasl(tport);
- if (sasl) {
- const char *mech = pn_sasl_get_mech(sasl);
- conn->user_id = pn_transport_get_user(tport);
- // We want to set the user name only if it is not already set and the
selected sasl mechanism is EXTERNAL
- if (mech && strcmp(mech, MECH_EXTERNAL) == 0) {
- const char *user_id = transport_get_user(conn, tport);
- if (user_id)
- conn->user_id = user_id;
- }
- }
-}
-
-
-qd_error_t qd_entity_refresh_sslProfile(qd_entity_t* entity, void *impl)
-{
+qd_error_t qd_entity_refresh_sslProfile(qd_entity_t* entity, void* impl) {
return QD_ERROR_NONE;
}
-qd_error_t qd_entity_refresh_authServicePlugin(qd_entity_t* entity, void *impl)
-{
+qd_error_t qd_entity_refresh_authServicePlugin(qd_entity_t* entity, void*
impl) {
return QD_ERROR_NONE;
}
+qd_server_t* qd_server(qd_dispatch_t* qd, int thread_count, const char*
container_name, const char* sasl_config_path,
+ const char* sasl_config_name) {
+ // Initialize const members. Zero initialize all others.
-static qd_error_t listener_setup_ssl(qd_connection_t *ctx, const
qd_server_config_t *config, pn_transport_t *tport)
-{
- pn_ssl_domain_t *domain = pn_ssl_domain(PN_SSL_MODE_SERVER);
- if (!domain) return qd_error(QD_ERROR_RUNTIME, "No SSL support");
+ qd_server_t tmp = {.thread_count = thread_count};
+ // XXX This type contructor is different from what I see elsewhere.
+ qd_server_t* server = NEW(qd_server_t);
- // setup my identifying cert:
- if (pn_ssl_domain_set_credentials(domain,
- config->ssl_certificate_file,
- config->ssl_private_key_file,
- config->ssl_password)) {
- pn_ssl_domain_free(domain);
- return qd_error(QD_ERROR_RUNTIME, "Cannot set SSL credentials");
+ if (!server) {
+ return NULL;
}
- // for peer authentication:
- if (config->ssl_trusted_certificate_db) {
- if (pn_ssl_domain_set_trusted_ca_db(domain,
config->ssl_trusted_certificate_db)) {
- pn_ssl_domain_free(domain);
- return qd_error(QD_ERROR_RUNTIME, "Cannot set trusted SSL CA" );
- }
- }
+ memcpy(server, &tmp, sizeof(tmp));
- if (config->ssl_ciphers) {
- if (pn_ssl_domain_set_ciphers(domain, config->ssl_ciphers)) {
- pn_ssl_domain_free(domain);
- return qd_error(QD_ERROR_RUNTIME, "Cannot set ciphers. The ciphers
string might be invalid. Use openssl ciphers -v <ciphers> to validate");
- }
- }
+ server->qd = qd;
+ server->log_source = qd_log_source("SERVER");
+ server->protocol_log_source = qd_log_source("PROTOCOL");
+ server->container_name = container_name;
+ server->sasl_config_path = sasl_config_path;
+ server->sasl_config_name = sasl_config_name;
+ server->proactor = pn_proactor();
+ server->container = NULL;
+ server->start_context = 0; // XXX Should be NULL?
+ server->lock = sys_mutex();
+ server->conn_activation_lock = sys_mutex();
+ server->cond = sys_cond();
- if (config->ssl_protocols) {
- if (pn_ssl_domain_set_protocols(domain, config->ssl_protocols)) {
- pn_ssl_domain_free(domain);
- return qd_error(QD_ERROR_RUNTIME, "Cannot set protocols. The
protocols string might be invalid. This list is a space separated string of the
allowed TLS protocols (TLSv1 TLSv1.1 TLSv1.2)");
- }
- }
+ DEQ_INIT(server->conn_list);
+ qd_timer_initialize(server->lock);
- const char *trusted = config->ssl_trusted_certificate_db;
- if (config->ssl_trusted_certificates)
- trusted = config->ssl_trusted_certificates;
+ server->pause_requests = 0;
+ server->threads_paused = 0;
+ server->pause_next_sequence = 0;
+ server->pause_now_serving = 0;
+ server->next_connection_id = 1;
+ server->py_displayname_obj = 0;
- // do we force the peer to send a cert?
- if (config->ssl_require_peer_authentication) {
- if (!trusted || pn_ssl_domain_set_peer_authentication(domain,
PN_SSL_VERIFY_PEER, trusted)) {
- pn_ssl_domain_free(domain);
- return qd_error(QD_ERROR_RUNTIME, "Cannot set peer
authentication");
- }
- }
+ server->http = qd_http_server(server, server->log_source);
- ctx->ssl = pn_ssl(tport);
- if (!ctx->ssl || pn_ssl_init(ctx->ssl, domain, 0)) {
- pn_ssl_domain_free(domain);
- return qd_error(QD_ERROR_RUNTIME, "Cannot initialize SSL");
- }
+ qd_log(server->log_source, QD_LOG_INFO, "Container Name: %s",
server->container_name);
- // By default adding ssl to a transport forces encryption to be required,
so if it's not set that here
- if (!config->ssl_required) {
- pn_transport_require_encryption(tport, false);
- }
-
- pn_ssl_domain_free(domain);
- return QD_ERROR_NONE;
+ return server;
}
+void qd_server_free(qd_server_t* server) {
+ if (!server) {
+ return;
+ }
-static void decorate_connection(qd_server_t *qd_server, pn_connection_t *conn,
const qd_server_config_t *config)
-{
- size_t clen = strlen(QD_CAPABILITY_ANONYMOUS_RELAY);
-
- //
- // Set the container name
- //
- pn_connection_set_container(conn, qd_server->container_name);
+ qd_connection_t* conn = DEQ_HEAD(server->conn_list);
- //
- // Offer ANONYMOUS_RELAY capability
- //
- pn_data_put_symbol(pn_connection_offered_capabilities(conn),
pn_bytes(clen, (char*) QD_CAPABILITY_ANONYMOUS_RELAY));
+ while (conn) {
+ qd_log(server->log_source, QD_LOG_INFO, "[C%" PRIu64 "] Closing
connection on shutdown", conn->connection_id);
- //
- // Create the connection properties map
- //
- pn_data_put_map(pn_connection_properties(conn));
- pn_data_enter(pn_connection_properties(conn));
+ DEQ_REMOVE_HEAD(server->conn_list);
- pn_data_put_symbol(pn_connection_properties(conn),
- pn_bytes(strlen(QD_CONNECTION_PROPERTY_PRODUCT_KEY),
QD_CONNECTION_PROPERTY_PRODUCT_KEY));
- pn_data_put_string(pn_connection_properties(conn),
- pn_bytes(strlen(QD_CONNECTION_PROPERTY_PRODUCT_VALUE),
QD_CONNECTION_PROPERTY_PRODUCT_VALUE));
+ if (conn->pn_conn) {
+ pn_transport_t* transport = pn_connection_transport(conn->pn_conn);
- pn_data_put_symbol(pn_connection_properties(conn),
- pn_bytes(strlen(QD_CONNECTION_PROPERTY_VERSION_KEY),
QD_CONNECTION_PROPERTY_VERSION_KEY));
- pn_data_put_string(pn_connection_properties(conn),
- pn_bytes(strlen(QPID_DISPATCH_VERSION),
QPID_DISPATCH_VERSION));
+ if (transport) {
+ // For the transport tracer
+ pn_transport_set_context(transport, 0);
+ }
- pn_data_put_symbol(pn_connection_properties(conn),
- pn_bytes(strlen(QD_CONNECTION_PROPERTY_CONN_ID),
QD_CONNECTION_PROPERTY_CONN_ID));
- qd_connection_t *qd_conn = pn_connection_get_context(conn);
- pn_data_put_int(pn_connection_properties(conn), qd_conn->connection_id);
+ qd_session_cleanup(conn);
+ pn_connection_set_context(conn->pn_conn, 0);
+ }
- if (config && config->inter_router_cost > 1) {
- pn_data_put_symbol(pn_connection_properties(conn),
- pn_bytes(strlen(QD_CONNECTION_PROPERTY_COST_KEY),
QD_CONNECTION_PROPERTY_COST_KEY));
- pn_data_put_int(pn_connection_properties(conn),
config->inter_router_cost);
- }
+ if (conn->free_user_id) {
+ free((char*) conn->user_id);
+ }
- if (config) {
- qd_failover_list_t *fol = config->failover_list;
- if (fol) {
- pn_data_put_symbol(pn_connection_properties(conn),
-
pn_bytes(strlen(QD_CONNECTION_PROPERTY_FAILOVER_LIST_KEY),
QD_CONNECTION_PROPERTY_FAILOVER_LIST_KEY));
- pn_data_put_list(pn_connection_properties(conn));
- pn_data_enter(pn_connection_properties(conn));
- int fol_count = qd_failover_list_size(fol);
- for (int i = 0; i < fol_count; i++) {
- pn_data_put_map(pn_connection_properties(conn));
- pn_data_enter(pn_connection_properties(conn));
- pn_data_put_symbol(pn_connection_properties(conn),
-
pn_bytes(strlen(QD_CONNECTION_PROPERTY_FAILOVER_NETHOST_KEY),
QD_CONNECTION_PROPERTY_FAILOVER_NETHOST_KEY));
- pn_data_put_string(pn_connection_properties(conn),
- pn_bytes(strlen(qd_failover_list_host(fol,
i)), qd_failover_list_host(fol, i)));
-
- pn_data_put_symbol(pn_connection_properties(conn),
-
pn_bytes(strlen(QD_CONNECTION_PROPERTY_FAILOVER_PORT_KEY),
QD_CONNECTION_PROPERTY_FAILOVER_PORT_KEY));
- pn_data_put_string(pn_connection_properties(conn),
- pn_bytes(strlen(qd_failover_list_port(fol,
i)), qd_failover_list_port(fol, i)));
+ sys_mutex_free(conn->deferred_call_lock);
- if (qd_failover_list_scheme(fol, i)) {
- pn_data_put_symbol(pn_connection_properties(conn),
-
pn_bytes(strlen(QD_CONNECTION_PROPERTY_FAILOVER_SCHEME_KEY),
QD_CONNECTION_PROPERTY_FAILOVER_SCHEME_KEY));
- pn_data_put_string(pn_connection_properties(conn),
-
pn_bytes(strlen(qd_failover_list_scheme(fol, i)), qd_failover_list_scheme(fol,
i)));
- }
+ free(conn->name);
+ free(conn->role);
- if (qd_failover_list_hostname(fol, i)) {
- pn_data_put_symbol(pn_connection_properties(conn),
-
pn_bytes(strlen(QD_CONNECTION_PROPERTY_FAILOVER_HOSTNAME_KEY),
QD_CONNECTION_PROPERTY_FAILOVER_HOSTNAME_KEY));
- pn_data_put_string(pn_connection_properties(conn),
-
pn_bytes(strlen(qd_failover_list_hostname(fol, i)),
qd_failover_list_hostname(fol, i)));
- }
- pn_data_exit(pn_connection_properties(conn));
- }
- pn_data_exit(pn_connection_properties(conn));
+ if (conn->policy_settings) {
+ free_qd_policy_settings_t(conn->policy_settings);
}
- }
-
- pn_data_exit(pn_connection_properties(conn));
-}
-/* Wake function for proactor-manaed connections */
-static void connection_wake(qd_connection_t *ctx) {
- if (ctx->pn_conn) pn_connection_wake(ctx->pn_conn);
-}
+ free_qd_connection_t(conn);
-/* Construct a new qd_connection. Thread safe. */
-qd_connection_t *qd_server_connection(qd_server_t *server, qd_server_config_t
*config)
-{
- qd_connection_t *ctx = new_qd_connection_t();
- if (!ctx) return NULL;
- ZERO(ctx);
- ctx->pn_conn = pn_connection();
- ctx->deferred_call_lock = sys_mutex();
- ctx->role = strdup(config->role);
- if (!ctx->pn_conn || !ctx->deferred_call_lock || !ctx->role) {
- if (ctx->pn_conn) pn_connection_free(ctx->pn_conn);
- if (ctx->deferred_call_lock) sys_mutex_free(ctx->deferred_call_lock);
- free(ctx->role);
- free(ctx);
- return NULL;
+ conn = DEQ_HEAD(server->conn_list);
}
- ctx->server = server;
- ctx->wake = connection_wake; /* Default, over-ridden for HTTP connections
*/
- pn_connection_set_context(ctx->pn_conn, ctx);
- DEQ_ITEM_INIT(ctx);
- DEQ_INIT(ctx->deferred_calls);
- DEQ_INIT(ctx->free_link_session_list);
- sys_mutex_lock(server->lock);
- ctx->connection_id = server->next_connection_id++;
- DEQ_INSERT_TAIL(server->conn_list, ctx);
- sys_mutex_unlock(server->lock);
- decorate_connection(ctx->server, ctx->pn_conn, config);
- return ctx;
-}
+ pn_proactor_free(server->proactor);
+ qd_timer_finalize();
+ sys_mutex_free(server->lock);
+ sys_mutex_free(server->conn_activation_lock);
+ sys_cond_free(server->cond);
+ Py_XDECREF((PyObject*) server->py_displayname_obj);
-static void on_accept(pn_event_t *e)
-{
- assert(pn_event_type(e) == PN_LISTENER_ACCEPT);
- pn_listener_t *pn_listener = pn_event_listener(e);
- qd_listener_t *listener = pn_listener_get_context(pn_listener);
- qd_connection_t *ctx = qd_server_connection(listener->server,
&listener->config);
- if (!ctx) {
- qd_log(listener->server->log_source, QD_LOG_CRITICAL,
- "Allocation failure during accept to %s",
listener->config.host_port);
- return;
- }
- ctx->listener = listener;
- qd_log(listener->server->log_source, QD_LOG_TRACE,
- "[%"PRIu64"]: Accepting incoming connection to '%s'",
- ctx->connection_id, ctx->listener->config.host_port);
- /* Asynchronous accept, configure the transport on PN_CONNECTION_BOUND */
- pn_listener_accept(pn_listener, ctx->pn_conn);
- }
-
-
-/* Log the description, set the transport condition (name, description) close
the transport tail. */
-void connect_fail(qd_connection_t *ctx, const char *name, const char
*description, ...)
-{
- va_list ap;
- va_start(ap, description);
- qd_verror(QD_ERROR_RUNTIME, description, ap);
- va_end(ap);
- if (ctx->pn_conn) {
- pn_transport_t *t = pn_connection_transport(ctx->pn_conn);
- /* Normally this is closing the transport but if not bound close the
connection. */
- pn_condition_t *cond = t ? pn_transport_condition(t) :
pn_connection_condition(ctx->pn_conn);
- if (cond && !pn_condition_is_set(cond)) {
- va_start(ap, description);
- pn_condition_vformat(cond, name, description, ap);
- va_end(ap);
- }
- if (t) {
- pn_transport_close_tail(t);
- } else {
- pn_connection_close(ctx->pn_conn);
- }
- }
+ free(server);
}
-
-/* Get the host IP address for the remote end */
-static void set_rhost_port(qd_connection_t *ctx) {
- pn_transport_t *tport = pn_connection_transport(ctx->pn_conn);
- const struct sockaddr* sa =
pn_netaddr_sockaddr(pn_transport_remote_addr(tport));
- size_t salen = pn_netaddr_socklen(pn_transport_remote_addr(tport));
- if (sa && salen) {
- char rport[NI_MAXSERV] = "";
- int err = getnameinfo(sa, salen,
- ctx->rhost, sizeof(ctx->rhost), rport,
sizeof(rport),
- NI_NUMERICHOST | NI_NUMERICSERV);
- if (!err) {
- snprintf(ctx->rhost_port, sizeof(ctx->rhost_port), "%s:%s",
ctx->rhost, rport);
- }
- }
+void qd_server_set_container(qd_dispatch_t* qd, qd_container_t* container) {
+ qd->server->container = container;
}
+void qd_server_trace_all_connections() {
+ qd_dispatch_t* qd = qd_dispatch_get_dispatch();
-/* Configure the transport once it is bound to the connection */
-static void on_connection_bound(qd_server_t *server, pn_event_t *e) {
- pn_connection_t *pn_conn = pn_event_connection(e);
- qd_connection_t *ctx = pn_connection_get_context(pn_conn);
- pn_transport_t *tport = pn_connection_transport(pn_conn);
- pn_transport_set_context(tport, ctx); /* for transport_tracer */
-
- //
- // Proton pushes out its trace to transport_tracer() which in turn writes
a trace
- // message to the qdrouter log
- // If trace level logging is enabled on the PROTOCOL module, set
PN_TRACE_FRM as the transport trace
- // and also set the transport tracer callback.
- // Note here that if trace level logging is enabled on the DEFAULT module,
all modules are logging at trace level too.
- //
- if (qd_log_enabled(ctx->server->protocol_log_source, QD_LOG_TRACE)) {
- pn_transport_trace(tport, PN_TRACE_FRM);
- pn_transport_set_tracer(tport, transport_tracer);
- }
+ if (qd->server) {
+ sys_mutex_lock(qd->server->lock);
- const qd_server_config_t *config = NULL;
- if (ctx->listener) { /* Accepting an incoming connection */
- config = &ctx->listener->config;
- const char *name = config->host_port;
- pn_transport_set_server(tport);
- set_rhost_port(ctx);
-
- sys_mutex_lock(server->lock); /* Policy check is not thread safe */
- ctx->policy_counted = qd_policy_socket_accept(server->qd->policy,
ctx->rhost);
- sys_mutex_unlock(server->lock);
- if (!ctx->policy_counted) {
- pn_transport_close_tail(tport);
- pn_transport_close_head(tport);
- return;
- }
+ qd_connection_list_t conn_list = qd->server->conn_list;
+ qd_connection_t* conn = DEQ_HEAD(conn_list);
- // Set up SSL
- if (config->ssl_profile) {
- qd_log(ctx->server->log_source, QD_LOG_TRACE, "Configuring SSL on
%s", name);
- if (listener_setup_ssl(ctx, config, tport) != QD_ERROR_NONE) {
- connect_fail(ctx, QD_AMQP_COND_INTERNAL_ERROR, "%s on %s",
qd_error_message(), name);
- return;
- }
- }
- //
- // Set up SASL
- //
- sys_mutex_lock(ctx->server->lock);
- pn_sasl_t *sasl = pn_sasl(tport);
- if (ctx->server->sasl_config_path)
- pn_sasl_config_path(sasl, ctx->server->sasl_config_path);
- pn_sasl_config_name(sasl, ctx->server->sasl_config_name);
- if (config->sasl_mechanisms)
- pn_sasl_allowed_mechs(sasl, config->sasl_mechanisms);
- if (config->sasl_plugin_config.auth_service) {
- qd_log(server->log_source, QD_LOG_INFO, "enabling remote
authentication service %s", config->sasl_plugin_config.auth_service);
- pn_ssl_domain_t* plugin_ssl_domain = NULL;
- if (config->sasl_plugin_config.use_ssl) {
- plugin_ssl_domain = pn_ssl_domain(PN_SSL_MODE_CLIENT);
-
- if (config->sasl_plugin_config.ssl_certificate_file) {
- if (pn_ssl_domain_set_credentials(plugin_ssl_domain,
-
config->sasl_plugin_config.ssl_certificate_file,
-
config->sasl_plugin_config.ssl_private_key_file,
-
config->sasl_plugin_config.ssl_password)) {
- qd_log(server->log_source, QD_LOG_ERROR, "Cannot set
SSL credentials for authentication service");
- }
- }
- if (config->sasl_plugin_config.ssl_trusted_certificate_db) {
- if (pn_ssl_domain_set_trusted_ca_db(plugin_ssl_domain,
config->sasl_plugin_config.ssl_trusted_certificate_db)) {
- qd_log(server->log_source, QD_LOG_ERROR, "Cannot set
trusted SSL certificate db for authentication service" );
- } else {
- if
(pn_ssl_domain_set_peer_authentication(plugin_ssl_domain, PN_SSL_VERIFY_PEER,
config->sasl_plugin_config.ssl_trusted_certificate_db)) {
- qd_log(server->log_source, QD_LOG_ERROR, "Cannot
set SSL peer verification for authentication service");
- }
- }
- }
- if (config->sasl_plugin_config.ssl_ciphers) {
- if (pn_ssl_domain_set_ciphers(plugin_ssl_domain,
config->sasl_plugin_config.ssl_ciphers)) {
- qd_log(server->log_source, QD_LOG_ERROR, "Cannot set
ciphers for authentication service");
- }
- }
- if (config->sasl_plugin_config.ssl_protocols) {
- if (pn_ssl_domain_set_protocols(plugin_ssl_domain,
config->sasl_plugin_config.ssl_protocols)) {
- qd_log(server->log_source, QD_LOG_ERROR, "Cannot set
protocols for authentication service");
- }
- }
- }
- qdr_use_remote_authentication_service(tport,
config->sasl_plugin_config.auth_service,
config->sasl_plugin_config.sasl_init_hostname, plugin_ssl_domain,
server->proactor);
- }
- pn_transport_require_auth(tport, config->requireAuthentication);
- pn_transport_require_encryption(tport, config->requireEncryption);
- pn_sasl_set_allow_insecure_mechs(sasl,
config->allowInsecureAuthentication);
- sys_mutex_unlock(ctx->server->lock);
-
- qd_log(ctx->server->log_source, QD_LOG_INFO, "[C%"PRIu64"] Accepted
connection to %s from %s",
- ctx->connection_id, name, ctx->rhost_port);
- } else if (ctx->connector) { /* Establishing an outgoing connection */
- config = &ctx->connector->config;
- setup_ssl_sasl_and_open(ctx);
-
- } else { /* No connector and no listener */
- connect_fail(ctx, QD_AMQP_COND_INTERNAL_ERROR, "unknown Connection");
- return;
- }
-
- //
- // Common transport configuration.
- //
- pn_transport_set_max_frame(tport, config->max_frame_size);
- pn_transport_set_channel_max(tport, config->max_sessions - 1);
- pn_transport_set_idle_timeout(tport, config->idle_timeout_seconds * 1000);
-}
+ while (conn) {
+ // If there is already a tracer on the transport, nothing
+ // to do, move on to the next connection.
+ pn_transport_t* transport = pn_connection_transport(conn->pn_conn);
+ if (!pn_transport_get_tracer(transport)) {
+ pn_transport_trace(transport, PN_TRACE_FRM);
+ pn_transport_set_tracer(transport, transport_tracer);
+ }
-static void invoke_deferred_calls(qd_connection_t *conn, bool discard)
-{
- if (!conn)
- return;
+ conn = DEQ_NEXT(conn);
+ }
- // Lock access to deferred_calls, other threads may concurrently add to
it. Invoke
- // the calls outside of the critical section.
- //
- sys_mutex_lock(conn->deferred_call_lock);
- qd_deferred_call_t *dc;
- while ((dc = DEQ_HEAD(conn->deferred_calls))) {
- DEQ_REMOVE_HEAD(conn->deferred_calls);
- sys_mutex_unlock(conn->deferred_call_lock);
- dc->call(dc->context, discard);
- free_qd_deferred_call_t(dc);
- sys_mutex_lock(conn->deferred_call_lock);
+ sys_mutex_unlock(qd->server->lock);
}
- sys_mutex_unlock(conn->deferred_call_lock);
}
-void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
pn_connection_t *pn_conn, qd_connection_t *qd_conn);
-void qd_conn_event_batch_complete(qd_container_t *container, qd_connection_t
*qd_conn, bool conn_closed);
+static double normalize_memory_size(const uint64_t bytes, const char** suffix);
+static void* server_thread_run(void* arg);
-static void handle_listener(pn_event_t *e, qd_server_t *qd_server) {
- qd_log_source_t *log = qd_server->log_source;
+void qd_server_run(qd_dispatch_t* qd) {
+ qd_server_t* server = qd->server;
+ int i;
- qd_listener_t *li = (qd_listener_t*)
pn_listener_get_context(pn_event_listener(e));
- const char *host_port = li->config.host_port;
- const char *port = li->config.port;
+ assert(server);
+ // Server can't run without a container
+ assert(server->container);
- switch (pn_event_type(e)) {
+ qd_log(server->log_source, QD_LOG_NOTICE, "Operational, %d Threads Running
(process ID %ld)", server->thread_count,
+ (long) getpid());
- case PN_LISTENER_OPEN: {
-
- if (strcmp(port, "0") == 0) {
- // If a 0 (zero) is specified for a port, get the actual listening
port from the listener.
- pn_listener_t *l = pn_event_listener(e);
- const pn_netaddr_t *na = pn_listener_addr(l);
- char str[PN_MAX_ADDR] = "";
- pn_netaddr_str(na, str, sizeof(str));
- // "str" contains the host and port on which this listener is
listening.
- if (li->config.name)
- qd_log(log, QD_LOG_NOTICE, "Listening on %s (%s)", str,
li->config.name);
- else
- qd_log(log, QD_LOG_NOTICE, "Listening on %s", str);
- }
- else {
- qd_log(log, QD_LOG_NOTICE, "Listening on %s", host_port);
- }
-
- break;
- }
+ const uintmax_t ram_size = qd_platform_memory_size();
+ const uint64_t vm_size = qd_router_memory_usage();
- case PN_LISTENER_ACCEPT:
- qd_log(log, QD_LOG_TRACE, "Accepting connection on %s", host_port);
- on_accept(e);
- break;
-
- case PN_LISTENER_CLOSE:
- if (li->pn_listener) {
- pn_condition_t *cond = pn_listener_condition(li->pn_listener);
- if (pn_condition_is_set(cond)) {
- qd_log(log, QD_LOG_ERROR, "Listener error on %s: %s (%s)",
host_port,
- pn_condition_get_description(cond),
- pn_condition_get_name(cond));
- if (li->exit_on_error) {
- qd_log(log, QD_LOG_CRITICAL, "Shutting down, required
listener failed %s",
- host_port);
- exit(1);
- }
- } else {
- qd_log(log, QD_LOG_TRACE, "Listener closed on %s", host_port);
- }
- pn_listener_set_context(li->pn_listener, 0);
- li->pn_listener = 0;
- qd_listener_decref(li);
- }
- break;
+ if (ram_size && vm_size) {
+ const char* suffix_vm = 0;
+ const char* suffix_ram = 0;
+ double vm = normalize_memory_size(vm_size, &suffix_vm);
+ double ram = normalize_memory_size(ram_size, &suffix_ram);
- default:
- break;
+ qd_log(server->log_source, QD_LOG_NOTICE, "Process VmSize %.2f %s
(%.2f %s available memory)", vm, suffix_vm,
+ ram, suffix_ram);
}
-}
-
-
-bool qd_connector_has_failover_info(qd_connector_t* ct)
-{
- if (ct && DEQ_SIZE(ct->conn_info_list) > 1)
- return true;
- return false;
-}
-
-
-static void qd_connection_free(qd_connection_t *ctx)
-{
- qd_server_t *qd_server = ctx->server;
- // If this is a dispatch connector, schedule the re-connect timer
- if (ctx->connector) {
- long delay = ctx->connector->delay;
- sys_mutex_lock(ctx->connector->lock);
- ctx->connector->ctx = 0;
- // Increment the connection index by so that we can try connecting to
the failover url (if any).
- bool has_failover = qd_connector_has_failover_info(ctx->connector);
+#ifndef NDEBUG
+ qd_log(server->log_source, QD_LOG_INFO, "Running in DEBUG Mode");
+#endif
- if (has_failover) {
- // Go thru the failover list round robin.
- // IMPORTANT: Note here that we set the re-try timer to 1 second.
- // We want to quickly keep cycling thru the failover urls every
second.
- delay = 1000;
- }
+ // Start count - 1 threads and use the current thread
- ctx->connector->state = CXTR_STATE_CONNECTING;
- sys_mutex_unlock(ctx->connector->lock);
+ int n = server->thread_count - 1;
+ sys_thread_t** threads = (sys_thread_t**) calloc(n, sizeof(sys_thread_t*));
- //
- // Increment the ref-count to account for the timer's reference to the
connector.
- //
- sys_atomic_inc(&ctx->connector->ref_count);
- qd_timer_schedule(ctx->connector->timer, delay);
+ for (i = 0; i < n; i++) {
+ threads[i] = sys_thread(server_thread_run, server);
}
- sys_mutex_lock(qd_server->lock);
- DEQ_REMOVE(qd_server->conn_list, ctx);
+ // Use the current thread
+ server_thread_run(server);
- // If counted for policy enforcement, notify it has closed
- if (ctx->policy_counted) {
- qd_policy_socket_close(qd_server->qd->policy, ctx);
+ for (i = 0; i < n; i++) {
+ sys_thread_join(threads[i]);
+ sys_thread_free(threads[i]);
}
- sys_mutex_unlock(qd_server->lock);
-
- invoke_deferred_calls(ctx, true); // Discard any pending deferred calls
- sys_mutex_free(ctx->deferred_call_lock);
- qd_policy_settings_free(ctx->policy_settings);
- if (ctx->free_user_id) free((char*)ctx->user_id);
- if (ctx->timer) qd_timer_free(ctx->timer);
- free(ctx->name);
- free(ctx->role);
- sys_mutex_lock(qd_server->conn_activation_lock);
- free_qd_connection_t(ctx);
- sys_mutex_unlock(qd_server->conn_activation_lock);
-
- /* Note: pn_conn is freed by the proactor */
-}
+ free(threads);
-static void timeout_on_handhsake(void *context, bool discard)
-{
- if (discard)
- return;
+ // Stop HTTP threads immediately
+ qd_http_server_stop(server->http);
+ qd_http_server_free(server->http);
- qd_connection_t *ctx = (qd_connection_t*) context;
- pn_transport_t *tport = pn_connection_transport(ctx->pn_conn);
- pn_transport_close_head(tport);
- connect_fail(ctx, QD_AMQP_COND_NOT_ALLOWED, "Timeout waiting for initial
handshake");
+ qd_log(server->log_source, QD_LOG_NOTICE, "Shut Down");
}
-
-static void startup_timer_handler(void *context)
-{
- //
- // This timer fires for a connection if it has not had a REMOTE_OPEN
- // event in a time interval from the CONNECTION_INIT event. Close
- // down the transport in an IO thread reserved for that connection.
- //
- qd_connection_t *ctx = (qd_connection_t*) context;
- qd_timer_free(ctx->timer);
- ctx->timer = 0;
- qd_connection_invoke_deferred(ctx, timeout_on_handhsake, context);
+void qd_server_stop(qd_dispatch_t* qd) {
+ // Interrupt the proactor. Async-signal safe.
+ pn_proactor_interrupt(qd->server->proactor);
}
-static void qd_increment_conn_index(qd_connection_t *ctx)
-{
- if (ctx->connector) {
- qd_failover_item_t *item = qd_connector_get_conn_info(ctx->connector);
-
- if (item->retries == 1) {
- ctx->connector->conn_index += 1;
- if (ctx->connector->conn_index >
DEQ_SIZE(ctx->connector->conn_info_list))
- ctx->connector->conn_index = 1;
- item->retries = 0;
- }
- else
- item->retries += 1;
+void qd_server_activate(qd_connection_t* conn) {
+ if (conn) {
+ conn->wake(conn);
}
-
}
-
-/* Events involving a connection or listener are serialized by the proactor so
- * only one event per connection / listener will be processed at a time.
- */
-static bool handle(qd_server_t *qd_server, pn_event_t *e, pn_connection_t
*pn_conn, qd_connection_t *ctx)
-{
- if (pn_conn && qdr_is_authentication_service_connection(pn_conn)) {
- qdr_handle_authentication_service_connection_event(e);
- return true;
- }
-
- switch (pn_event_type(e)) {
-
- case PN_PROACTOR_INTERRUPT:
- /* Interrupt the next thread */
- pn_proactor_interrupt(qd_server->proactor);
- /* Stop the current thread */
- return false;
-
- case PN_PROACTOR_TIMEOUT:
- qd_timer_visit();
- break;
-
- case PN_LISTENER_OPEN:
- case PN_LISTENER_ACCEPT:
- case PN_LISTENER_CLOSE:
- handle_listener(e, qd_server);
- break;
-
- case PN_CONNECTION_INIT: {
- const qd_server_config_t *config = ctx && ctx->listener ?
&ctx->listener->config : 0;
- if (config && config->initial_handshake_timeout_seconds > 0) {
- ctx->timer = qd_timer(qd_server->qd, startup_timer_handler, ctx);
- qd_timer_schedule(ctx->timer,
config->initial_handshake_timeout_seconds * 1000);
- }
- break;
- }
-
- case PN_CONNECTION_BOUND:
- on_connection_bound(qd_server, e);
- break;
-
- case PN_CONNECTION_REMOTE_OPEN:
- // If we are transitioning to the open state, notify the client via
callback.
- if (ctx && ctx->timer) {
- qd_timer_free(ctx->timer);
- ctx->timer = 0;
- }
- if (ctx && !ctx->opened) {
- ctx->opened = true;
- if (ctx->connector) {
- ctx->connector->delay = 2000; // Delay re-connect in case
there is a recurring error
- qd_failover_item_t *item =
qd_connector_get_conn_info(ctx->connector);
- if (item)
- item->retries = 0;
- }
- }
- break;
-
- case PN_CONNECTION_WAKE:
- invoke_deferred_calls(ctx, false);
- break;
-
- case PN_TRANSPORT_ERROR:
- {
- pn_transport_t *transport = pn_event_transport(e);
- pn_condition_t* condition = transport ?
pn_transport_condition(transport) : NULL;
- if (ctx && ctx->connector) { /* Outgoing connection */
- qd_increment_conn_index(ctx);
- const qd_server_config_t *config = &ctx->connector->config;
- ctx->connector->state = CXTR_STATE_FAILED;
- char conn_msg[300];
- if (condition && pn_condition_is_set(condition)) {
- qd_format_string(conn_msg, 300, "[C%"PRIu64"] Connection
to %s failed: %s %s", ctx->connection_id, config->host_port,
- pn_condition_get_name(condition),
pn_condition_get_description(condition));
- strcpy(ctx->connector->conn_msg, conn_msg);
-
- qd_log(qd_server->log_source, QD_LOG_INFO, conn_msg);
- } else {
- qd_format_string(conn_msg, 300, "[C%"PRIu64"] Connection
to %s failed", ctx->connection_id, config->host_port);
- strcpy(ctx->connector->conn_msg, conn_msg);
- qd_log(qd_server->log_source, QD_LOG_INFO, conn_msg);
- }
- } else if (ctx && ctx->listener) { /* Incoming connection */
- if (condition && pn_condition_is_set(condition)) {
- qd_log(ctx->server->log_source, QD_LOG_INFO, "[C%"PRIu64"]
Connection from %s (to %s) failed: %s %s",
- ctx->connection_id, ctx->rhost_port,
ctx->listener->config.host_port, pn_condition_get_name(condition),
- pn_condition_get_description(condition));
- }
- }
- }
- break;
-
- default:
- break;
- } // Switch event type
-
- if (ctx)
- qd_container_handle_event(qd_server->container, e, pn_conn, ctx);
-
- return true;
+qd_dispatch_t* qd_server_dispatch(qd_server_t* server) {
+ return server->qd;
}
-static void *thread_run(void *arg)
-{
- qd_server_t *qd_server = (qd_server_t*)arg;
- bool running = true;
- while (running) {
- pn_event_batch_t *events = pn_proactor_wait(qd_server->proactor);
- pn_event_t * e;
- qd_connection_t *qd_conn = 0;
- pn_connection_t *pn_conn = 0;
-
- while (running && (e = pn_event_batch_next(events))) {
- pn_connection_t *conn = pn_event_connection(e);
+// Permit replacement by dummy implementation in unit_tests
+__attribute__((noinline)) void qd_server_timeout(qd_server_t* server,
qd_duration_t duration) {
+ pn_proactor_set_timeout(server->proactor, duration);
+}
- if (!pn_conn)
- pn_conn = conn;
- assert(pn_conn == conn);
+sys_mutex_t* qd_server_get_activation_lock(qd_server_t* server) {
+ return server->conn_activation_lock;
+}
- if (!qd_conn)
- qd_conn = !!pn_conn ? (qd_connection_t*)
pn_connection_get_context(pn_conn) : 0;
+static void connection_wake(qd_connection_t* conn);
+static void connection_decorate(qd_server_t* server, qd_connection_t* conn,
const qd_server_config_t* config);
- running = handle(qd_server, e, conn, qd_conn);
+// Construct a new qd_connection. Thread safe.
+qd_connection_t* qd_server_connection(qd_server_t* server, qd_server_config_t*
config) {
+ qd_connection_t* conn = new_qd_connection_t();
- /* Free the connection after all other processing is complete */
- if (qd_conn && pn_event_type(e) == PN_TRANSPORT_CLOSED) {
- qd_conn_event_batch_complete(qd_server->container, qd_conn,
true);
- pn_connection_set_context(pn_conn, NULL);
- qd_connection_free(qd_conn);
- qd_conn = 0;
- }
- }
-
- //
- // Notify the container that the batch is complete so it can do
after-batch
- // processing.
- //
- if (qd_conn)
- qd_conn_event_batch_complete(qd_server->container, qd_conn, false);
-
- pn_proactor_done(qd_server->proactor, events);
+ if (!conn) {
+ return NULL;
}
- return NULL;
-}
+ ZERO(conn);
-static qd_failover_item_t *qd_connector_get_conn_info(qd_connector_t *ct) {
+ conn->pn_conn = pn_connection();
+ conn->deferred_call_lock = sys_mutex();
+ conn->role = strdup(config->role);
- qd_failover_item_t *item = DEQ_HEAD(ct->conn_info_list);
-
- if (DEQ_SIZE(ct->conn_info_list) > 1) {
- for (int i=1; i < ct->conn_index; i++) {
- item = DEQ_NEXT(item);
+ if (!conn->pn_conn || !conn->deferred_call_lock || !conn->role) {
+ if (conn->pn_conn) {
+ pn_connection_free(conn->pn_conn);
}
- }
- return item;
-}
+ if (conn->deferred_call_lock) {
+ sys_mutex_free(conn->deferred_call_lock);
+ }
-/* Timer callback to try/retry connection open */
-static void try_open_lh(qd_connector_t *ct)
-{
- if (ct->state != CXTR_STATE_CONNECTING && ct->state != CXTR_STATE_INIT) {
- /* No longer referenced by pn_connection or timer */
- qd_connector_decref(ct);
- return;
- }
+ free(conn->role);
+ free(conn);
- qd_connection_t *ctx = qd_server_connection(ct->server, &ct->config);
- if (!ctx) { /* Try again later */
- qd_log(ct->server->log_source, QD_LOG_CRITICAL, "Allocation failure
connecting to %s",
- ct->config.host_port);
- ct->delay = 10000;
- sys_atomic_inc(&ct->ref_count);
- qd_timer_schedule(ct->timer, ct->delay);
- return;
+ return NULL;
}
- ctx->connector = ct;
- const qd_server_config_t *config = &ct->config;
-
- //
- // Set the hostname on the pn_connection. This hostname will be used by
proton as the
- // hostname in the open frame.
- //
+ conn->server = server;
- qd_failover_item_t *item = qd_connector_get_conn_info(ct);
+ // The default. It is overridden for HTTP connections.
+ conn->wake = connection_wake;
- char *current_host = item->host;
- char *host_port = item->host_port;
+ pn_connection_set_context(conn->pn_conn, conn);
- pn_connection_set_hostname(ctx->pn_conn, current_host);
+ DEQ_ITEM_INIT(conn);
+ DEQ_INIT(conn->deferred_calls);
+ DEQ_INIT(conn->free_link_session_list);
- // Set the sasl user name and password on the proton connection object.
This has to be
- // done before pn_proactor_connect which will bind a transport to the
connection
- if(config->sasl_username)
- pn_connection_set_user(ctx->pn_conn, config->sasl_username);
- if (config->sasl_password)
- pn_connection_set_password(ctx->pn_conn, config->sasl_password);
+ sys_mutex_lock(server->lock);
+ conn->connection_id = server->next_connection_id++;
+ DEQ_INSERT_TAIL(server->conn_list, conn);
+ sys_mutex_unlock(server->lock);
- ctx->connector->state = CXTR_STATE_OPEN;
- ct->ctx = ctx;
- ct->delay = 5000;
+ connection_decorate(conn->server, conn, config);
- qd_log(ct->server->log_source, QD_LOG_TRACE,
- "[%"PRIu64"] Connecting to %s", ctx->connection_id, host_port);
- /* Note: the transport is configured in the PN_CONNECTION_BOUND event */
- pn_proactor_connect(ct->server->proactor, ctx->pn_conn, host_port);
+ return conn;
}
-static void setup_ssl_sasl_and_open(qd_connection_t *ctx)
-{
- qd_connector_t *ct = ctx->connector;
- const qd_server_config_t *config = &ct->config;
- pn_transport_t *tport = pn_connection_transport(ctx->pn_conn);
-
- //
- // Set up SSL if appropriate
- //
- if (config->ssl_profile) {
- pn_ssl_domain_t *domain = pn_ssl_domain(PN_SSL_MODE_CLIENT);
-
- if (!domain) {
- qd_error(QD_ERROR_RUNTIME, "SSL domain failed for connection to
%s:%s",
- ct->config.host, ct->config.port);
- return;
- }
+qd_listener_t* qd_server_listener(qd_server_t* server) {
+ qd_listener_t* listener = new_qd_listener_t();
- // set our trusted database for checking the peer's cert:
- if (config->ssl_trusted_certificate_db) {
- if (pn_ssl_domain_set_trusted_ca_db(domain,
config->ssl_trusted_certificate_db)) {
- qd_log(ct->server->log_source, QD_LOG_ERROR,
- "SSL CA configuration failed for %s:%s",
- ct->config.host, ct->config.port);
- }
- }
- // should we force the peer to provide a cert?
- if (config->ssl_require_peer_authentication) {
- const char *trusted = (config->ssl_trusted_certificates)
- ? config->ssl_trusted_certificates
- : config->ssl_trusted_certificate_db;
- if (pn_ssl_domain_set_peer_authentication(domain,
- PN_SSL_VERIFY_PEER,
- trusted)) {
- qd_log(ct->server->log_source, QD_LOG_ERROR,
- "SSL peer auth configuration failed for %s:%s",
- config->host, config->port);
- }
- }
-
- // configure our certificate if the peer requests one:
- if (config->ssl_certificate_file) {
- if (pn_ssl_domain_set_credentials(domain,
- config->ssl_certificate_file,
- config->ssl_private_key_file,
- config->ssl_password)) {
- qd_log(ct->server->log_source, QD_LOG_ERROR,
- "SSL local configuration failed for %s:%s",
- config->host, config->port);
- }
- }
+ if (!listener) {
+ return 0;
+ }
- if (config->ssl_ciphers) {
- if (pn_ssl_domain_set_ciphers(domain, config->ssl_ciphers)) {
- qd_log(ct->server->log_source, QD_LOG_ERROR,
- "SSL cipher configuration failed for %s:%s",
- config->host, config->port);
- }
- }
+ ZERO(listener);
+ sys_atomic_init(&listener->ref_count, 1);
- if (config->ssl_protocols) {
- if (pn_ssl_domain_set_protocols(domain, config->ssl_protocols)) {
- qd_log(ct->server->log_source, QD_LOG_ERROR,
- "Permitted TLS protocols configuration failed %s:%s",
- config->host, config->port);
- }
- }
+ listener->server = server;
+ listener->http = NULL;
- //If ssl is enabled and verify_host_name is true, instruct proton to
verify peer name
- if (config->verify_host_name) {
- if (pn_ssl_domain_set_peer_authentication(domain,
PN_SSL_VERIFY_PEER_NAME, NULL)) {
- qd_log(ct->server->log_source, QD_LOG_ERROR,
- "SSL peer host name verification failed for %s:%s",
- config->host, config->port);
- }
- }
+ return listener;
+}
- ctx->ssl = pn_ssl(tport);
- pn_ssl_init(ctx->ssl, domain, 0);
- pn_ssl_domain_free(domain);
- }
+static void connector_try_open_handler(void* context);
- //
- // Set up SASL
- //
- sys_mutex_lock(ct->server->lock);
- pn_sasl_t *sasl = pn_sasl(tport);
- if (config->sasl_mechanisms)
- pn_sasl_allowed_mechs(sasl, config->sasl_mechanisms);
- pn_sasl_set_allow_insecure_mechs(sasl,
config->allowInsecureAuthentication);
- sys_mutex_unlock(ct->server->lock);
-
- pn_connection_open(ctx->pn_conn);
-}
+qd_connector_t* qd_server_connector(qd_server_t* server) {
+ qd_connector_t* connector = new_qd_connector_t();
-static void try_open_cb(void *context) {
- qd_connector_t *ct = (qd_connector_t*) context;
- if (!qd_connector_decref(ct)) {
- sys_mutex_lock(ct->lock); /* TODO aconway 2017-05-09: this lock
looks too big */
- try_open_lh(ct);
- sys_mutex_unlock(ct->lock);
+ if (!connector) {
+ return NULL;
}
-}
+ ZERO(connector);
+ sys_atomic_init(&connector->ref_count, 1);
-qd_server_t *qd_server(qd_dispatch_t *qd, int thread_count, const char
*container_name,
- const char *sasl_config_path, const char
*sasl_config_name)
-{
- /* Initialize const members, 0 initialize all others. */
- qd_server_t tmp = { .thread_count = thread_count };
- qd_server_t *qd_server = NEW(qd_server_t);
- if (qd_server == 0)
- return 0;
- memcpy(qd_server, &tmp, sizeof(tmp));
-
- qd_server->qd = qd;
- qd_server->log_source = qd_log_source("SERVER");
- qd_server->protocol_log_source = qd_log_source("PROTOCOL");
- qd_server->container_name = container_name;
- qd_server->sasl_config_path = sasl_config_path;
- qd_server->sasl_config_name = sasl_config_name;
- qd_server->proactor = pn_proactor();
- qd_server->container = 0;
- qd_server->start_context = 0;
- qd_server->lock = sys_mutex();
- qd_server->conn_activation_lock = sys_mutex();
- qd_server->cond = sys_cond();
- DEQ_INIT(qd_server->conn_list);
-
- qd_timer_initialize(qd_server->lock);
-
- qd_server->pause_requests = 0;
- qd_server->threads_paused = 0;
- qd_server->pause_next_sequence = 0;
- qd_server->pause_now_serving = 0;
- qd_server->next_connection_id = 1;
- qd_server->py_displayname_obj = 0;
-
- qd_server->http = qd_http_server(qd_server, qd_server->log_source);
-
- qd_log(qd_server->log_source, QD_LOG_INFO, "Container Name: %s",
qd_server->container_name);
-
- return qd_server;
-}
+ connector->server = server;
+ qd_failover_item_list_t conn_info_list;
+ DEQ_INIT(conn_info_list);
+ connector->conn_info_list = conn_info_list;
+ connector->conn_index = 1;
+ connector->state = CXTR_STATE_INIT;
+ connector->lock = sys_mutex();
+ connector->timer = qd_timer(connector->server->qd,
connector_try_open_handler, connector);
+ connector->conn_msg = (char*) malloc(300);
+ memset(connector->conn_msg, 0, 300);
-void qd_server_free(qd_server_t *qd_server)
-{
- if (!qd_server) return;
- qd_connection_t *ctx = DEQ_HEAD(qd_server->conn_list);
- while (ctx) {
- qd_log(qd_server->log_source, QD_LOG_INFO,
- "[C%"PRIu64"] Closing connection on shutdown",
- ctx->connection_id);
- DEQ_REMOVE_HEAD(qd_server->conn_list);
- if (ctx->pn_conn) {
- pn_transport_t *tport = pn_connection_transport(ctx->pn_conn);
- if (tport)
- pn_transport_set_context(tport, 0); /* for transport_tracer */
- qd_session_cleanup(ctx);
- pn_connection_set_context(ctx->pn_conn, 0);
- }
- if (ctx->free_user_id) free((char*)ctx->user_id);
- sys_mutex_free(ctx->deferred_call_lock);
- free(ctx->name);
- free(ctx->role);
- if (ctx->policy_settings)
- free_qd_policy_settings_t(ctx->policy_settings);
- free_qd_connection_t(ctx);
- ctx = DEQ_HEAD(qd_server->conn_list);
+ if (!connector->lock || !connector->timer) {
+ qd_connector_decref(connector);
+ return NULL;
}
- pn_proactor_free(qd_server->proactor);
- qd_timer_finalize();
- sys_mutex_free(qd_server->lock);
- sys_mutex_free(qd_server->conn_activation_lock);
- sys_cond_free(qd_server->cond);
- Py_XDECREF((PyObject *)qd_server->py_displayname_obj);
- free(qd_server);
-}
-void qd_server_set_container(qd_dispatch_t *qd, qd_container_t *container)
-{
- qd->server->container = container;
+ return connector;
}
-void qd_server_trace_all_connections()
-{
- qd_dispatch_t *qd = qd_dispatch_get_dispatch();
- if (qd->server) {
- sys_mutex_lock(qd->server->lock);
- qd_connection_list_t conn_list = qd->server->conn_list;
- qd_connection_t *conn = DEQ_HEAD(conn_list);
- while(conn) {
- //
- // If there is already a tracer on the transport, nothing to do,
move on to the next connection.
- //
- pn_transport_t *tport = pn_connection_transport(conn->pn_conn);
- if (! pn_transport_get_tracer(tport)) {
- pn_transport_trace(tport, PN_TRACE_FRM);
- pn_transport_set_tracer(tport, transport_tracer);
- }
- conn = DEQ_NEXT(conn);
- }
- sys_mutex_unlock(qd->server->lock);
- }
-}
+static double normalize_memory_size(const uint64_t bytes, const char** suffix)
{
+ static const char* const units[] = {"B", "KiB", "MiB", "GiB", "TiB"};
+ const int units_ct = 5;
+ const double base = 1024.0;
+ double value = (double) bytes;
-
-static double normalize_memory_size(const uint64_t bytes, const char **suffix)
-{
- static const char * const units[] = {"B", "KiB", "MiB", "GiB", "TiB"};
- const int units_ct = 5;
- const double base = 1024.0;
-
- double value = (double)bytes;
for (int i = 0; i < units_ct; ++i) {
if (value < base) {
- if (suffix)
+ if (suffix) {
*suffix = units[i];
+ }
+
return value;
}
+
value /= base;
}
- if (suffix)
+
+ if (suffix) {
*suffix = units[units_ct - 1];
+ }
+
return value;
}
-void qd_server_run(qd_dispatch_t *qd)
-{
- qd_server_t *qd_server = qd->server;
- int i;
- assert(qd_server);
- assert(qd_server->container); // Server can't run without a container
- qd_log(qd_server->log_source,
- QD_LOG_NOTICE, "Operational, %d Threads Running (process ID %ld)",
- qd_server->thread_count, (long)getpid());
+void qd_conn_event_batch_complete(qd_container_t* container, qd_connection_t*
conn, bool conn_closed);
- const uintmax_t ram_size = qd_platform_memory_size();
- const uint64_t vm_size = qd_router_memory_usage();
- if (ram_size && vm_size) {
- const char *suffix_vm = 0;
- const char *suffix_ram = 0;
- double vm = normalize_memory_size(vm_size, &suffix_vm);
- double ram = normalize_memory_size(ram_size, &suffix_ram);
- qd_log(qd_server->log_source, QD_LOG_NOTICE,
- "Process VmSize %.2f %s (%.2f %s available memory)",
- vm, suffix_vm, ram, suffix_ram);
- }
-
-#ifndef NDEBUG
- qd_log(qd_server->log_source, QD_LOG_INFO, "Running in DEBUG Mode");
-#endif
- int n = qd_server->thread_count - 1; /* Start count-1 threads + use
current thread */
- sys_thread_t **threads = (sys_thread_t **)calloc(n, sizeof(sys_thread_t*));
- for (i = 0; i < n; i++) {
- threads[i] = sys_thread(thread_run, qd_server);
- }
- thread_run(qd_server); /* Use the current thread */
- for (i = 0; i < n; i++) {
- sys_thread_join(threads[i]);
- sys_thread_free(threads[i]);
- }
- free(threads);
- qd_http_server_stop(qd_server->http); /* Stop HTTP threads immediately */
- qd_http_server_free(qd_server->http);
+static void* server_thread_run(void* arg) {
+ qd_server_t* server = (qd_server_t*) arg;
+ pn_proactor_t* proactor = server->proactor;
+ bool running = true;
- qd_log(qd_server->log_source, QD_LOG_NOTICE, "Shut Down");
-}
+ while (running) {
+ pn_event_batch_t* batch = pn_proactor_wait(proactor);
+ pn_event_t* event = pn_event_batch_next(batch);
+ pn_connection_t* pn_conn = pn_event_connection(event);
+ do {
+ running = qd_server_handle_event(server, event);
+ } while ((event = pn_event_batch_next(batch)));
-void qd_server_stop(qd_dispatch_t *qd)
-{
- /* Interrupt the proactor, async-signal-safe */
- pn_proactor_interrupt(qd->server->proactor);
-}
+ if (pn_conn) {
+ qd_connection_t* conn = (qd_connection_t*)
pn_connection_get_context(pn_conn);
-void qd_server_activate(qd_connection_t *ctx)
-{
- if (ctx) ctx->wake(ctx);
-}
+ if (conn) {
+ qd_conn_event_batch_complete(server->container, conn, false);
+ }
+ }
+ pn_proactor_done(proactor, batch);
+ }
-void qd_connection_set_context(qd_connection_t *conn, void *context)
-{
- conn->user_context = context;
+ return NULL;
}
+// Wake function for proactor-managed connections
+static void connection_wake(qd_connection_t* conn) {
+ assert(conn);
-void *qd_connection_get_context(qd_connection_t *conn)
-{
- return conn->user_context;
+ pn_connection_wake(conn->pn_conn);
}
-
-void *qd_connection_get_config_context(qd_connection_t *conn)
-{
- return conn->context;
+static void data_put_symbol(pn_data_t* data, const char* value) {
+ pn_data_put_symbol(data, pn_bytes(strlen(value), value));
}
+static void data_put_string(pn_data_t* data, const char* value) {
+ pn_data_put_string(data, pn_bytes(strlen(value), value));
+}
-void qd_connection_set_link_context(qd_connection_t *conn, void *context)
-{
- conn->link_context = context;
+static void data_put_map_entry(pn_data_t* data, const char* key, const char*
value) {
+ data_put_symbol(data, key);
+ data_put_string(data, value);
}
+static void connection_decorate(qd_server_t* server, qd_connection_t* conn,
const qd_server_config_t* config) {
+ pn_connection_t* pn_conn = conn->pn_conn;
-void *qd_connection_get_link_context(qd_connection_t *conn)
-{
- return conn->link_context;
-}
+ // Set the container name
+ pn_connection_set_container(pn_conn, server->container_name);
+ // Offer ANONYMOUS_RELAY capability
+ size_t clen = strlen(QD_CAPABILITY_ANONYMOUS_RELAY);
+ pn_data_put_symbol(pn_connection_offered_capabilities(pn_conn),
+ pn_bytes(clen, (char*) QD_CAPABILITY_ANONYMOUS_RELAY));
-pn_connection_t *qd_connection_pn(qd_connection_t *conn)
-{
- return conn->pn_conn;
-}
+ // Create the connection properties map
+ pn_data_t* props = pn_connection_properties(pn_conn);
-bool qd_connection_inbound(qd_connection_t *conn)
-{
- return conn->listener != 0;
-}
+ pn_data_put_map(props);
+ pn_data_enter(props);
+ data_put_map_entry(props, QD_CONNECTION_PROPERTY_PRODUCT_KEY,
QD_CONNECTION_PROPERTY_PRODUCT_VALUE);
+ data_put_map_entry(props, QD_CONNECTION_PROPERTY_VERSION_KEY,
QPID_DISPATCH_VERSION);
-uint64_t qd_connection_connection_id(qd_connection_t *conn)
-{
- return conn->connection_id;
-}
+ data_put_symbol(props, QD_CONNECTION_PROPERTY_CONN_ID);
+ pn_data_put_int(props, conn->connection_id);
+ // XXX When is config going to be null?
+ if (config && config->inter_router_cost > 1) {
Review comment:
Btw, this looks to me like a response to a Coverity warning. Coverity
likes to sends reports about NULL config ;P The Coverity results on the page
usually have something they call "static trace", which is trying to answer this
question...
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]