This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 08d14b37 fix: Accommodate IPC messages without continuation bytes (#629)
08d14b37 is described below

commit 08d14b378c123807b399e455d33693375f3d5f87
Author: Benjamin Kietzman <[email protected]>
AuthorDate: Fri Sep 20 16:23:15 2024 -0500

    fix: Accommodate IPC messages without continuation bytes (#629)
    
    Before Arrow 0.15, encapsulated IPC messages started with just the int32
    length prefix, instead of the 0xFFFFFFFF continuation token followed by
    the length.
---
 src/nanoarrow/ipc/decoder.c       | 88 +++++++++++++++++++--------------------
 src/nanoarrow/ipc/decoder_test.cc | 25 +++++++----
 2 files changed, 61 insertions(+), 52 deletions(-)

diff --git a/src/nanoarrow/ipc/decoder.c b/src/nanoarrow/ipc/decoder.c
index 5be3794e..dcde4335 100644
--- a/src/nanoarrow/ipc/decoder.c
+++ b/src/nanoarrow/ipc/decoder.c
@@ -52,10 +52,6 @@
 #define ENODATA 120
 #endif
 
-// A more readable expression way to refer to the fact that there are 8 bytes
-// at the beginning of every message header.
-static const int32_t kMessageHeaderPrefixSize = 8;
-
 #define NANOARROW_IPC_MAGIC "ARROW1"
 
 // Internal representation of a parsed "Field" from flatbuffers. This
@@ -272,14 +268,6 @@ void ArrowIpcDecoderReset(struct ArrowIpcDecoder* decoder) 
{
   }
 }
 
-static inline uint32_t ArrowIpcReadContinuationBytes(struct ArrowBufferView* 
data) {
-  uint32_t value;
-  memcpy(&value, data->data.as_uint8, sizeof(uint32_t));
-  data->data.as_uint8 += sizeof(uint32_t);
-  data->size_bytes -= sizeof(uint32_t);
-  return value;
-}
-
 static inline int32_t ArrowIpcReadInt32LE(struct ArrowBufferView* data, int 
swap_endian) {
   int32_t value;
   memcpy(&value, data->data.as_uint8, sizeof(int32_t));
@@ -984,40 +972,49 @@ static inline void ArrowIpcDecoderResetHeaderInfo(struct 
ArrowIpcDecoder* decode
 
 // Returns NANOARROW_OK if data is large enough to read the first 8 bytes
 // of the message header, ESPIPE if reading more data might help, or EINVAL if 
the content
-// is not valid. Advances the input ArrowBufferView by 8 bytes.
+// is not valid. Advances the input ArrowBufferView by prefix_size (8 bytes or 
4 bytes if
+// the message is pre-0.15 and has no continuation). Sets 
decoder->header_size_bytes
+// to the flatbuffers length plus the prefix_size.
 static inline int ArrowIpcDecoderReadHeaderPrefix(struct ArrowIpcDecoder* 
decoder,
                                                   struct ArrowBufferView* 
data_mut,
-                                                  int32_t* message_size_bytes,
+                                                  int32_t* prefix_size_bytes,
                                                   struct ArrowError* error) {
   struct ArrowIpcDecoderPrivate* private_data =
       (struct ArrowIpcDecoderPrivate*)decoder->private_data;
 
-  if (data_mut->size_bytes < kMessageHeaderPrefixSize) {
+  if (data_mut->size_bytes < 8) {
     ArrowErrorSet(error,
                   "Expected data of at least 8 bytes but only %" PRId64 " 
bytes remain",
                   data_mut->size_bytes);
     return ESPIPE;
   }
 
-  uint32_t continuation = ArrowIpcReadContinuationBytes(data_mut);
-  if (continuation != 0xFFFFFFFF) {
-    ArrowErrorSet(error, "Expected 0xFFFFFFFF at start of message but found 
0x%08X",
-                  (unsigned int)continuation);
-    return EINVAL;
+  int swap_endian = private_data->system_endianness == 
NANOARROW_IPC_ENDIANNESS_BIG;
+  int32_t continuation = ArrowIpcReadInt32LE(data_mut, swap_endian);
+  int32_t length;
+  if ((uint32_t)continuation != 0xFFFFFFFF) {
+    if (continuation < 0) {
+      ArrowErrorSet(error, "Expected 0xFFFFFFFF at start of message but found 
0x%08X",
+                    (unsigned int)continuation);
+      return EINVAL;
+    }
+    // Tolerate pre-0.15 encapsulated messages which only had the length prefix
+    length = continuation;
+    *prefix_size_bytes = sizeof(length);
+  } else {
+    length = ArrowIpcReadInt32LE(data_mut, swap_endian);
+    *prefix_size_bytes = sizeof(continuation) + sizeof(length);
   }
+  decoder->header_size_bytes = *prefix_size_bytes + length;
 
-  int swap_endian = private_data->system_endianness == 
NANOARROW_IPC_ENDIANNESS_BIG;
-  int32_t header_body_size_bytes = ArrowIpcReadInt32LE(data_mut, swap_endian);
-  *message_size_bytes = header_body_size_bytes + kMessageHeaderPrefixSize;
-  if (header_body_size_bytes < 0) {
+  if (length < 0) {
     ArrowErrorSet(error,
-                  "Expected message body size > 0 but found message body size 
of %" PRId32
-                  " bytes",
-                  header_body_size_bytes);
+                  "Expected message size > 0 but found message size of %" 
PRId32 " bytes",
+                  length);
     return EINVAL;
   }
 
-  if (header_body_size_bytes == 0) {
+  if (length == 0) {
     ArrowErrorSet(error, "End of Arrow stream");
     return ENODATA;
   }
@@ -1029,8 +1026,10 @@ ArrowErrorCode ArrowIpcDecoderPeekHeader(struct 
ArrowIpcDecoder* decoder,
                                          struct ArrowBufferView data,
                                          struct ArrowError* error) {
   ArrowIpcDecoderResetHeaderInfo(decoder);
-  NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderReadHeaderPrefix(
-      decoder, &data, &decoder->header_size_bytes, error));
+  int32_t prefix_size_bytes;
+  NANOARROW_RETURN_NOT_OK(
+      ArrowIpcDecoderReadHeaderPrefix(decoder, &data, &prefix_size_bytes, 
error));
+  NANOARROW_UNUSED(prefix_size_bytes);
   return NANOARROW_OK;
 }
 
@@ -1041,24 +1040,24 @@ ArrowErrorCode ArrowIpcDecoderVerifyHeader(struct 
ArrowIpcDecoder* decoder,
       (struct ArrowIpcDecoderPrivate*)decoder->private_data;
 
   ArrowIpcDecoderResetHeaderInfo(decoder);
-  NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderReadHeaderPrefix(
-      decoder, &data, &decoder->header_size_bytes, error));
+  int32_t prefix_size_bytes;
+  NANOARROW_RETURN_NOT_OK(
+      ArrowIpcDecoderReadHeaderPrefix(decoder, &data, &prefix_size_bytes, 
error));
 
   // Check that data contains at least the entire header (return ESPIPE to 
signal
   // that reading more data may help).
-  int64_t message_body_size = decoder->header_size_bytes - 
kMessageHeaderPrefixSize;
-  if (data.size_bytes < message_body_size) {
+  if (data.size_bytes < decoder->header_size_bytes - prefix_size_bytes) {
     ArrowErrorSet(error,
-                  "Expected >= %" PRId64 " bytes of remaining data but found 
%" PRId64
+                  "Expected >= %d bytes of remaining data but found %" PRId64
                   " bytes in buffer",
-                  message_body_size + kMessageHeaderPrefixSize,
-                  data.size_bytes + kMessageHeaderPrefixSize);
+                  decoder->header_size_bytes, data.size_bytes + 
prefix_size_bytes);
     return ESPIPE;
   }
 
   // Run flatbuffers verification
   enum flatcc_verify_error_no verify_error =
-      ns(Message_verify_as_root(data.data.as_uint8, message_body_size);
+      ns(Message_verify_as_root(data.data.as_uint8,
+                                decoder->header_size_bytes - 
prefix_size_bytes);
          if (verify_error != flatcc_verify_ok)) {
     ArrowErrorSet(error, "Message flatbuffer verification failed (%d) %s",
                   (int)verify_error, flatcc_verify_error_string(verify_error));
@@ -1163,18 +1162,17 @@ ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct 
ArrowIpcDecoder* decoder,
       (struct ArrowIpcDecoderPrivate*)decoder->private_data;
 
   ArrowIpcDecoderResetHeaderInfo(decoder);
-  NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderReadHeaderPrefix(
-      decoder, &data, &decoder->header_size_bytes, error));
+  int32_t prefix_size_bytes;
+  NANOARROW_RETURN_NOT_OK(
+      ArrowIpcDecoderReadHeaderPrefix(decoder, &data, &prefix_size_bytes, 
error));
 
   // Check that data contains at least the entire header (return ESPIPE to 
signal
   // that reading more data may help).
-  int64_t message_body_size = decoder->header_size_bytes - 
kMessageHeaderPrefixSize;
-  if (data.size_bytes < message_body_size) {
+  if (data.size_bytes < decoder->header_size_bytes - prefix_size_bytes) {
     ArrowErrorSet(error,
-                  "Expected >= %" PRId64 " bytes of remaining data but found 
%" PRId64
+                  "Expected >= %d bytes of remaining data but found %" PRId64
                   " bytes in buffer",
-                  message_body_size + kMessageHeaderPrefixSize,
-                  data.size_bytes + kMessageHeaderPrefixSize);
+                  decoder->header_size_bytes, data.size_bytes + 
prefix_size_bytes);
     return ESPIPE;
   }
 
diff --git a/src/nanoarrow/ipc/decoder_test.cc 
b/src/nanoarrow/ipc/decoder_test.cc
index ffd8f7f2..f5e54dd2 100644
--- a/src/nanoarrow/ipc/decoder_test.cc
+++ b/src/nanoarrow/ipc/decoder_test.cc
@@ -129,28 +129,28 @@ TEST(NanoarrowIpcTest, NanoarrowIpcCheckHeader) {
   EXPECT_STREQ(error.message,
                "Expected data of at least 8 bytes but only 1 bytes remain");
 
-  uint32_t eight_bad_bytes[] = {0, 0};
-  data.data.as_uint8 = reinterpret_cast<uint8_t*>(eight_bad_bytes);
-  data.size_bytes = 8;
+  uint32_t eight_bad_bytes[] = {negative_one_le * 256, 999};
+  data.data.as_uint32 = eight_bad_bytes;
+  data.size_bytes = sizeof(eight_bad_bytes);
   EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), EINVAL);
   EXPECT_STREQ(error.message,
-               "Expected 0xFFFFFFFF at start of message but found 0x00000000");
+               "Expected 0xFFFFFFFF at start of message but found 0xFFFFFF00");
 
   ArrowErrorInit(&error);
   EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), EINVAL);
   EXPECT_STREQ(error.message,
-               "Expected 0xFFFFFFFF at start of message but found 0x00000000");
+               "Expected 0xFFFFFFFF at start of message but found 0xFFFFFF00");
 
   eight_bad_bytes[0] = 0xFFFFFFFF;
   eight_bad_bytes[1] = negative_one_le;
   EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), EINVAL);
   EXPECT_STREQ(error.message,
-               "Expected message body size > 0 but found message body size of 
-1 bytes");
+               "Expected message size > 0 but found message size of -1 bytes");
 
   ArrowErrorInit(&error);
   EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), EINVAL);
   EXPECT_STREQ(error.message,
-               "Expected message body size > 0 but found message body size of 
-1 bytes");
+               "Expected message size > 0 but found message size of -1 bytes");
 
   eight_bad_bytes[1] = one_le;
   EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), ESPIPE);
@@ -169,6 +169,17 @@ TEST(NanoarrowIpcTest, NanoarrowIpcCheckHeader) {
   EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), ENODATA);
   EXPECT_STREQ(error.message, "End of Arrow stream");
 
+  uint32_t pre_continuation[] = {0, 0};
+  data.data.as_uint32 = pre_continuation;
+  data.size_bytes = sizeof(pre_continuation);
+  EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), ENODATA);
+  EXPECT_STREQ(error.message, "End of Arrow stream");
+
+  pre_continuation[0] = one_le << 3;
+  EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), ESPIPE);
+  EXPECT_STREQ(error.message,
+               "Expected >= 12 bytes of remaining data but found 8 bytes in 
buffer");
+
   ArrowIpcDecoderReset(&decoder);
 }
 

Reply via email to