This is an automated email from the ASF dual-hosted git repository. astitcher pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit b74236f109e7fb02bd78b56e7b7e094925b41f5a Author: Andrew Stitcher <[email protected]> AuthorDate: Thu Oct 14 22:55:05 2021 -0400 PROTON-2448: Connected up the new logger frame tracing Used the new internal logger API to log frame traces. Modified where this is now implemented - it can now attach to the very lowest frame read/write routine as it only needs the raw bytes. --- c/src/core/dispatcher.c | 18 ++++++------- c/src/core/framing.c | 70 +++++++++++++++++++++++++++++++++++++++++-------- c/src/core/framing.h | 12 ++++----- c/src/core/transport.c | 65 +++++++-------------------------------------- 4 files changed, 82 insertions(+), 83 deletions(-) diff --git a/c/src/core/dispatcher.c b/c/src/core/dispatcher.c index 3458429..4df8b17 100644 --- a/c/src/core/dispatcher.c +++ b/c/src/core/dispatcher.c @@ -77,17 +77,17 @@ static inline int pni_dispatch_action(pn_transport_t* transport, uint64_t lcode, static int pni_dispatch_frame(pn_transport_t * transport, pn_data_t *args, pn_frame_t frame) { - if (frame.size == 0) { // ignore null frames - PN_LOG(&transport->logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_FRAME, "%u <- (EMPTY FRAME)", frame.channel); + pn_bytes_t frame_payload = frame.frame_payload0; + + if (frame_payload.size == 0) { // ignore null frames return 0; } - - ssize_t dsize = pn_data_decode(args, frame.payload, frame.size); + ssize_t dsize = pn_data_decode(args, frame_payload.start, frame_payload.size); if (dsize < 0) { pn_string_format(transport->scratch, "Error decoding frame: %s %s\n", pn_code(dsize), pn_error_text(pn_data_error(args))); - pn_quote(transport->scratch, frame.payload, frame.size); + pn_quote(transport->scratch, frame_payload.start, frame_payload.size); PN_LOG(&transport->logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_ERROR, pn_string_get(transport->scratch)); return dsize; } @@ -107,12 +107,10 @@ static int pni_dispatch_frame(pn_transport_t * transport, pn_data_t *args, pn_fr PN_LOG(&transport->logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_ERROR, "Error dispatching frame"); return PN_ERR; } - size_t payload_size = frame.size - dsize; - const char *payload_mem = payload_size ? frame.payload + dsize : NULL; + size_t payload_size = frame_payload.size - dsize; + const char *payload_mem = payload_size ? frame_payload.start + dsize : NULL; pn_bytes_t payload = {payload_size, payload_mem}; - pn_do_trace(transport, channel, IN, args, payload_mem, payload_size); - int err = pni_dispatch_action(transport, lcode, frame_type, channel, args, &payload); pn_data_clear(args); @@ -127,7 +125,7 @@ ssize_t pn_dispatcher_input(pn_transport_t *transport, const char *bytes, size_t while (available && !*halt) { pn_frame_t frame; - ssize_t n = pn_read_frame(&frame, bytes + read, available, transport->local_max_frame); + ssize_t n = pn_read_frame(&frame, bytes + read, available, transport->local_max_frame, &transport->logger); if (n > 0) { read += n; available -= n; diff --git a/c/src/core/framing.c b/c/src/core/framing.c index bc484a9..547938a 100644 --- a/c/src/core/framing.c +++ b/c/src/core/framing.c @@ -25,7 +25,36 @@ #include "engine-internal.h" #include "util.h" -ssize_t pn_read_frame(pn_frame_t *frame, const char *bytes, size_t available, uint32_t max) +#include <assert.h> + +static inline void pn_do_tx_trace(pn_logger_t *logger, uint16_t ch, pn_bytes_t frame) +{ + if (PN_SHOULD_LOG(logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_FRAME) ) { + if (frame.size==0) { + pn_logger_logf(logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_FRAME, "%u -> (EMPTY FRAME)", ch); + } else { + pni_logger_log_msg_frame(logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_FRAME, frame, "%u -> ", ch); + } + } +} + +static inline void pn_do_rx_trace(pn_logger_t *logger, uint16_t ch, pn_bytes_t frame) +{ + if (PN_SHOULD_LOG(logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_FRAME) ) { + if (frame.size==0) { + pn_logger_logf(logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_FRAME, "%u <- (EMPTY FRAME)", ch); + } else { + pni_logger_log_msg_frame(logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_FRAME, frame, "%u <- ", ch); + } + } +} + +static inline void pn_do_raw_trace(pn_logger_t *logger, pn_buffer_t *output, size_t size) +{ + PN_LOG_RAW(logger, PN_SUBSYSTEM_IO, PN_LEVEL_RAW, output, size); +} + +ssize_t pn_read_frame(pn_frame_t *frame, const char *bytes, size_t available, uint32_t max, pn_logger_t *logger) { if (available < AMQP_HEADER_SIZE) return 0; uint32_t size = pni_read32(&bytes[0]); @@ -34,34 +63,53 @@ ssize_t pn_read_frame(pn_frame_t *frame, const char *bytes, size_t available, ui unsigned int doff = 4 * (uint8_t)bytes[4]; if (doff < AMQP_HEADER_SIZE || doff > size) return PN_ERR; - frame->size = size - doff; - frame->ex_size = doff - AMQP_HEADER_SIZE; + frame->frame_payload0 = (pn_bytes_t){.size=size-doff, .start=bytes+doff}; + frame->frame_payload1 = (pn_bytes_t){.size=0,.start=NULL}; + frame->extended = (pn_bytes_t){.size=doff-AMQP_HEADER_SIZE, .start=bytes+AMQP_HEADER_SIZE}; frame->type = bytes[5]; frame->channel = pni_read16(&bytes[6]); - frame->extended = bytes + AMQP_HEADER_SIZE; - frame->payload = bytes + doff; + + pn_do_rx_trace(logger, frame->channel, frame->frame_payload0); return size; } -size_t pn_write_frame(pn_buffer_t* buffer, pn_frame_t frame) +size_t pn_write_frame(pn_buffer_t* buffer, pn_frame_t frame, pn_logger_t *logger) { - size_t size = AMQP_HEADER_SIZE + frame.ex_size + frame.size; + size_t size = AMQP_HEADER_SIZE + frame.extended.size + frame.frame_payload0.size + frame.frame_payload1.size; if (size <= pn_buffer_available(buffer)) { // Prepare header char bytes[8]; pni_write32(&bytes[0], size); - int doff = (frame.ex_size + AMQP_HEADER_SIZE - 1)/4 + 1; + int doff = (frame.extended.size + AMQP_HEADER_SIZE - 1)/4 + 1; bytes[4] = doff; bytes[5] = frame.type; pni_write16(&bytes[6], frame.channel); // Write header then rest of frame pn_buffer_append(buffer, bytes, 8); - if (frame.extended) - pn_buffer_append(buffer, frame.extended, frame.ex_size); - pn_buffer_append(buffer, frame.payload, frame.size); + pn_buffer_append(buffer, frame.extended.start, frame.extended.size); + + // Don't mess with the buffer unless we are logging frame traces to avoid + // shuffling the buffer unnecessarily. + if (PN_SHOULD_LOG(logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_FRAME) ) { + // Get current buffer pointer so we can trace dump performative and payload together + pn_bytes_t smem = pn_buffer_bytes(buffer); + pn_buffer_append(buffer, frame.frame_payload0.start, frame.frame_payload0.size); + pn_buffer_append(buffer, frame.frame_payload1.start, frame.frame_payload1.size); + pn_bytes_t emem = pn_buffer_bytes(buffer); + + // The buffer can't have moved + assert(smem.start==emem.start); + pn_bytes_t frame_payload = {.size=emem.size-smem.size, .start=smem.start+smem.size}; + pn_do_tx_trace(logger, frame.channel, frame_payload); + } else { + pn_buffer_append(buffer, frame.frame_payload0.start, frame.frame_payload0.size); + pn_buffer_append(buffer, frame.frame_payload1.start, frame.frame_payload1.size); + } + pn_do_raw_trace(logger, buffer, AMQP_HEADER_SIZE+frame.extended.size+frame.frame_payload0.size+frame.frame_payload1.size); + return size; } else { return 0; diff --git a/c/src/core/framing.h b/c/src/core/framing.h index 46e0d5e..c5fcaac 100644 --- a/c/src/core/framing.h +++ b/c/src/core/framing.h @@ -23,6 +23,7 @@ */ #include "buffer.h" +#include "logger_private.h" #include "proton/types.h" @@ -35,13 +36,12 @@ typedef struct { uint8_t type; uint16_t channel; - size_t ex_size; - const char *extended; - size_t size; - const char *payload; + pn_bytes_t extended; + pn_bytes_t frame_payload0; + pn_bytes_t frame_payload1; } pn_frame_t; -ssize_t pn_read_frame(pn_frame_t *frame, const char *bytes, size_t available, uint32_t max); -size_t pn_write_frame(pn_buffer_t* buffer, pn_frame_t frame); +ssize_t pn_read_frame(pn_frame_t *frame, const char *bytes, size_t available, uint32_t max, pn_logger_t *logger); +size_t pn_write_frame(pn_buffer_t* buffer, pn_frame_t frame, pn_logger_t *logger); #endif /* framing.h */ diff --git a/c/src/core/transport.c b/c/src/core/transport.c index 98270fd..63c7a1a 100644 --- a/c/src/core/transport.c +++ b/c/src/core/transport.c @@ -884,28 +884,6 @@ static int pni_disposition_encode(pn_disposition_t *disposition, pn_data_t *data } -void pn_do_trace(pn_transport_t *transport, uint16_t ch, pn_dir_t dir, - pn_data_t *args, const char *payload, size_t size) -{ - if (PN_SHOULD_LOG(&transport->logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_FRAME) ) { - pn_string_format(transport->scratch, "%u %s ", ch, dir == OUT ? "->" : "<-"); - pn_inspect(args, transport->scratch); - - if (pn_data_size(args)==0) { - pn_string_addf(transport->scratch, "(EMPTY FRAME)"); - } - - if (size) { - char buf[1024]; - int e = pn_quote_data(buf, 1024, payload, size); - pn_string_addf(transport->scratch, " (%" PN_ZU ") \"%s\"%s", size, buf, - e == PN_OVERFLOW ? "... (truncated)" : ""); - } - - pni_logger_log(&transport->logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_FRAME, pn_string_get(transport->scratch)); - } -} - int pn_post_frame(pn_transport_t *transport, uint8_t type, uint16_t ch, const char *fmt, ...) { pn_buffer_t *frame_buf = transport->frame; @@ -921,8 +899,6 @@ int pn_post_frame(pn_transport_t *transport, uint8_t type, uint16_t ch, const ch return PN_ERR; } - pn_do_trace(transport, ch, OUT, transport->output_args, NULL, 0); - encode_performatives: pn_buffer_clear( frame_buf ); pn_rwbytes_t buf = pn_buffer_memory( frame_buf ); @@ -942,17 +918,10 @@ int pn_post_frame(pn_transport_t *transport, uint8_t type, uint16_t ch, const ch pn_frame_t frame = {AMQP_FRAME_TYPE}; frame.type = type; frame.channel = ch; - frame.payload = buf.start; - frame.size = wr; - pn_buffer_ensure(transport->output_buffer, AMQP_HEADER_SIZE+frame.ex_size+frame.size); - pn_write_frame(transport->output_buffer, frame); + frame.frame_payload0 = (pn_bytes_t){.size=wr, .start=buf.start}; + pn_buffer_ensure(transport->output_buffer, AMQP_HEADER_SIZE+frame.extended.size+frame.frame_payload0.size+frame.frame_payload1.size); + pn_write_frame(transport->output_buffer, frame, &transport->logger); transport->output_frames_ct += 1; - if (PN_SHOULD_LOG(&transport->logger, PN_SUBSYSTEM_IO, PN_LEVEL_RAW)) { - pn_string_set(transport->scratch, "RAW: \""); - pn_buffer_quote(transport->output_buffer, transport->scratch, AMQP_HEADER_SIZE+frame.ex_size+frame.size); - pn_string_addf(transport->scratch, "\""); - pni_logger_log(&transport->logger, PN_SUBSYSTEM_IO, PN_LEVEL_RAW, pn_string_get(transport->scratch)); - } return 0; } @@ -1032,34 +1001,18 @@ static int pni_post_amqp_transfer_frame(pn_transport_t *transport, uint16_t ch, } } - if (pn_buffer_available( frame ) < (available + buf.size)) { - // not enough room for payload - try again... - pn_buffer_ensure( frame, available + buf.size ); - goto encode_performatives; - } - - pn_do_trace(transport, ch, OUT, transport->output_args, payload->start, available); + pn_frame_t frame = {AMQP_FRAME_TYPE}; + frame.channel = ch; + frame.frame_payload0 = (pn_bytes_t){.size=buf.size, .start=buf.start}; + frame.frame_payload1 = (pn_bytes_t){.size=available, .start=payload->start}; - memmove( buf.start + buf.size, payload->start, available); payload->start += available; payload->size -= available; - buf.size += available; - pn_frame_t frame = {AMQP_FRAME_TYPE}; - frame.channel = ch; - frame.payload = buf.start; - frame.size = buf.size; - - pn_buffer_ensure(transport->output_buffer, AMQP_HEADER_SIZE+frame.ex_size+frame.size); - pn_write_frame(transport->output_buffer, frame); + pn_buffer_ensure(transport->output_buffer, AMQP_HEADER_SIZE+frame.extended.size+frame.frame_payload0.size+frame.frame_payload1.size); + pn_write_frame(transport->output_buffer, frame, &transport->logger); transport->output_frames_ct += 1; framecount++; - if (PN_SHOULD_LOG(&transport->logger, PN_SUBSYSTEM_IO, PN_LEVEL_RAW)) { - pn_string_set(transport->scratch, "RAW: \""); - pn_buffer_quote(transport->output_buffer, transport->scratch, AMQP_HEADER_SIZE+frame.ex_size+frame.size); - pn_string_addf(transport->scratch, "\""); - pni_logger_log(&transport->logger, PN_SUBSYSTEM_IO, PN_LEVEL_RAW, pn_string_get(transport->scratch)); - } } while (payload->size > 0 && framecount < frame_limit); return framecount; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
