PROTON-770: Change dispatcher interface (more parameters for frame handlers) - Use transport, frametype and channel - Pass more of the frame directly into the frame handlers
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/fc9b88ea Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/fc9b88ea Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/fc9b88ea Branch: refs/heads/master Commit: fc9b88eada8c685fb2a003570bf903a9360f48d2 Parents: e093b8c Author: Andrew Stitcher <[email protected]> Authored: Wed Nov 26 13:48:31 2014 -0500 Committer: Andrew Stitcher <[email protected]> Committed: Wed Dec 10 16:50:01 2014 -0500 ---------------------------------------------------------------------- proton-c/src/dispatch_actions.h | 30 ++++++------- proton-c/src/dispatcher/dispatcher.c | 49 +++++++------------- proton-c/src/dispatcher/dispatcher.h | 6 +-- proton-c/src/sasl/sasl.c | 32 ++++++------- proton-c/src/transport/transport.c | 75 ++++++++++++++----------------- 5 files changed, 82 insertions(+), 110 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fc9b88ea/proton-c/src/dispatch_actions.h ---------------------------------------------------------------------- diff --git a/proton-c/src/dispatch_actions.h b/proton-c/src/dispatch_actions.h index aa7a8f4..c3a8aab 100644 --- a/proton-c/src/dispatch_actions.h +++ b/proton-c/src/dispatch_actions.h @@ -24,22 +24,22 @@ #include "dispatcher/dispatcher.h" -/* Transport actions */ -int pn_do_open(pn_dispatcher_t *disp); -int pn_do_begin(pn_dispatcher_t *disp); -int pn_do_attach(pn_dispatcher_t *disp); -int pn_do_transfer(pn_dispatcher_t *disp); -int pn_do_flow(pn_dispatcher_t *disp); -int pn_do_disposition(pn_dispatcher_t *disp); -int pn_do_detach(pn_dispatcher_t *disp); -int pn_do_end(pn_dispatcher_t *disp); -int pn_do_close(pn_dispatcher_t *disp); +/* AMQP actions */ +int pn_do_open(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload); +int pn_do_begin(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload); +int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload); +int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload); +int pn_do_flow(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload); +int pn_do_disposition(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload); +int pn_do_detach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload); +int pn_do_end(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload); +int pn_do_close(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload); /* SASL actions */ -int pn_do_init(pn_dispatcher_t *disp); -int pn_do_mechanisms(pn_dispatcher_t *disp); -int pn_do_challenge(pn_dispatcher_t *disp); -int pn_do_response(pn_dispatcher_t *disp); -int pn_do_outcome(pn_dispatcher_t *disp); +int pn_do_init(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload); +int pn_do_mechanisms(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload); +int pn_do_challenge(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload); +int pn_do_response(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload); +int pn_do_outcome(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload); #endif // _PROTON_DISPATCH_ACTIONS_H http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fc9b88ea/proton-c/src/dispatcher/dispatcher.c ---------------------------------------------------------------------- diff --git a/proton-c/src/dispatcher/dispatcher.c b/proton-c/src/dispatcher/dispatcher.c index 08c5ef1..ae04706 100644 --- a/proton-c/src/dispatcher/dispatcher.c +++ b/proton-c/src/dispatcher/dispatcher.c @@ -28,14 +28,14 @@ #include "dispatch_actions.h" -int pni_bad_frame(pn_dispatcher_t* disp) { - pn_transport_log(disp->transport, "Error dispatching frame: Unknown performative"); +int pni_bad_frame(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) { + pn_transport_logf(transport, "Error dispatching frame: type: %d: Unknown performative", frame_type); return PN_ERR; } // We could use a table based approach here if we needed to dynamically // add new performatives -static inline int pni_dispatch_action(pn_dispatcher_t* disp, uint64_t lcode) +static inline int pni_dispatch_action(pn_transport_t* transport, uint64_t lcode, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) { pn_action_t *action; switch (lcode) { @@ -58,7 +58,7 @@ static inline int pni_dispatch_action(pn_dispatcher_t* disp, uint64_t lcode) case SASL_OUTCOME: action = pn_do_outcome; break; default: action = pni_bad_frame; break; }; - return action(disp); + return action(transport, frame_type, channel, args, payload); } pn_dispatcher_t *pn_dispatcher(uint8_t frame_type, pn_transport_t *transport) @@ -68,10 +68,7 @@ pn_dispatcher_t *pn_dispatcher(uint8_t frame_type, pn_transport_t *transport) disp->frame_type = frame_type; disp->transport = transport; - disp->channel = 0; disp->args = pn_data(16); - disp->payload = NULL; - disp->size = 0; disp->output_args = pn_data(16); disp->frame = pn_buffer( 4*1024 ); @@ -124,7 +121,7 @@ static void pn_do_trace(pn_dispatcher_t *disp, uint16_t ch, pn_dir_t dir, } } -int pn_dispatch_frame(pn_dispatcher_t *disp, pn_frame_t frame) +static int pni_dispatch_frame(pn_dispatcher_t *disp, pn_data_t *args, pn_frame_t frame) { if (frame.size == 0) { // ignore null frames if (disp->transport->trace & PN_TRACE_FRM) @@ -132,22 +129,23 @@ int pn_dispatch_frame(pn_dispatcher_t *disp, pn_frame_t frame) return 0; } - ssize_t dsize = pn_data_decode(disp->args, frame.payload, frame.size); + ssize_t dsize = pn_data_decode(args, frame.payload, frame.size); if (dsize < 0) { pn_string_format(disp->scratch, "Error decoding frame: %s %s\n", pn_code(dsize), - pn_error_text(pn_data_error(disp->args))); + pn_error_text(pn_data_error(args))); pn_quote(disp->scratch, frame.payload, frame.size); pn_transport_log(disp->transport, pn_string_get(disp->scratch)); return dsize; } - disp->channel = frame.channel; + uint8_t frame_type = frame.type; + uint16_t channel = frame.channel; // XXX: assuming numeric - // if we get a symbol we should map it to the numeric value and dispatch on that uint64_t lcode; bool scanned; - int e = pn_data_scan(disp->args, "D?L.", &scanned, &lcode); + int e = pn_data_scan(args, "D?L.", &scanned, &lcode); if (e) { pn_transport_log(disp->transport, "Scan error"); return e; @@ -156,18 +154,15 @@ int pn_dispatch_frame(pn_dispatcher_t *disp, pn_frame_t frame) pn_transport_log(disp->transport, "Error dispatching frame"); return PN_ERR; } - disp->size = frame.size - dsize; - if (disp->size) - disp->payload = frame.payload + dsize; + size_t payload_size = frame.size - dsize; + const char *payload_mem = payload_size ? frame.payload + dsize : NULL; + pn_bytes_t payload = {payload_size, payload_mem}; - pn_do_trace(disp, disp->channel, IN, disp->args, disp->payload, disp->size); + pn_do_trace(disp, channel, IN, args, payload_mem, payload_size); - int err = pni_dispatch_action(disp, lcode); + int err = pni_dispatch_action(disp->transport, lcode, frame_type, channel, args, &payload); - disp->channel = 0; - pn_data_clear(disp->args); - disp->size = 0; - disp->payload = NULL; + pn_data_clear(args); return err; } @@ -184,7 +179,7 @@ ssize_t pn_dispatcher_input(pn_dispatcher_t *disp, const char *bytes, size_t ava read += n; available -= n; disp->input_frames_ct += 1; - int e = pn_dispatch_frame(disp, frame); + int e = pni_dispatch_frame(disp, disp->args, frame); if (e) return e; } else { break; @@ -196,16 +191,6 @@ ssize_t pn_dispatcher_input(pn_dispatcher_t *disp, const char *bytes, size_t ava return read; } -int pn_scan_args(pn_dispatcher_t *disp, const char *fmt, ...) -{ - va_list ap; - va_start(ap, fmt); - int err = pn_data_vscan(disp->args, fmt, ap); - va_end(ap); - if (err) printf("scan error: %s\n", fmt); - return err; -} - void pn_set_payload(pn_dispatcher_t *disp, const char *data, size_t size) { disp->output_payload = data; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fc9b88ea/proton-c/src/dispatcher/dispatcher.h ---------------------------------------------------------------------- diff --git a/proton-c/src/dispatcher/dispatcher.h b/proton-c/src/dispatcher/dispatcher.h index c578282..b9bfa2b 100644 --- a/proton-c/src/dispatcher/dispatcher.h +++ b/proton-c/src/dispatcher/dispatcher.h @@ -32,12 +32,10 @@ typedef struct pn_dispatcher_t pn_dispatcher_t; -typedef int (pn_action_t)(pn_dispatcher_t *disp); +typedef int (pn_action_t)(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload); struct pn_dispatcher_t { pn_data_t *args; - const char *payload; - size_t size; pn_data_t *output_args; const char *output_payload; size_t output_size; @@ -50,7 +48,6 @@ struct pn_dispatcher_t { uint64_t output_frames_ct; uint64_t input_frames_ct; pn_string_t *scratch; - uint16_t channel; uint8_t frame_type; // Used when constructing outgoing frames bool halt; bool batch; @@ -58,7 +55,6 @@ struct pn_dispatcher_t { pn_dispatcher_t *pn_dispatcher(uint8_t frame_type, pn_transport_t *transport); void pn_dispatcher_free(pn_dispatcher_t *disp); -int pn_scan_args(pn_dispatcher_t *disp, const char *fmt, ...); void pn_set_payload(pn_dispatcher_t *disp, const char *data, size_t size); int pn_post_frame(pn_dispatcher_t *disp, uint16_t ch, const char *fmt, ...); ssize_t pn_dispatcher_input(pn_dispatcher_t *disp, const char *bytes, size_t available); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fc9b88ea/proton-c/src/sasl/sasl.c ---------------------------------------------------------------------- diff --git a/proton-c/src/sasl/sasl.c b/proton-c/src/sasl/sasl.c index 1d27002..5a174e2 100644 --- a/proton-c/src/sasl/sasl.c +++ b/proton-c/src/sasl/sasl.c @@ -400,12 +400,12 @@ ssize_t pn_sasl_output(pn_transport_t *transport, char *bytes, size_t size) } } -int pn_do_init(pn_dispatcher_t *disp) +int pn_do_init(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) { - pni_sasl_t *sasl = disp->transport->sasl; + pni_sasl_t *sasl = transport->sasl; pn_bytes_t mech; pn_bytes_t recv; - int err = pn_scan_args(disp, "D.[sz]", &mech, &recv); + int err = pn_data_scan(args, "D.[sz]", &mech, &recv); if (err) return err; sasl->remote_mechanisms = pn_strndup(mech.start, mech.size); pn_buffer_append(sasl->recv_data, recv.start, recv.size); @@ -413,43 +413,43 @@ int pn_do_init(pn_dispatcher_t *disp) return 0; } -int pn_do_mechanisms(pn_dispatcher_t *disp) +int pn_do_mechanisms(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) { - pni_sasl_t *sasl = disp->transport->sasl; + pni_sasl_t *sasl = transport->sasl; sasl->rcvd_init = true; return 0; } -int pn_do_recv(pn_dispatcher_t *disp) +int pn_do_recv(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) { - pni_sasl_t *sasl = disp->transport->sasl; + pni_sasl_t *sasl = transport->sasl; pn_bytes_t recv; - int err = pn_scan_args(disp, "D.[z]", &recv); + int err = pn_data_scan(args, "D.[z]", &recv); if (err) return err; pn_buffer_append(sasl->recv_data, recv.start, recv.size); return 0; } -int pn_do_challenge(pn_dispatcher_t *disp) +int pn_do_challenge(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) { - return pn_do_recv(disp); + return pn_do_recv(transport, frame_type, channel, args, payload); } -int pn_do_response(pn_dispatcher_t *disp) +int pn_do_response(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) { - return pn_do_recv(disp); + return pn_do_recv(transport, frame_type, channel, args, payload); } -int pn_do_outcome(pn_dispatcher_t *disp) +int pn_do_outcome(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) { - pni_sasl_t *sasl = disp->transport->sasl; + pni_sasl_t *sasl = transport->sasl; uint8_t outcome; - int err = pn_scan_args(disp, "D.[B]", &outcome); + int err = pn_data_scan(args, "D.[B]", &outcome); if (err) return err; sasl->outcome = (pn_sasl_outcome_t) outcome; sasl->rcvd_done = true; sasl->sent_done = true; - disp->halt = true; + sasl->disp->halt = true; return 0; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fc9b88ea/proton-c/src/transport/transport.c ---------------------------------------------------------------------- diff --git a/proton-c/src/transport/transport.c b/proton-c/src/transport/transport.c index bb59d84..097d863 100644 --- a/proton-c/src/transport/transport.c +++ b/proton-c/src/transport/transport.c @@ -699,16 +699,15 @@ static char *pn_bytes_strdup(pn_bytes_t str) return pn_strndup(str.start, str.size); } -int pn_do_open(pn_dispatcher_t *disp) +int pn_do_open(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) { - pn_transport_t *transport = disp->transport; pn_connection_t *conn = transport->connection; bool container_q, hostname_q; pn_bytes_t remote_container, remote_hostname; pn_data_clear(transport->remote_offered_capabilities); pn_data_clear(transport->remote_desired_capabilities); pn_data_clear(transport->remote_properties); - int err = pn_scan_args(disp, "D.[?S?SIHI..CCC]", &container_q, + int err = pn_data_scan(args, "D.[?S?SIHI..CCC]", &container_q, &remote_container, &hostname_q, &remote_hostname, &transport->remote_max_frame, &transport->remote_channel_max, @@ -723,8 +722,8 @@ int pn_do_open(pn_dispatcher_t *disp) transport->remote_max_frame, AMQP_MIN_MAX_FRAME_SIZE); transport->remote_max_frame = AMQP_MIN_MAX_FRAME_SIZE; } - disp->remote_max_frame = transport->remote_max_frame; - pn_buffer_clear( disp->frame ); + transport->disp->remote_max_frame = transport->remote_max_frame; + pn_buffer_clear( transport->disp->frame ); } if (container_q) { transport->remote_container = pn_bytes_strdup(remote_container); @@ -747,13 +746,12 @@ int pn_do_open(pn_dispatcher_t *disp) return 0; } -int pn_do_begin(pn_dispatcher_t *disp) +int pn_do_begin(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) { - pn_transport_t *transport = disp->transport; bool reply; uint16_t remote_channel; pn_sequence_t next; - int err = pn_scan_args(disp, "D.[?HI]", &reply, &remote_channel, &next); + int err = pn_data_scan(args, "D.[?HI]", &reply, &remote_channel, &next); if (err) return err; pn_session_t *ssn; @@ -764,7 +762,7 @@ int pn_do_begin(pn_dispatcher_t *disp) ssn = pn_session(transport->connection); } ssn->state.incoming_transfer_count = next; - pni_map_remote_channel(ssn, disp->channel); + pni_map_remote_channel(ssn, channel); PN_SET_REMOTE(ssn->endpoint.state, PN_REMOTE_ACTIVE); pn_collector_put(transport->connection->collector, PN_OBJECT, ssn, PN_SESSION_REMOTE_OPEN); return 0; @@ -835,9 +833,8 @@ int pn_terminus_set_address_bytes(pn_terminus_t *terminus, pn_bytes_t address) return pn_string_setn(terminus->address, address.start, address.size); } -int pn_do_attach(pn_dispatcher_t *disp) +int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) { - pn_transport_t *transport = disp->transport; pn_bytes_t name; uint32_t handle; bool is_sender; @@ -850,7 +847,7 @@ int pn_do_attach(pn_dispatcher_t *disp) pn_bytes_t dist_mode; bool snd_settle, rcv_settle; uint8_t snd_settle_mode, rcv_settle_mode; - int err = pn_scan_args(disp, "D.[SIo?B?BD.[SIsIo.s]D.[SIsIo]..I]", &name, &handle, + int err = pn_data_scan(args, "D.[SIo?B?BD.[SIsIo.s]D.[SIsIo]..I]", &name, &handle, &is_sender, &snd_settle, &snd_settle_mode, &rcv_settle, &rcv_settle_mode, @@ -864,7 +861,7 @@ int pn_do_attach(pn_dispatcher_t *disp) strncpy(strname, name.start, name.size); strname[name.size] = '\0'; - pn_session_t *ssn = pn_channel_state(transport, disp->channel); + pn_session_t *ssn = pn_channel_state(transport, channel); if (!ssn) { pn_do_error(transport, "amqp:connection:no-session", "attach without a session"); return PN_EOS; @@ -907,7 +904,7 @@ int pn_do_attach(pn_dispatcher_t *disp) } else { uint64_t code = 0; pn_data_clear(link->remote_target.capabilities); - err = pn_scan_args(disp, "D.[.....D..DL[C]...]", &code, + err = pn_data_scan(args, "D.[.....D..DL[C]...]", &code, link->remote_target.capabilities); if (code == COORDINATOR) { pn_terminus_set_type(rtgt, PN_COORDINATOR); @@ -928,7 +925,7 @@ int pn_do_attach(pn_dispatcher_t *disp) pn_data_clear(link->remote_target.properties); pn_data_clear(link->remote_target.capabilities); - err = pn_scan_args(disp, "D.[.....D.[.....C.C.CC]D.[.....CC]", + err = pn_data_scan(args, "D.[.....D.[.....C.C.CC]D.[.....CC]", link->remote_source.properties, link->remote_source.filter, link->remote_source.outcomes, @@ -961,20 +958,19 @@ static void pn_full_settle(pn_delivery_map_t *db, pn_delivery_t *delivery) pn_clear_tpwork(delivery); } -int pn_do_transfer(pn_dispatcher_t *disp) +int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) { // XXX: multi transfer - pn_transport_t *transport = disp->transport; uint32_t handle; pn_bytes_t tag; bool id_present; pn_sequence_t id; bool settled; bool more; - int err = pn_scan_args(disp, "D.[I?Iz.oo]", &handle, &id_present, &id, &tag, + int err = pn_data_scan(args, "D.[I?Iz.oo]", &handle, &id_present, &id, &tag, &settled, &more); if (err) return err; - pn_session_t *ssn = pn_channel_state(transport, disp->channel); + pn_session_t *ssn = pn_channel_state(transport, channel); if (!ssn->state.incoming_window) { return pn_do_error(transport, "amqp:session:window-violation", "incoming session window exceeded"); @@ -1013,8 +1009,8 @@ int pn_do_transfer(pn_dispatcher_t *disp) } } - pn_buffer_append(delivery->bytes, disp->payload, disp->size); - ssn->incoming_bytes += disp->size; + pn_buffer_append(delivery->bytes, payload->start, payload->size); + ssn->incoming_bytes += payload->size; delivery->done = !more; ssn->state.incoming_transfer_count++; @@ -1029,19 +1025,18 @@ int pn_do_transfer(pn_dispatcher_t *disp) return 0; } -int pn_do_flow(pn_dispatcher_t *disp) +int pn_do_flow(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) { - pn_transport_t *transport = disp->transport; pn_sequence_t onext, inext, delivery_count; uint32_t iwin, owin, link_credit; uint32_t handle; bool inext_init, handle_init, dcount_init, drain; - int err = pn_scan_args(disp, "D.[?IIII?I?II.o]", &inext_init, &inext, &iwin, + int err = pn_data_scan(args, "D.[?IIII?I?II.o]", &inext_init, &inext, &iwin, &onext, &owin, &handle_init, &handle, &dcount_init, &delivery_count, &link_credit, &drain); if (err) return err; - pn_session_t *ssn = pn_channel_state(transport, disp->channel); + pn_session_t *ssn = pn_channel_state(transport, channel); if (inext_init) { ssn->state.remote_incoming_window = inext + iwin - ssn->state.outgoing_transfer_count; @@ -1098,21 +1093,20 @@ static int pn_scan_error(pn_data_t *data, pn_condition_t *condition, const char return 0; } -int pn_do_disposition(pn_dispatcher_t *disp) +int pn_do_disposition(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) { - pn_transport_t *transport = disp->transport; bool role; pn_sequence_t first, last; uint64_t type = 0; bool last_init, settled, type_init; pn_data_clear(transport->disp_data); - int err = pn_scan_args(disp, "D.[oI?IoD?LC]", &role, &first, &last_init, + int err = pn_data_scan(args, "D.[oI?IoD?LC]", &role, &first, &last_init, &last, &settled, &type_init, &type, transport->disp_data); if (err) return err; if (!last_init) last = first; - pn_session_t *ssn = pn_channel_state(transport, disp->channel); + pn_session_t *ssn = pn_channel_state(transport, channel); pn_delivery_map_t *deliveries; if (role) { deliveries = &ssn->state.outgoing; @@ -1177,24 +1171,23 @@ int pn_do_disposition(pn_dispatcher_t *disp) return 0; } -int pn_do_detach(pn_dispatcher_t *disp) +int pn_do_detach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) { - pn_transport_t *transport = disp->transport; uint32_t handle; bool closed; - int err = pn_scan_args(disp, "D.[Io]", &handle, &closed); + int err = pn_data_scan(args, "D.[Io]", &handle, &closed); if (err) return err; - pn_session_t *ssn = pn_channel_state(transport, disp->channel); + pn_session_t *ssn = pn_channel_state(transport, channel); if (!ssn) { - return pn_do_error(transport, "amqp:invalid-field", "no such channel: %u", disp->channel); + return pn_do_error(transport, "amqp:invalid-field", "no such channel: %u", channel); } pn_link_t *link = pn_handle_state(ssn, handle); if (!link) { return pn_do_error(transport, "amqp:invalid-field", "no such handle: %u", handle); } - err = pn_scan_error(disp->args, &link->endpoint.remote_condition, SCAN_ERROR_DETACH); + err = pn_scan_error(args, &link->endpoint.remote_condition, SCAN_ERROR_DETACH); if (err) return err; if (closed) @@ -1209,11 +1202,10 @@ int pn_do_detach(pn_dispatcher_t *disp) return 0; } -int pn_do_end(pn_dispatcher_t *disp) +int pn_do_end(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) { - pn_transport_t *transport = disp->transport; - pn_session_t *ssn = pn_channel_state(transport, disp->channel); - int err = pn_scan_error(disp->args, &ssn->endpoint.remote_condition, SCAN_ERROR_DEFAULT); + pn_session_t *ssn = pn_channel_state(transport, channel); + int err = pn_scan_error(args, &ssn->endpoint.remote_condition, SCAN_ERROR_DEFAULT); if (err) return err; PN_SET_REMOTE(ssn->endpoint.state, PN_REMOTE_CLOSED); pn_collector_put(transport->connection->collector, PN_OBJECT, ssn, PN_SESSION_REMOTE_CLOSE); @@ -1221,11 +1213,10 @@ int pn_do_end(pn_dispatcher_t *disp) return 0; } -int pn_do_close(pn_dispatcher_t *disp) +int pn_do_close(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) { - pn_transport_t *transport = disp->transport; pn_connection_t *conn = transport->connection; - int err = pn_scan_error(disp->args, &transport->remote_condition, SCAN_ERROR_DEFAULT); + int err = pn_scan_error(args, &transport->remote_condition, SCAN_ERROR_DEFAULT); if (err) return err; transport->close_rcvd = true; PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_CLOSED); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
