This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit b74236f109e7fb02bd78b56e7b7e094925b41f5a
Author: Andrew Stitcher <[email protected]>
AuthorDate: Thu Oct 14 22:55:05 2021 -0400

    PROTON-2448: Connected up the new logger frame tracing
    
    Used the new internal logger API to log frame traces.
    
    Modified where this is now implemented - it can now attach to the very
    lowest frame read/write routine as it only needs the raw bytes.
---
 c/src/core/dispatcher.c | 18 ++++++-------
 c/src/core/framing.c    | 70 +++++++++++++++++++++++++++++++++++++++++--------
 c/src/core/framing.h    | 12 ++++-----
 c/src/core/transport.c  | 65 +++++++--------------------------------------
 4 files changed, 82 insertions(+), 83 deletions(-)

diff --git a/c/src/core/dispatcher.c b/c/src/core/dispatcher.c
index 3458429..4df8b17 100644
--- a/c/src/core/dispatcher.c
+++ b/c/src/core/dispatcher.c
@@ -77,17 +77,17 @@ static inline int pni_dispatch_action(pn_transport_t* 
transport, uint64_t lcode,
 
 static int pni_dispatch_frame(pn_transport_t * transport, pn_data_t *args, 
pn_frame_t frame)
 {
-  if (frame.size == 0) { // ignore null frames
-    PN_LOG(&transport->logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_FRAME, "%u <- 
(EMPTY FRAME)", frame.channel);
+  pn_bytes_t frame_payload = frame.frame_payload0;
+
+  if (frame_payload.size == 0) { // ignore null frames
     return 0;
   }
-
-  ssize_t dsize = pn_data_decode(args, frame.payload, frame.size);
+  ssize_t dsize = pn_data_decode(args, frame_payload.start, 
frame_payload.size);
   if (dsize < 0) {
     pn_string_format(transport->scratch,
                      "Error decoding frame: %s %s\n", pn_code(dsize),
                      pn_error_text(pn_data_error(args)));
-    pn_quote(transport->scratch, frame.payload, frame.size);
+    pn_quote(transport->scratch, frame_payload.start, frame_payload.size);
     PN_LOG(&transport->logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_ERROR, 
pn_string_get(transport->scratch));
     return dsize;
   }
@@ -107,12 +107,10 @@ static int pni_dispatch_frame(pn_transport_t * transport, 
pn_data_t *args, pn_fr
     PN_LOG(&transport->logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_ERROR, "Error 
dispatching frame");
     return PN_ERR;
   }
-  size_t payload_size = frame.size - dsize;
-  const char *payload_mem = payload_size ? frame.payload + dsize : NULL;
+  size_t payload_size = frame_payload.size - dsize;
+  const char *payload_mem = payload_size ? frame_payload.start + dsize : NULL;
   pn_bytes_t payload = {payload_size, payload_mem};
 
-  pn_do_trace(transport, channel, IN, args, payload_mem, payload_size);
-
   int err = pni_dispatch_action(transport, lcode, frame_type, channel, args, 
&payload);
 
   pn_data_clear(args);
@@ -127,7 +125,7 @@ ssize_t pn_dispatcher_input(pn_transport_t *transport, 
const char *bytes, size_t
   while (available && !*halt) {
     pn_frame_t frame;
 
-    ssize_t n = pn_read_frame(&frame, bytes + read, available, 
transport->local_max_frame);
+    ssize_t n = pn_read_frame(&frame, bytes + read, available, 
transport->local_max_frame, &transport->logger);
     if (n > 0) {
       read += n;
       available -= n;
diff --git a/c/src/core/framing.c b/c/src/core/framing.c
index bc484a9..547938a 100644
--- a/c/src/core/framing.c
+++ b/c/src/core/framing.c
@@ -25,7 +25,36 @@
 #include "engine-internal.h"
 #include "util.h"
 
-ssize_t pn_read_frame(pn_frame_t *frame, const char *bytes, size_t available, 
uint32_t max)
+#include <assert.h>
+
+static inline void pn_do_tx_trace(pn_logger_t *logger, uint16_t ch, pn_bytes_t 
frame)
+{
+  if (PN_SHOULD_LOG(logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_FRAME) ) {
+    if (frame.size==0) {
+      pn_logger_logf(logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_FRAME, "%u -> (EMPTY 
FRAME)", ch);
+    } else {
+      pni_logger_log_msg_frame(logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_FRAME, 
frame, "%u -> ", ch);
+    }
+  }
+}
+
+static inline void pn_do_rx_trace(pn_logger_t *logger, uint16_t ch, pn_bytes_t 
frame)
+{
+  if (PN_SHOULD_LOG(logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_FRAME) ) {
+    if (frame.size==0) {
+      pn_logger_logf(logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_FRAME, "%u <- (EMPTY 
FRAME)", ch);
+    } else {
+      pni_logger_log_msg_frame(logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_FRAME, 
frame, "%u <- ", ch);
+    }
+  }
+}
+
+static inline void pn_do_raw_trace(pn_logger_t *logger, pn_buffer_t *output, 
size_t size)
+{
+  PN_LOG_RAW(logger, PN_SUBSYSTEM_IO, PN_LEVEL_RAW, output, size);
+}
+
+ssize_t pn_read_frame(pn_frame_t *frame, const char *bytes, size_t available, 
uint32_t max, pn_logger_t *logger)
 {
   if (available < AMQP_HEADER_SIZE) return 0;
   uint32_t size = pni_read32(&bytes[0]);
@@ -34,34 +63,53 @@ ssize_t pn_read_frame(pn_frame_t *frame, const char *bytes, 
size_t available, ui
   unsigned int doff = 4 * (uint8_t)bytes[4];
   if (doff < AMQP_HEADER_SIZE || doff > size) return PN_ERR;
 
-  frame->size = size - doff;
-  frame->ex_size = doff - AMQP_HEADER_SIZE;
+  frame->frame_payload0 = (pn_bytes_t){.size=size-doff, .start=bytes+doff};
+  frame->frame_payload1 = (pn_bytes_t){.size=0,.start=NULL};
+  frame->extended = (pn_bytes_t){.size=doff-AMQP_HEADER_SIZE, 
.start=bytes+AMQP_HEADER_SIZE};
   frame->type = bytes[5];
   frame->channel = pni_read16(&bytes[6]);
-  frame->extended = bytes + AMQP_HEADER_SIZE;
-  frame->payload = bytes + doff;
+
+  pn_do_rx_trace(logger, frame->channel, frame->frame_payload0);
 
   return size;
 }
 
-size_t pn_write_frame(pn_buffer_t* buffer, pn_frame_t frame)
+size_t pn_write_frame(pn_buffer_t* buffer, pn_frame_t frame, pn_logger_t 
*logger)
 {
-  size_t size = AMQP_HEADER_SIZE + frame.ex_size + frame.size;
+  size_t size = AMQP_HEADER_SIZE + frame.extended.size + 
frame.frame_payload0.size + frame.frame_payload1.size;
   if (size <= pn_buffer_available(buffer))
   {
     // Prepare header
     char bytes[8];
     pni_write32(&bytes[0], size);
-    int doff = (frame.ex_size + AMQP_HEADER_SIZE - 1)/4 + 1;
+    int doff = (frame.extended.size + AMQP_HEADER_SIZE - 1)/4 + 1;
     bytes[4] = doff;
     bytes[5] = frame.type;
     pni_write16(&bytes[6], frame.channel);
 
     // Write header then rest of frame
     pn_buffer_append(buffer, bytes, 8);
-    if (frame.extended)
-    pn_buffer_append(buffer, frame.extended, frame.ex_size);
-    pn_buffer_append(buffer, frame.payload, frame.size);
+    pn_buffer_append(buffer, frame.extended.start, frame.extended.size);
+
+    // Don't mess with the buffer unless we are logging frame traces to avoid
+    // shuffling the buffer unnecessarily.
+    if (PN_SHOULD_LOG(logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_FRAME) ) {
+      // Get current buffer pointer so we can trace dump performative and 
payload together
+      pn_bytes_t smem = pn_buffer_bytes(buffer);
+      pn_buffer_append(buffer, frame.frame_payload0.start, 
frame.frame_payload0.size);
+      pn_buffer_append(buffer, frame.frame_payload1.start, 
frame.frame_payload1.size);
+      pn_bytes_t emem = pn_buffer_bytes(buffer);
+
+      // The buffer can't have moved
+      assert(smem.start==emem.start);
+      pn_bytes_t frame_payload = {.size=emem.size-smem.size, 
.start=smem.start+smem.size};
+      pn_do_tx_trace(logger, frame.channel, frame_payload);
+    } else {
+      pn_buffer_append(buffer, frame.frame_payload0.start, 
frame.frame_payload0.size);
+      pn_buffer_append(buffer, frame.frame_payload1.start, 
frame.frame_payload1.size);
+    }
+    pn_do_raw_trace(logger, buffer, 
AMQP_HEADER_SIZE+frame.extended.size+frame.frame_payload0.size+frame.frame_payload1.size);
+
     return size;
   } else {
     return 0;
diff --git a/c/src/core/framing.h b/c/src/core/framing.h
index 46e0d5e..c5fcaac 100644
--- a/c/src/core/framing.h
+++ b/c/src/core/framing.h
@@ -23,6 +23,7 @@
  */
 
 #include "buffer.h"
+#include "logger_private.h"
 
 #include "proton/types.h"
 
@@ -35,13 +36,12 @@
 typedef struct {
   uint8_t type;
   uint16_t channel;
-  size_t ex_size;
-  const char *extended;
-  size_t size;
-  const char *payload;
+  pn_bytes_t extended;
+  pn_bytes_t frame_payload0;
+  pn_bytes_t frame_payload1;
 } pn_frame_t;
 
-ssize_t pn_read_frame(pn_frame_t *frame, const char *bytes, size_t available, 
uint32_t max);
-size_t pn_write_frame(pn_buffer_t* buffer, pn_frame_t frame);
+ssize_t pn_read_frame(pn_frame_t *frame, const char *bytes, size_t available, 
uint32_t max, pn_logger_t *logger);
+size_t pn_write_frame(pn_buffer_t* buffer, pn_frame_t frame, pn_logger_t 
*logger);
 
 #endif /* framing.h */
diff --git a/c/src/core/transport.c b/c/src/core/transport.c
index 98270fd..63c7a1a 100644
--- a/c/src/core/transport.c
+++ b/c/src/core/transport.c
@@ -884,28 +884,6 @@ static int pni_disposition_encode(pn_disposition_t 
*disposition, pn_data_t *data
 }
 
 
-void pn_do_trace(pn_transport_t *transport, uint16_t ch, pn_dir_t dir,
-                 pn_data_t *args, const char *payload, size_t size)
-{
-  if (PN_SHOULD_LOG(&transport->logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_FRAME) ) {
-    pn_string_format(transport->scratch, "%u %s ", ch, dir == OUT ? "->" : 
"<-");
-    pn_inspect(args, transport->scratch);
-
-    if (pn_data_size(args)==0) {
-        pn_string_addf(transport->scratch, "(EMPTY FRAME)");
-    }
-
-    if (size) {
-      char buf[1024];
-      int e = pn_quote_data(buf, 1024, payload, size);
-      pn_string_addf(transport->scratch, " (%" PN_ZU ") \"%s\"%s", size, buf,
-                     e == PN_OVERFLOW ? "... (truncated)" : "");
-    }
-
-    pni_logger_log(&transport->logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_FRAME, 
pn_string_get(transport->scratch));
-  }
-}
-
 int pn_post_frame(pn_transport_t *transport, uint8_t type, uint16_t ch, const 
char *fmt, ...)
 {
   pn_buffer_t *frame_buf = transport->frame;
@@ -921,8 +899,6 @@ int pn_post_frame(pn_transport_t *transport, uint8_t type, 
uint16_t ch, const ch
     return PN_ERR;
   }
 
-  pn_do_trace(transport, ch, OUT, transport->output_args, NULL, 0);
-
  encode_performatives:
   pn_buffer_clear( frame_buf );
   pn_rwbytes_t buf = pn_buffer_memory( frame_buf );
@@ -942,17 +918,10 @@ int pn_post_frame(pn_transport_t *transport, uint8_t 
type, uint16_t ch, const ch
   pn_frame_t frame = {AMQP_FRAME_TYPE};
   frame.type = type;
   frame.channel = ch;
-  frame.payload = buf.start;
-  frame.size = wr;
-  pn_buffer_ensure(transport->output_buffer, 
AMQP_HEADER_SIZE+frame.ex_size+frame.size);
-  pn_write_frame(transport->output_buffer, frame);
+  frame.frame_payload0 = (pn_bytes_t){.size=wr, .start=buf.start};
+  pn_buffer_ensure(transport->output_buffer, 
AMQP_HEADER_SIZE+frame.extended.size+frame.frame_payload0.size+frame.frame_payload1.size);
+  pn_write_frame(transport->output_buffer, frame, &transport->logger);
   transport->output_frames_ct += 1;
-  if (PN_SHOULD_LOG(&transport->logger, PN_SUBSYSTEM_IO, PN_LEVEL_RAW)) {
-    pn_string_set(transport->scratch, "RAW: \"");
-    pn_buffer_quote(transport->output_buffer, transport->scratch, 
AMQP_HEADER_SIZE+frame.ex_size+frame.size);
-    pn_string_addf(transport->scratch, "\"");
-    pni_logger_log(&transport->logger, PN_SUBSYSTEM_IO, PN_LEVEL_RAW, 
pn_string_get(transport->scratch));
-  }
 
   return 0;
 }
@@ -1032,34 +1001,18 @@ static int pni_post_amqp_transfer_frame(pn_transport_t 
*transport, uint16_t ch,
       }
     }
 
-    if (pn_buffer_available( frame ) < (available + buf.size)) {
-      // not enough room for payload - try again...
-      pn_buffer_ensure( frame, available + buf.size );
-      goto encode_performatives;
-    }
-
-    pn_do_trace(transport, ch, OUT, transport->output_args, payload->start, 
available);
+    pn_frame_t frame = {AMQP_FRAME_TYPE};
+    frame.channel = ch;
+    frame.frame_payload0 = (pn_bytes_t){.size=buf.size, .start=buf.start};
+    frame.frame_payload1 = (pn_bytes_t){.size=available, 
.start=payload->start};
 
-    memmove( buf.start + buf.size, payload->start, available);
     payload->start += available;
     payload->size -= available;
-    buf.size += available;
 
-    pn_frame_t frame = {AMQP_FRAME_TYPE};
-    frame.channel = ch;
-    frame.payload = buf.start;
-    frame.size = buf.size;
-
-    pn_buffer_ensure(transport->output_buffer, 
AMQP_HEADER_SIZE+frame.ex_size+frame.size);
-    pn_write_frame(transport->output_buffer, frame);
+    pn_buffer_ensure(transport->output_buffer, 
AMQP_HEADER_SIZE+frame.extended.size+frame.frame_payload0.size+frame.frame_payload1.size);
+    pn_write_frame(transport->output_buffer, frame, &transport->logger);
     transport->output_frames_ct += 1;
     framecount++;
-    if (PN_SHOULD_LOG(&transport->logger, PN_SUBSYSTEM_IO, PN_LEVEL_RAW)) {
-      pn_string_set(transport->scratch, "RAW: \"");
-      pn_buffer_quote(transport->output_buffer, transport->scratch, 
AMQP_HEADER_SIZE+frame.ex_size+frame.size);
-      pn_string_addf(transport->scratch, "\"");
-      pni_logger_log(&transport->logger, PN_SUBSYSTEM_IO, PN_LEVEL_RAW, 
pn_string_get(transport->scratch));
-    }
   } while (payload->size > 0 && framecount < frame_limit);
 
   return framecount;

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to