This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit f114a05a8a491797dab514bb58da9364869ba3ec
Author: Andrew Stitcher <[email protected]>
AuthorDate: Fri Aug 20 17:37:54 2021 -0400

    PROTON-2451: Don't treat message id/correlation_id as pn_data_t unless we need to.
    
    Add capability to encode/decode AMQP message id/correlation id directly
    to pn_atom_t by introducing a new formatting code 'a'. This avoids going
    through the very inefficient pn_data_t unless we really have to.
    
    If the pn_data_t APIs (pn_message_id/pn_message_correlation_id) are not used
    then using only the message id/correlation id set/get APIs should be pretty
    efficient.
    
    The first time the application gets the pn_data_t from either of the
    deprecated old APIs this activates the backwards compatibility mode and
    everything becomes slow again. There's no real alternative to this as
    these APIs give out a pointer to the message's internal state which can
    now be changed by the application directly.
---
 c/src/core/codec.c                  |  32 +++++-
 c/src/core/consumers.h              | 139 ++++++++++++++++++++++++++
 c/src/core/data.h                   |   2 +
 c/src/core/emitters.h               |  46 +++++++++
 c/src/core/message.c                | 193 +++++++++++++++++++++++++++++-------
 c/tools/codec-generator/generate.py |   2 +
 c/tools/codec-generator/specs.json  |   4 +-
 7 files changed, 379 insertions(+), 39 deletions(-)

diff --git a/c/src/core/codec.c b/c/src/core/codec.c
index 3719c22..a3047f0 100644
--- a/c/src/core/codec.c
+++ b/c/src/core/codec.c
@@ -564,6 +564,7 @@ static int pni_normalize_multiple(pn_data_t *data, 
pn_data_t *src) {
    }: exit map
    ?: TODO document
    *: TODO document
+   a: single value (pn_atom_t*) - append the pn_atom_t unmodified
    C: single value (pn_data_t*) - append the pn_data_t unmodified
    M: multiple value (pn_data_t*) - normalize and append multiple field value,
       see pni_normalize_multiple()
@@ -743,7 +744,19 @@ int pn_data_vfill(pn_data_t *data, const char *fmt, 
va_list ap)
         }
       }
       break;
-     case 'M':
+    case 'a':                   /* Append an existing pn_atom_t *  */
+      {
+        pn_atom_t *src = va_arg(ap, pn_atom_t *);
+        if (src) {
+          err = pn_data_put_atom(data, *src);
+          if (err) return err;
+        } else {
+          err = pn_data_put_null(data);
+          if (err) return err;
+        }
+      }
+      break;
+    case 'M':
       {
         pn_data_t *src = va_arg(ap, pn_data_t *);
         err = (src && pn_data_size(src) > 0) ?
@@ -1167,6 +1180,23 @@ int pn_data_vscan(pn_data_t *data, const char *fmt, 
va_list ap)
       }
       if (resume_count && level == count_level) resume_count--;
       break;
+    case 'a':
+      {
+        pn_atom_t *dst = va_arg(ap, pn_atom_t *);
+        if (!suspend) {
+          pni_node_t *next = pni_data_peek(data);
+          if (next && next->atom.type != PN_NULL) {
+            *dst = next->atom;
+          } else {
+            scanned = false;
+          }
+          pn_data_next(data);
+        } else {
+          scanned = false;
+        }
+      }
+      if (resume_count && level == count_level) resume_count--;
+      break;
     default:
       return pn_error_format(pni_data_error(data), PN_ARG_ERR, "unrecognized 
scan code: 0x%.2X '%c'", code, code);
     }
diff --git a/c/src/core/consumers.h b/c/src/core/consumers.h
index df240c8..48c3a0a 100644
--- a/c/src/core/consumers.h
+++ b/c/src/core/consumers.h
@@ -391,6 +391,145 @@ static inline bool consume_timestamp(pni_consumer_t* 
consumer, pn_timestamp_t *t
   }
 }
 
+// Decode a single AMQP value from the consumer into *atom.
+//
+// One encoding byte is read and dispatched on. For the handled encodings
+// (unsigned integers, booleans, timestamp, binary, string, symbol, uuid and
+// null) the atom's type and value are filled in and true is returned; note
+// that an encoded null returns true with atom->type == PN_NULL.
+//
+// Any other encoding is passed to pni_consumer_skip_value(), and a failed read
+// abandons the value; in both cases atom->type is set to PN_NULL and false is
+// returned.
+//
+// NOTE(review): the pn_bytes_t stored for binary/string/symbol values is
+// whatever pni_consumer_readv8/readv32 produced - presumably a view into the
+// consumer's input rather than a copy; confirm before retaining it.
+// NOTE(review): no signed integer encodings are handled, so this never yields
+// PN_INT or PN_LONG - confirm that is intended for callers that post-process
+// those types.
+static inline bool consume_atom(pni_consumer_t* consumer, pn_atom_t *atom) {
+  uint8_t type;
+  if (pni_consumer_readf8(consumer, &type)) {
+    switch (type) {
+    // Unsigned integers: the compact encodings are widened into the same atom
+    // field as the full-width one, and the *0 encodings carry no payload.
+    case PNE_SMALLULONG: {
+      uint8_t ul;
+      if (!pni_consumer_readf8(consumer, &ul)) break;
+      atom->type = PN_ULONG;
+      atom->u.as_ulong = ul;
+      return true;
+    }
+    case PNE_ULONG: {
+      uint64_t ul;
+      if (!pni_consumer_readf64(consumer, &ul)) break;
+      atom->type = PN_ULONG;
+      atom->u.as_ulong = ul;
+      return true;
+    }
+    case PNE_ULONG0: {
+      atom->type = PN_ULONG;
+      atom->u.as_ulong = 0;
+      return true;
+    }
+    case PNE_SMALLUINT: {
+      uint8_t ui;
+      if (!pni_consumer_readf8(consumer, &ui)) break;
+      atom->type = PN_UINT;
+      atom->u.as_uint = ui;
+      return true;
+    }
+    case PNE_UINT: {
+      uint32_t ui;
+      if (!pni_consumer_readf32(consumer, &ui)) break;
+      atom->type = PN_UINT;
+      atom->u.as_uint = ui;
+      return true;
+    }
+    case PNE_UINT0: {
+      atom->type = PN_UINT;
+      atom->u.as_uint = 0;
+      return true;
+    }
+    case PNE_USHORT: {
+      uint16_t us;
+      if (!pni_consumer_readf16(consumer, &us)) break;
+      atom->type = PN_USHORT;
+      atom->u.as_ushort = us;
+      return true;
+    }
+    case PNE_UBYTE: {
+      uint8_t ub;
+      if (!pni_consumer_readf8(consumer, &ub)) break;
+      atom->type = PN_UBYTE;
+      atom->u.as_ubyte = ub;
+      return true;
+    }
+    // Booleans: either a one byte payload, or the value is implied by the
+    // encoding byte itself.
+    case PNE_BOOLEAN: {
+      uint8_t ub;
+      if (!pni_consumer_readf8(consumer, &ub)) break;
+      atom->type = PN_BOOL;
+      atom->u.as_bool = ub;
+      return true;
+    }
+    case PNE_FALSE:
+      atom->type = PN_BOOL;
+      atom->u.as_bool = false;
+      return true;
+    case PNE_TRUE:
+      atom->type = PN_BOOL;
+      atom->u.as_bool = true;
+      return true;
+    case PNE_MS64: {
+      uint64_t timestamp;
+      if (!pni_consumer_readf64(consumer, &timestamp)) break;
+      atom->type = PN_TIMESTAMP;
+      atom->u.as_timestamp = timestamp;
+      return true;
+    }
+    // Variable width values: the 8/32 suffix selects the reader used for the
+    // payload; both widths produce the same atom type.
+    case PNE_VBIN32:{
+      pn_bytes_t binary;
+      if (!pni_consumer_readv32(consumer, &binary)) break;
+      atom->type = PN_BINARY;
+      atom->u.as_bytes = binary;
+      return true;
+    }
+    case PNE_VBIN8:{
+      pn_bytes_t binary;
+      if (!pni_consumer_readv8(consumer, &binary)) break;
+      atom->type = PN_BINARY;
+      atom->u.as_bytes = binary;
+      return true;
+    }
+    case PNE_STR32_UTF8: {
+      pn_bytes_t string;
+      if (!pni_consumer_readv32(consumer, &string)) break;
+      atom->type = PN_STRING;
+      atom->u.as_bytes = string;
+      return true;
+    }
+    case PNE_STR8_UTF8: {
+      pn_bytes_t string;
+      if (!pni_consumer_readv8(consumer, &string)) break;
+      atom->type = PN_STRING;
+      atom->u.as_bytes = string;
+      return true;
+    }
+    case PNE_SYM32:{
+      pn_bytes_t symbol;
+      if (!pni_consumer_readv32(consumer, &symbol)) break;
+      atom->type = PN_SYMBOL;
+      atom->u.as_bytes = symbol;
+      return true;
+    }
+    case PNE_SYM8:{
+      pn_bytes_t symbol;
+      if (!pni_consumer_readv8(consumer, &symbol)) break;
+      atom->type = PN_SYMBOL;
+      atom->u.as_bytes = symbol;
+      return true;
+    }
+    case PNE_UUID: {
+      pn_uuid_t uuid;
+      if (!pni_consumer_readf128(consumer, &uuid)) break;
+      atom->type = PN_UUID;
+      atom->u.as_uuid = uuid;
+      return true;
+    }
+    case PNE_NULL:
+      atom->type = PN_NULL;
+      return true;
+    // Anything else is skipped over and reported as not consumed
+    default:
+      pni_consumer_skip_value(consumer, type);
+      break;
+    }
+  }
+  // Reached on a failed read or an unhandled encoding
+  atom->type = PN_NULL;
+  return false;
+}
+
 // XXX: assuming numeric -
 // if we get a symbol we should map it to the numeric value and dispatch on 
that
 static inline bool consume_descriptor(pni_consumer_t* consumer, pni_consumer_t 
*subconsumer, uint64_t *descriptor) {
diff --git a/c/src/core/data.h b/c/src/core/data.h
index 59e60d1..b95a657 100644
--- a/c/src/core/data.h
+++ b/c/src/core/data.h
@@ -70,4 +70,6 @@ int pni_data_traverse(pn_data_t *data,
                       int (*exit)(void *ctx, pn_data_t *data, pni_node_t 
*node),
                       void *ctx);
 
+int pni_inspect_atom(pn_atom_t *atom, pn_string_t *str);
+
 #endif /* data.h */
diff --git a/c/src/core/emitters.h b/c/src/core/emitters.h
index b1a6805..1d10f34 100644
--- a/c/src/core/emitters.h
+++ b/c/src/core/emitters.h
@@ -236,6 +236,13 @@ static inline void emit_timestamp(pni_emitter_t* emitter, 
pni_compound_context*
   compound->count++;
 }
 
+// Emit a UUID as the next member of the compound: emit_accumulated_nulls() is
+// called first, then the PNE_UUID encoding byte and the 128 bit value are
+// written and the compound's element count is incremented.
+static inline void emit_uuid(pni_emitter_t* emitter, pni_compound_context* 
compound, pn_uuid_t* uuid) {
+  emit_accumulated_nulls(emitter, compound);
+  pni_emitter_writef8(emitter, PNE_UUID);
+  pni_emitter_writef128(emitter, uuid);
+  compound->count++;
+}
+
 static inline void emit_descriptor(pni_emitter_t* emitter, 
pni_compound_context* compound, uint64_t ulong) {
   emit_accumulated_nulls(emitter, compound);
   pni_emitter_writef8(emitter, PNE_DESCRIPTOR);
@@ -438,6 +445,45 @@ static inline void emit_binaryornull(pni_emitter_t* 
emitter, pni_compound_contex
   }
 }
 
+// Emit the value held in *atom as the next member of the compound, picking the
+// emitter that matches atom->type. PN_NULL, and every type that has no case
+// of its own here, is emitted as a null.
+static inline void emit_atom(pni_emitter_t* emitter, pni_compound_context* compound, pn_atom_t* atom) {
+  switch (atom->type) {
+    case PN_BOOL:
+      emit_bool(emitter, compound, atom->u.as_bool);
+      break;
+    case PN_UBYTE:
+      emit_ubyte(emitter, compound, atom->u.as_ubyte);
+      break;
+    case PN_USHORT:
+      emit_ushort(emitter, compound, atom->u.as_ushort);
+      break;
+    case PN_UINT:
+      emit_uint(emitter, compound, atom->u.as_uint);
+      break;
+    case PN_ULONG:
+      emit_ulong(emitter, compound, atom->u.as_ulong);
+      break;
+    case PN_TIMESTAMP:
+      emit_timestamp(emitter, compound, atom->u.as_timestamp);
+      break;
+    case PN_UUID:
+      // The uuid emitter takes a pointer rather than a value
+      emit_uuid(emitter, compound, &atom->u.as_uuid);
+      break;
+    // Binary, string and symbol share the as_bytes member of the union
+    case PN_BINARY:
+      emit_binary_bytes(emitter, compound, atom->u.as_bytes);
+      break;
+    case PN_STRING:
+      emit_string_bytes(emitter, compound, atom->u.as_bytes);
+      break;
+    case PN_SYMBOL:
+      emit_symbol_bytes(emitter, compound, atom->u.as_bytes);
+      break;
+    case PN_NULL:
+    default:
+      emit_null(emitter, compound);
+      break;
+  }
+}
+
 // NB: This function is only correct because it currently can only be called 
to fill out an array
 static inline void emit_counted_symbols(pni_emitter_t* emitter, 
pni_compound_context* compound, size_t count, char** symbols) {
   // 64 is a heuristic - 64 3 character symbols will already be 256 bytes
diff --git a/c/src/core/message.c b/c/src/core/message.c
index a94c034..6773b7f 100644
--- a/c/src/core/message.c
+++ b/c/src/core/message.c
@@ -21,6 +21,7 @@
 
 #include "platform/platform_fmt.h"
 
+#include "data.h"
 #include "max_align.h"
 #include "message-internal.h"
 #include "protocol.h"
@@ -43,19 +44,21 @@
 // message
 
 struct pn_message_t {
+  pn_atom_t id;
+  pn_atom_t correlation_id;
   pn_timestamp_t expiry_time;
   pn_timestamp_t creation_time;
-  pn_data_t *id;
   pn_string_t *user_id;
   pn_string_t *address;
   pn_string_t *subject;
   pn_string_t *reply_to;
-  pn_data_t *correlation_id;
   pn_string_t *content_type;
   pn_string_t *content_encoding;
   pn_string_t *group_id;
   pn_string_t *reply_to_group_id;
 
+  pn_data_t *id_deprecated;
+  pn_data_t *correlation_id_deprecated;
   pn_data_t *instructions;
   pn_data_t *annotations;
   pn_data_t *properties;
@@ -76,6 +79,67 @@ struct pn_message_t {
   bool inferred;
 };
 
+/*
+ * Reset a message id atom to PN_NULL.
+ *
+ * For PN_BINARY and PN_STRING the bytes the atom points at are released with
+ * free() first. PN_ULONG and PN_UUID are simply reset, PN_NULL is left as it
+ * is. Any other type trips an assert and the atom is not modified.
+ */
+void pni_msgid_clear(pn_atom_t* msgid) {
+  if (msgid->type == PN_NULL) return;
+
+  if (msgid->type == PN_BINARY || msgid->type == PN_STRING) {
+    free((void*)msgid->u.as_bytes.start);
+  } else if (msgid->type != PN_ULONG && msgid->type != PN_UUID) {
+    // Not one of the types this function knows how to clear
+    assert(false);
+    return;
+  }
+  msgid->type = PN_NULL;
+}
+
+/*
+ * Validate a message id atom and take a private copy of any bytes it refers to.
+ *
+ * PN_BINARY and PN_STRING ids get a malloc'ed copy of their bytes and
+ * msgid->u.as_bytes.start is repointed at that copy. PN_ULONG, PN_UUID and
+ * PN_NULL are left untouched. Every other type is not a legal message id and
+ * is replaced by PN_NULL.
+ *
+ * If the copy cannot be allocated the id is reset to PN_NULL, so the atom
+ * never keeps a pointer to bytes it does not own.
+ */
+void pni_msgid_validate_intern(pn_atom_t* msgid) {
+  switch (msgid->type) {
+    case PN_BINARY:
+    case PN_STRING: {
+      size_t size = msgid->u.as_bytes.size;
+      // malloc(0) is allowed to return NULL, so always ask for at least one
+      // byte: an empty id must not look like an allocation failure
+      char* copy = malloc(size ? size : 1);
+      assert(copy);
+      if (!copy) {
+        // With asserts compiled out, fall back to "no id" rather than keeping
+        // the caller's pointer, which pni_msgid_clear() would later free()
+        msgid->type = PN_NULL;
+        return;
+      }
+      // memcpy from a null source is undefined even for zero bytes
+      if (size) memcpy(copy, msgid->u.as_bytes.start, size);
+      msgid->u.as_bytes.start = copy;
+      return;
+    }
+    case PN_ULONG:
+    case PN_UUID:
+    case PN_NULL:
+      return;
+    default:
+      // Not a legal msgid type
+      msgid->type = PN_NULL;
+      return;
+  }
+}
+
+/*
+ * This exists purely to fix bad incoming ids created by the broken ruby binding.
+ *
+ * A PN_INT or PN_LONG id whose value is not negative is rewritten in place as
+ * a PN_ULONG with the same value. Negative values and ids of any other type
+ * are left exactly as they were.
+ */
+void pni_msgid_fix_interop(pn_atom_t* msgid) {
+  int64_t value;
+  if (msgid->type == PN_INT) {
+    value = msgid->u.as_int;
+  } else if (msgid->type == PN_LONG) {
+    value = msgid->u.as_long;
+  } else {
+    return;
+  }
+  // A negative value has no unsigned equivalent, so it is not touched
+  if (value < 0) return;
+  msgid->type = PN_ULONG;
+  msgid->u.as_ulong = value;
+}
+
 void pn_message_finalize(void *obj)
 {
   pn_message_t *msg = (pn_message_t *) obj;
@@ -87,11 +151,13 @@ void pn_message_finalize(void *obj)
   pn_free(msg->content_encoding);
   pn_free(msg->group_id);
   pn_free(msg->reply_to_group_id);
-  pn_data_free(msg->id);
-  pn_data_free(msg->correlation_id);
+  pni_msgid_clear(&msg->id);
+  pni_msgid_clear(&msg->correlation_id);
 #ifdef GENERATE_CODEC_CODE
   pn_data_free(msg->data);
 #endif
+  if (msg->id_deprecated) pn_data_free(msg->id_deprecated);
+  if (msg->correlation_id_deprecated) 
pn_data_free(msg->correlation_id_deprecated);
   pn_data_free(msg->instructions);
   pn_data_free(msg->annotations);
   pn_data_free(msg->properties);
@@ -147,10 +213,11 @@ int pn_message_inspect(void *obj, pn_string_t *dst)
     comma = true;
   }
 
-  if (pn_data_size(msg->id)) {
+  pn_atom_t id = pn_message_get_id(msg);
+  if (id.type!=PN_NULL) {
     err = pn_string_addf(dst, "id=");
     if (err) return err;
-    err = pn_inspect(msg->id, dst);
+    err = pni_inspect_atom(&id, dst);
     if (err) return err;
     err = pn_string_addf(dst, ", ");
     if (err) return err;
@@ -187,10 +254,11 @@ int pn_message_inspect(void *obj, pn_string_t *dst)
     comma = true;
   }
 
-  if (pn_data_size(msg->correlation_id)) {
+  pn_atom_t correlation_id = pn_message_get_correlation_id(msg);
+  if (correlation_id.type!=PN_NULL) {
     err = pn_string_addf(dst, "correlation_id=");
     if (err) return err;
-    err = pn_inspect(msg->correlation_id, dst);
+    err = pni_inspect_atom(&correlation_id, dst);
     if (err) return err;
     err = pn_string_addf(dst, ", ");
     if (err) return err;
@@ -322,12 +390,12 @@ static pn_message_t *pni_message_new(size_t size)
   msg->ttl = 0;
   msg->first_acquirer = false;
   msg->delivery_count = 0;
-  msg->id = pn_data(1);
+  msg->id = (pn_atom_t){.type=PN_NULL};
   msg->user_id = pn_string(NULL);
   msg->address = pn_string(NULL);
   msg->subject = pn_string(NULL);
   msg->reply_to = pn_string(NULL);
-  msg->correlation_id = pn_data(1);
+  msg->correlation_id = (pn_atom_t){.type=PN_NULL};
   msg->content_type = pn_string(NULL);
   msg->content_encoding = pn_string(NULL);
   msg->expiry_time = 0;
@@ -340,6 +408,8 @@ static pn_message_t *pni_message_new(size_t size)
 #ifdef GENERATE_CODEC_CODE
   msg->data = pn_data(16);
 #endif
+  msg->id_deprecated = NULL;
+  msg->correlation_id_deprecated = NULL;
   msg->instructions = pn_data(16);
   msg->annotations = pn_data(16);
   msg->properties = pn_data(16);
@@ -379,12 +449,12 @@ void pn_message_clear(pn_message_t *msg)
   msg->ttl = 0;
   msg->first_acquirer = false;
   msg->delivery_count = 0;
-  pn_data_clear(msg->id);
+  pni_msgid_clear(&msg->id);
   pn_string_clear(msg->user_id);
   pn_string_clear(msg->address);
   pn_string_clear(msg->subject);
   pn_string_clear(msg->reply_to);
-  pn_data_clear(msg->correlation_id);
+  pni_msgid_clear(&msg->correlation_id);
   pn_string_clear(msg->content_type);
   pn_string_clear(msg->content_encoding);
   msg->expiry_time = 0;
@@ -396,6 +466,8 @@ void pn_message_clear(pn_message_t *msg)
 #ifdef GENERATE_CODEC_CODE
   pn_data_clear(msg->data);
 #endif
+  pn_data_clear(msg->id_deprecated);
+  pn_data_clear(msg->correlation_id_deprecated);
   pn_data_clear(msg->instructions);
   pn_data_clear(msg->annotations);
   pn_data_clear(msg->properties);
@@ -491,20 +563,38 @@ int pn_message_set_delivery_count(pn_message_t *msg, 
uint32_t count)
 pn_data_t *pn_message_id(pn_message_t *msg)
 {
   assert(msg);
-  return msg->id;
+  // Deprecated accessor. The pn_data_t is created on first use; a non-null id
+  // held in the atom field is moved into it and the atom field is cleared.
+  // The results of pn_data() and pn_data_put_atom() are not checked here.
+  if (!msg->id_deprecated) {
+    msg->id_deprecated = pn_data(1);
+    if (msg->id.type!=PN_NULL) {
+      pn_data_put_atom(msg->id_deprecated, msg->id);
+      pni_msgid_clear(&msg->id);
+    }
+  }
+  return msg->id_deprecated;
 }
 
 pn_msgid_t pn_message_get_id(pn_message_t *msg)
 {
   assert(msg);
-  return pn_data_get_atom(msg->id);
+  // Once the deprecated pn_data_t exists it is read in preference to the atom
+  // field. When the atom field is returned it is returned by value, so a
+  // binary/string id still points at the bytes held by the message.
+  if (msg->id_deprecated) {
+    return pn_data_get_atom(msg->id_deprecated);
+  } else {
+    return msg->id;
+  }
 }
 
 int pn_message_set_id(pn_message_t *msg, pn_msgid_t id)
 {
   assert(msg);
-  pn_data_rewind(msg->id);
-  return pn_data_put_atom(msg->id, id);
+  // Set the message id. Returns 0 on success, or the error reported by
+  // pn_data_put_atom() when the deprecated pn_data_t is in use.
+  if (msg->id_deprecated) {
+    // Once the deprecated pn_data_t exists the id is written there, and a
+    // failed put is reported to the caller as it was before this change
+    pn_data_rewind(msg->id_deprecated);
+    return pn_data_put_atom(msg->id_deprecated, id);
+  }
+  // Validate and copy the new value BEFORE releasing the old one: id may
+  // alias the bytes currently held by the message, for example
+  // pn_message_set_id(msg, pn_message_get_id(msg))
+  pni_msgid_validate_intern(&id);
+  pni_msgid_clear(&msg->id);
+  msg->id = id;
+  return 0;
 }
 
 static pn_bytes_t pn_string_get_bytes(pn_string_t *string)
@@ -564,22 +654,41 @@ int pn_message_set_reply_to(pn_message_t *msg, const char 
*reply_to)
 pn_data_t *pn_message_correlation_id(pn_message_t *msg)
 {
   assert(msg);
-  return msg->correlation_id;
+  // Deprecated accessor. The pn_data_t is created on first use; a non-null
+  // correlation id held in the atom field is moved into it and the atom field
+  // is cleared. The results of pn_data() and pn_data_put_atom() are not
+  // checked here.
+  if (!msg->correlation_id_deprecated) {
+    msg->correlation_id_deprecated = pn_data(1);
+    if (msg->correlation_id.type!=PN_NULL) {
+      pn_data_put_atom(msg->correlation_id_deprecated, msg->correlation_id);
+      pni_msgid_clear(&msg->correlation_id);
+    }
+  }
+  return msg->correlation_id_deprecated;
 }
 
 pn_msgid_t pn_message_get_correlation_id(pn_message_t *msg)
 {
   assert(msg);
-  return pn_data_get_atom(msg->correlation_id);
+  // Once the deprecated pn_data_t exists it is read in preference to the atom
+  // field. When the atom field is returned it is returned by value, so a
+  // binary/string id still points at the bytes held by the message.
+  if (msg->correlation_id_deprecated) {
+    return pn_data_get_atom(msg->correlation_id_deprecated);
+  } else {
+    return msg->correlation_id;
+  }
 }
 
 int pn_message_set_correlation_id(pn_message_t *msg, pn_msgid_t id)
 {
   assert(msg);
-  pn_data_rewind(msg->correlation_id);
-  return pn_data_put_atom(msg->correlation_id, id);
+  // Set the correlation id. Returns 0 on success, or the error reported by
+  // pn_data_put_atom() when the deprecated pn_data_t is in use.
+  if (msg->correlation_id_deprecated) {
+    // Once the deprecated pn_data_t exists the id is written there, and a
+    // failed put is reported to the caller as it was before this change
+    pn_data_rewind(msg->correlation_id_deprecated);
+    return pn_data_put_atom(msg->correlation_id_deprecated, id);
+  }
+  // Validate and copy the new value BEFORE releasing the old one: id may
+  // alias the bytes currently held by the message, for example
+  // pn_message_set_correlation_id(msg, pn_message_get_correlation_id(msg))
+  pni_msgid_validate_intern(&id);
+  pni_msgid_clear(&msg->correlation_id);
+  msg->correlation_id = id;
+  return 0;
 }
 
+
 const char *pn_message_get_content_type(pn_message_t *msg)
 {
   assert(msg);
@@ -707,16 +816,18 @@ int pn_message_decode(pn_message_t *msg, const char 
*bytes, size_t size)
     case PROPERTIES:
       {
         pn_bytes_t user_id, address, subject, reply_to, ctype, cencoding,
-          group_id, reply_to_group_id;
-        pn_data_clear(msg->id);
-        pn_data_clear(msg->correlation_id);
-        err = pn_data_scan(msg->data, "D.[CzSSSCssttSIS]", msg->id,
+                   group_id, reply_to_group_id;
+        pn_atom_t id;
+        pn_atom_t correlation_id;
+        err = pn_data_scan(msg->data, "D.[azSSSassttSIS]", &id,
                            &user_id, &address, &subject, &reply_to,
-                           msg->correlation_id, &ctype, &cencoding,
+                           &correlation_id, &ctype, &cencoding,
                            &msg->expiry_time, &msg->creation_time, &group_id,
                            &msg->group_sequence, &reply_to_group_id);
         if (err) return pn_error_format(msg->error, err, "data error: %s",
                                         
pn_error_text(pn_data_error(msg->data)));
+        pni_msgid_fix_interop(&id);
+        pn_message_set_id(msg, id);
         err = pn_string_set_bytes(msg->user_id, user_id);
         if (err) return pn_error_format(msg->error, err, "error setting 
user_id");
         err = pn_string_setn(msg->address, address.start, address.size);
@@ -725,6 +836,8 @@ int pn_message_decode(pn_message_t *msg, const char *bytes, 
size_t size)
         if (err) return pn_error_format(msg->error, err, "error setting 
subject");
         err = pn_string_setn(msg->reply_to, reply_to.start, reply_to.size);
         if (err) return pn_error_format(msg->error, err, "error setting 
reply_to");
+        pni_msgid_fix_interop(&correlation_id);
+        pn_message_set_correlation_id(msg, correlation_id);
         err = pn_string_setn(msg->content_type, ctype.start, ctype.size);
         if (err) return pn_error_format(msg->error, err, "error setting 
content_type");
         err = pn_string_setn(msg->content_encoding, cencoding.start,
@@ -811,14 +924,16 @@ int pn_message_decode(pn_message_t *msg, const char 
*bytes, size_t size)
       }
       case PROPERTIES: {
         pn_bytes_t user_id, address, subject, reply_to, ctype, cencoding,
-        group_id, reply_to_group_id;
-        pn_data_clear(msg->id);
-        pn_data_clear(msg->correlation_id);
-        pn_amqp_decode_DqECzSSSCssttSISe(msg_bytes,  msg->id,
+                   group_id, reply_to_group_id;
+        pn_atom_t id;
+        pn_atom_t correlation_id;
+        pn_amqp_decode_DqEazSSSassttSISe(msg_bytes,  &id,
                            &user_id, &address, &subject, &reply_to,
-                           msg->correlation_id, &ctype, &cencoding,
+                           &correlation_id, &ctype, &cencoding,
                            &msg->expiry_time, &msg->creation_time, &group_id,
                            &msg->group_sequence, &reply_to_group_id);
+        pni_msgid_fix_interop(&id);
+        pn_message_set_id(msg, id);
         int err = pn_string_set_bytes(msg->user_id, user_id);
         if (err) return pn_error_format(msg->error, err, "error setting 
user_id");
         err = pn_string_setn(msg->address, address.start, address.size);
@@ -827,6 +942,8 @@ int pn_message_decode(pn_message_t *msg, const char *bytes, 
size_t size)
         if (err) return pn_error_format(msg->error, err, "error setting 
subject");
         err = pn_string_setn(msg->reply_to, reply_to.start, reply_to.size);
         if (err) return pn_error_format(msg->error, err, "error setting 
reply_to");
+        pni_msgid_fix_interop(&correlation_id);
+        pn_message_set_correlation_id(msg, correlation_id);
         err = pn_string_setn(msg->content_type, ctype.start, ctype.size);
         if (err) return pn_error_format(msg->error, err, "error setting 
content_type");
         err = pn_string_setn(msg->content_encoding, cencoding.start,
@@ -951,13 +1068,15 @@ int pn_message_encode(pn_message_t *msg, char *bytes, 
size_t *isize)
   }
 
   /* "DL[CzSSSCss?t?tS?IS]" */
-  last_size = pn_amqp_encode_bytes_DLECzSSSCssQtQtSQISe(bytes, remaining, 
PROPERTIES,
-                     msg->id,
+  pn_atom_t id = pn_message_get_id(msg);
+  pn_atom_t correlation_id = pn_message_get_correlation_id(msg);
+  last_size = pn_amqp_encode_bytes_DLEazSSSassQtQtSQISe(bytes, remaining, 
PROPERTIES,
+                     &id,
                      pn_string_size(msg->user_id), pn_string_get(msg->user_id),
                      pn_string_get(msg->address),
                      pn_string_get(msg->subject),
                      pn_string_get(msg->reply_to),
-                     msg->correlation_id,
+                     &correlation_id,
                      pn_string_get(msg->content_type),
                      pn_string_get(msg->content_encoding),
                      (bool)msg->expiry_time, msg->expiry_time,
@@ -1046,13 +1165,15 @@ int pn_message_data(pn_message_t *msg, pn_data_t *data)
                              pn_error_text(pn_data_error(data)));
   }
 
-  err = pn_data_fill(data, "DL[CzSSSCss?t?tS?IS]", PROPERTIES,
-                     msg->id,
+  pn_atom_t id = pn_message_get_id(msg);
+  pn_atom_t correlation_id = pn_message_get_correlation_id(msg);
+  err = pn_data_fill(data, "DL[azSSSass?t?tS?IS]", PROPERTIES,
+                     &id,
                      pn_string_size(msg->user_id), pn_string_get(msg->user_id),
                      pn_string_get(msg->address),
                      pn_string_get(msg->subject),
                      pn_string_get(msg->reply_to),
-                     msg->correlation_id,
+                     &correlation_id,
                      pn_string_get(msg->content_type),
                      pn_string_get(msg->content_encoding),
                      (bool)msg->expiry_time, msg->expiry_time,
diff --git a/c/tools/codec-generator/generate.py 
b/c/tools/codec-generator/generate.py
index fb9f68f..df92181 100644
--- a/c/tools/codec-generator/generate.py
+++ b/c/tools/codec-generator/generate.py
@@ -318,6 +318,8 @@ def parse_item(format: str) -> Tuple[ASTNode, str]:
         return NullNode('null'), format[1:]
     elif format.startswith('R'):
         return ASTNode('raw', ['pn_bytes_t'], consume_types=['pn_bytes_t*']), 
format[1:]
+    elif format.startswith('a'):
+        return ASTNode('atom', ['pn_atom_t*'], consume_types=['pn_atom_t*']), 
format[1:]
     elif format.startswith('M'):
         return ASTNode('multiple', ['pn_data_t*']), format[1:]
     elif format.startswith('o'):
diff --git a/c/tools/codec-generator/specs.json 
b/c/tools/codec-generator/specs.json
index 562e74d..15bdcdb 100644
--- a/c/tools/codec-generator/specs.json
+++ b/c/tools/codec-generator/specs.json
@@ -7,7 +7,6 @@
     "DL[?o?B?I?o?I]",
     "DL[@T[*s]]",
     "DL[Bz]",
-    "DL[CzSSSCss?t?tS?IS]",
     "DL[I?o?DL[sSC]]",
     "DL[IIzI?o?on?DLC?o?o?o]",
     "DL[SIoBB?DL[SIsIoC?sCnCC]DL[C]nnI]",
@@ -15,6 +14,7 @@
     "DL[SS?I?H?InnMMC]",
     "DL[S]",
     "DL[Z]",
+    "DL[azSSSass?t?tS?IS]",
     "DL[oI?I?o?DL[]]",
     "DL[oIn?o?DLC]",
     "DL[szS]"
@@ -28,12 +28,12 @@
     "D.[?HI]",
     "D.[?IIII?I?II.o]",
     "D.[?S?S?I?HI..CCC]",
-    "D.[CzSSSCssttSIS]",
     "D.[I?Iz.?oo.D?LRooo]",
     "D.[IoR]",
     "D.[sSC]",
     "D.[D.[sSC]]",
     "D.[SIo?B?BD.[SIsIo.s]D.[SIsIo]..IL..?C]",
+    "D.[azSSSassttSIS]",
     "D.[o?BIoI]",
     "D.[oI?IoR]",
     "D.[?o?oC]",

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to