Repository: qpid-dispatch Updated Branches: refs/heads/master 1d34face7 -> acbae29e5
DISPATCH-451: Allow configurable maxSessions and maxSessionWindow Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/5f4e4a6f Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/5f4e4a6f Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/5f4e4a6f Branch: refs/heads/master Commit: 5f4e4a6f2a0dc326bf51823036ab8e6bad9df063 Parents: 8deffb9 Author: Chuck Rolke <[email protected]> Authored: Mon Oct 3 13:48:52 2016 -0400 Committer: Chuck Rolke <[email protected]> Committed: Mon Oct 3 13:48:52 2016 -0400 ---------------------------------------------------------------------- include/qpid/dispatch/server.h | 16 +++++++++++++ python/qpid_dispatch/management/qdrouter.json | 26 ++++++++++++++++++++++ src/connection_manager.c | 10 ++++++++- src/container.c | 4 +++- src/policy.c | 8 +++++-- src/server.c | 2 ++ 6 files changed, 62 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/5f4e4a6f/include/qpid/dispatch/server.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/server.h b/include/qpid/dispatch/server.h index 9862895..6e6f503 100644 --- a/include/qpid/dispatch/server.h +++ b/include/qpid/dispatch/server.h @@ -418,6 +418,22 @@ typedef struct qd_server_config_t { uint32_t max_frame_size; /** + * The max_sessions value is the number of sessions allowed on the Connection. + * This value minus one is the Open performative channel-max setting. + */ + uint32_t max_sessions; + + /** + * The max_session_window value is the maximum number of outstanding octets that are + * allowed to be in flight on a session. This value is used to calculate the number of + * outstanding transfers that are allowed by the formula: + * incoming_window = max_session_window / max_frame_size + * If max_session_window=1000000 and max_frame_size=32768 then 30 transfers may + * be outstanding before session flow control begins. + */ + uint32_t max_session_window; + + /** * The idle timeout, in seconds. If the peer sends no data frames in this many seconds, the * connection will be automatically closed. */ http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/5f4e4a6f/python/qpid_dispatch/management/qdrouter.json ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json index a320068..5b2c347 100644 --- a/python/qpid_dispatch/management/qdrouter.json +++ b/python/qpid_dispatch/management/qdrouter.json @@ -612,6 +612,19 @@ "description": "Defaults to 16384. If specified, it is the maximum frame size in octets that will be used in the connection-open negotiation with a connected peer. The frame size is the largest contiguous set of uninterrupted data that can be sent for a message delivery over the connection. Interleaving of messages on different links is done at frame granularity.", "create": true }, + "maxSessions": { + "type": "integer", + "default": 32768, + "description": "Defaults to 32768. The maximum number of sessions that can be simultaneously active on the connection. Setting this value to zero selects the default number of sessions.", + "create": true + }, + "maxSessionWindow": { + "type": "integer", + "description": "Incoming window measured in octets for sessions created on this connection. The AMQP negotiated session incoming window, measured in transfers, is calculated to be (maxSessionWindow / maxFrameSize). Setting this value to zero selects the default session window size.", + "default": 1000000, + "required": false, + "create": true + }, "idleTimeoutSeconds": { "type": "integer", "default": 16, @@ -726,6 +739,19 @@ "description": "Maximum frame size in octets that will be used in the connection-open negotiation with a connected peer. The frame size is the largest contiguous set of uninterrupted data that can be sent for a message delivery over the connection. Interleaving of messages on different links is done at frame granularity.", "create": true }, + "maxSessions": { + "type": "integer", + "default": 32768, + "description": "Defaults to 32768. The maximum number of sessions that can be simultaneously active on the connection. Setting this value to zero selects the default number of sessions.", + "create": true + }, + "maxSessionWindow": { + "type": "integer", + "description": "Incoming window measured in octets for sessions created on this connection. The AMQP negotiated session incoming window, measured in transfers, is calculated to be (maxSessionWindow / maxFrameSize). Setting this value to zero selects the default session window size.", + "default": 1000000, + "required": false, + "create": true + }, "idleTimeoutSeconds": { "type": "integer", "default": 16, http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/5f4e4a6f/src/connection_manager.c ---------------------------------------------------------------------- diff --git a/src/connection_manager.c b/src/connection_manager.c index a779fd6..96b85f1 100644 --- a/src/connection_manager.c +++ b/src/connection_manager.c @@ -183,6 +183,8 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf config->inter_router_cost = qd_entity_opt_long(entity, "cost", 1); CHECK(); config->protocol_family = qd_entity_opt_string(entity, "protocolFamily", 0); CHECK(); config->max_frame_size = qd_entity_get_long(entity, "maxFrameSize"); CHECK(); + config->max_sessions = qd_entity_get_long(entity, "maxSessions"); CHECK(); + config->max_session_window = qd_entity_get_long(entity, "maxSessionWindow"); CHECK(); config->idle_timeout_seconds = qd_entity_get_long(entity, "idleTimeoutSeconds"); CHECK(); config->sasl_username = qd_entity_opt_string(entity, "saslUsername", 0); CHECK(); config->sasl_password = qd_entity_opt_string(entity, "saslPassword", 0); CHECK(); @@ -192,11 +194,17 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf set_config_host(config, entity); // - // Handle the defaults for link capacity. + // Handle the defaults for various settings // if (config->link_capacity == 0) config->link_capacity = 250; + if (config->max_sessions == 0) + config->max_sessions = 32768; + + if (config->max_session_window == 0) + config->max_session_window = 1000000; + // // For now we are hardwiring this attribute to true. If there's an outcry from the // user community, we can revisit this later. http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/5f4e4a6f/src/container.c ---------------------------------------------------------------------- diff --git a/src/container.c b/src/container.c index 5080258..f9cc64b 100644 --- a/src/container.c +++ b/src/container.c @@ -763,9 +763,11 @@ qd_lifetime_policy_t qd_container_node_get_life_policy(const qd_node_t *node) qd_link_t *qd_link(qd_node_t *node, qd_connection_t *conn, qd_direction_t dir, const char* name) { qd_link_t *link = new_qd_link_t(); + const qd_server_config_t * cf = qd_connection_config(conn); + assert(cf); link->pn_sess = pn_session(qd_connection_pn(conn)); - pn_session_set_incoming_capacity(link->pn_sess, 1000000); + pn_session_set_incoming_capacity(link->pn_sess, cf->max_session_window); if (dir == QD_OUTGOING) link->pn_link = pn_sender(link->pn_sess, name); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/5f4e4a6f/src/policy.c ---------------------------------------------------------------------- diff --git a/src/policy.c b/src/policy.c index a347c12..a90a796 100644 --- a/src/policy.c +++ b/src/policy.c @@ -433,11 +433,15 @@ bool qd_policy_approve_amqp_session(pn_session_t *ssn, qd_connection_t *qd_conn) // void qd_policy_apply_session_settings(pn_session_t *ssn, qd_connection_t *qd_conn) { + size_t capacity; if (qd_conn->policy_settings && qd_conn->policy_settings->maxSessionWindow) { - pn_session_set_incoming_capacity(ssn, qd_conn->policy_settings->maxSessionWindow); + capacity = qd_conn->policy_settings->maxSessionWindow; } else { - pn_session_set_incoming_capacity(ssn, 1000000); + const qd_server_config_t * cf = qd_connection_config(qd_conn); + assert(cf); + capacity = cf->max_session_window; } + pn_session_set_incoming_capacity(ssn, capacity); } // http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/5f4e4a6f/src/server.c ---------------------------------------------------------------------- diff --git a/src/server.c b/src/server.c index 47cbac2..a55b0c2 100644 --- a/src/server.c +++ b/src/server.c @@ -680,6 +680,7 @@ static void thread_process_listeners_LH(qd_server_t *qd_server) // pn_transport_set_server(tport); pn_transport_set_max_frame(tport, config->max_frame_size); + pn_transport_set_channel_max(tport, config->max_sessions - 1); pn_transport_set_idle_timeout(tport, config->idle_timeout_seconds * 1000); // @@ -1247,6 +1248,7 @@ static void cxtr_try_open(void *context) // Configure the transport // pn_transport_set_max_frame(tport, config->max_frame_size); + pn_transport_set_channel_max(tport, config->max_sessions - 1); pn_transport_set_idle_timeout(tport, config->idle_timeout_seconds * 1000); // --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
