This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git
The following commit(s) were added to refs/heads/main by this push:
new 08d14b37 fix: Accommodate IPC messages without continuation bytes
(#629)
08d14b37 is described below
commit 08d14b378c123807b399e455d33693375f3d5f87
Author: Benjamin Kietzman <[email protected]>
AuthorDate: Fri Sep 20 16:23:15 2024 -0500
fix: Accommodate IPC messages without continuation bytes (#629)
Before Arrow 0.15, encapsulated IPC messages started with just the
length prefix instead of with a continuation token followed by the length.
---
src/nanoarrow/ipc/decoder.c | 88 +++++++++++++++++++--------------------
src/nanoarrow/ipc/decoder_test.cc | 25 +++++++----
2 files changed, 61 insertions(+), 52 deletions(-)
diff --git a/src/nanoarrow/ipc/decoder.c b/src/nanoarrow/ipc/decoder.c
index 5be3794e..dcde4335 100644
--- a/src/nanoarrow/ipc/decoder.c
+++ b/src/nanoarrow/ipc/decoder.c
@@ -52,10 +52,6 @@
#define ENODATA 120
#endif
-// A more readable expression way to refer to the fact that there are 8 bytes
-// at the beginning of every message header.
-static const int32_t kMessageHeaderPrefixSize = 8;
-
#define NANOARROW_IPC_MAGIC "ARROW1"
// Internal representation of a parsed "Field" from flatbuffers. This
@@ -272,14 +268,6 @@ void ArrowIpcDecoderReset(struct ArrowIpcDecoder* decoder)
{
}
}
-static inline uint32_t ArrowIpcReadContinuationBytes(struct ArrowBufferView*
data) {
- uint32_t value;
- memcpy(&value, data->data.as_uint8, sizeof(uint32_t));
- data->data.as_uint8 += sizeof(uint32_t);
- data->size_bytes -= sizeof(uint32_t);
- return value;
-}
-
static inline int32_t ArrowIpcReadInt32LE(struct ArrowBufferView* data, int
swap_endian) {
int32_t value;
memcpy(&value, data->data.as_uint8, sizeof(int32_t));
@@ -984,40 +972,49 @@ static inline void ArrowIpcDecoderResetHeaderInfo(struct
ArrowIpcDecoder* decode
// Returns NANOARROW_OK if data is large enough to read the first 8 bytes
// of the message header, ESPIPE if reading more data might help, or EINVAL if
the content
-// is not valid. Advances the input ArrowBufferView by 8 bytes.
+// is not valid. Advances the input ArrowBufferView by prefix_size (8 bytes or
4 bytes if
+// the message is pre-0.15 and has no continuation). Sets
decoder->header_size_bytes
+// to the flatbuffers length plus the prefix_size.
static inline int ArrowIpcDecoderReadHeaderPrefix(struct ArrowIpcDecoder*
decoder,
struct ArrowBufferView*
data_mut,
- int32_t* message_size_bytes,
+ int32_t* prefix_size_bytes,
struct ArrowError* error) {
struct ArrowIpcDecoderPrivate* private_data =
(struct ArrowIpcDecoderPrivate*)decoder->private_data;
- if (data_mut->size_bytes < kMessageHeaderPrefixSize) {
+ if (data_mut->size_bytes < 8) {
ArrowErrorSet(error,
"Expected data of at least 8 bytes but only %" PRId64 "
bytes remain",
data_mut->size_bytes);
return ESPIPE;
}
- uint32_t continuation = ArrowIpcReadContinuationBytes(data_mut);
- if (continuation != 0xFFFFFFFF) {
- ArrowErrorSet(error, "Expected 0xFFFFFFFF at start of message but found
0x%08X",
- (unsigned int)continuation);
- return EINVAL;
+ int swap_endian = private_data->system_endianness ==
NANOARROW_IPC_ENDIANNESS_BIG;
+ int32_t continuation = ArrowIpcReadInt32LE(data_mut, swap_endian);
+ int32_t length;
+ if ((uint32_t)continuation != 0xFFFFFFFF) {
+ if (continuation < 0) {
+ ArrowErrorSet(error, "Expected 0xFFFFFFFF at start of message but found
0x%08X",
+ (unsigned int)continuation);
+ return EINVAL;
+ }
+ // Tolerate pre-0.15 encapsulated messages which only had the length prefix
+ length = continuation;
+ *prefix_size_bytes = sizeof(length);
+ } else {
+ length = ArrowIpcReadInt32LE(data_mut, swap_endian);
+ *prefix_size_bytes = sizeof(continuation) + sizeof(length);
}
+ decoder->header_size_bytes = *prefix_size_bytes + length;
- int swap_endian = private_data->system_endianness ==
NANOARROW_IPC_ENDIANNESS_BIG;
- int32_t header_body_size_bytes = ArrowIpcReadInt32LE(data_mut, swap_endian);
- *message_size_bytes = header_body_size_bytes + kMessageHeaderPrefixSize;
- if (header_body_size_bytes < 0) {
+ if (length < 0) {
ArrowErrorSet(error,
- "Expected message body size > 0 but found message body size
of %" PRId32
- " bytes",
- header_body_size_bytes);
+ "Expected message size > 0 but found message size of %"
PRId32 " bytes",
+ length);
return EINVAL;
}
- if (header_body_size_bytes == 0) {
+ if (length == 0) {
ArrowErrorSet(error, "End of Arrow stream");
return ENODATA;
}
@@ -1029,8 +1026,10 @@ ArrowErrorCode ArrowIpcDecoderPeekHeader(struct
ArrowIpcDecoder* decoder,
struct ArrowBufferView data,
struct ArrowError* error) {
ArrowIpcDecoderResetHeaderInfo(decoder);
- NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderReadHeaderPrefix(
- decoder, &data, &decoder->header_size_bytes, error));
+ int32_t prefix_size_bytes;
+ NANOARROW_RETURN_NOT_OK(
+ ArrowIpcDecoderReadHeaderPrefix(decoder, &data, &prefix_size_bytes,
error));
+ NANOARROW_UNUSED(prefix_size_bytes);
return NANOARROW_OK;
}
@@ -1041,24 +1040,24 @@ ArrowErrorCode ArrowIpcDecoderVerifyHeader(struct
ArrowIpcDecoder* decoder,
(struct ArrowIpcDecoderPrivate*)decoder->private_data;
ArrowIpcDecoderResetHeaderInfo(decoder);
- NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderReadHeaderPrefix(
- decoder, &data, &decoder->header_size_bytes, error));
+ int32_t prefix_size_bytes;
+ NANOARROW_RETURN_NOT_OK(
+ ArrowIpcDecoderReadHeaderPrefix(decoder, &data, &prefix_size_bytes,
error));
// Check that data contains at least the entire header (return ESPIPE to
signal
// that reading more data may help).
- int64_t message_body_size = decoder->header_size_bytes -
kMessageHeaderPrefixSize;
- if (data.size_bytes < message_body_size) {
+ if (data.size_bytes < decoder->header_size_bytes - prefix_size_bytes) {
ArrowErrorSet(error,
- "Expected >= %" PRId64 " bytes of remaining data but found
%" PRId64
+ "Expected >= %d bytes of remaining data but found %" PRId64
" bytes in buffer",
- message_body_size + kMessageHeaderPrefixSize,
- data.size_bytes + kMessageHeaderPrefixSize);
+ decoder->header_size_bytes, data.size_bytes +
prefix_size_bytes);
return ESPIPE;
}
// Run flatbuffers verification
enum flatcc_verify_error_no verify_error =
- ns(Message_verify_as_root(data.data.as_uint8, message_body_size));
+ ns(Message_verify_as_root(data.data.as_uint8,
+ decoder->header_size_bytes -
prefix_size_bytes));
if (verify_error != flatcc_verify_ok) {
ArrowErrorSet(error, "Message flatbuffer verification failed (%d) %s",
(int)verify_error, flatcc_verify_error_string(verify_error));
@@ -1163,18 +1162,17 @@ ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct
ArrowIpcDecoder* decoder,
(struct ArrowIpcDecoderPrivate*)decoder->private_data;
ArrowIpcDecoderResetHeaderInfo(decoder);
- NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderReadHeaderPrefix(
- decoder, &data, &decoder->header_size_bytes, error));
+ int32_t prefix_size_bytes;
+ NANOARROW_RETURN_NOT_OK(
+ ArrowIpcDecoderReadHeaderPrefix(decoder, &data, &prefix_size_bytes,
error));
// Check that data contains at least the entire header (return ESPIPE to
signal
// that reading more data may help).
- int64_t message_body_size = decoder->header_size_bytes -
kMessageHeaderPrefixSize;
- if (data.size_bytes < message_body_size) {
+ if (data.size_bytes < decoder->header_size_bytes - prefix_size_bytes) {
ArrowErrorSet(error,
- "Expected >= %" PRId64 " bytes of remaining data but found
%" PRId64
+ "Expected >= %d bytes of remaining data but found %" PRId64
" bytes in buffer",
- message_body_size + kMessageHeaderPrefixSize,
- data.size_bytes + kMessageHeaderPrefixSize);
+ decoder->header_size_bytes, data.size_bytes +
prefix_size_bytes);
return ESPIPE;
}
diff --git a/src/nanoarrow/ipc/decoder_test.cc
b/src/nanoarrow/ipc/decoder_test.cc
index ffd8f7f2..f5e54dd2 100644
--- a/src/nanoarrow/ipc/decoder_test.cc
+++ b/src/nanoarrow/ipc/decoder_test.cc
@@ -129,28 +129,28 @@ TEST(NanoarrowIpcTest, NanoarrowIpcCheckHeader) {
EXPECT_STREQ(error.message,
"Expected data of at least 8 bytes but only 1 bytes remain");
- uint32_t eight_bad_bytes[] = {0, 0};
- data.data.as_uint8 = reinterpret_cast<uint8_t*>(eight_bad_bytes);
- data.size_bytes = 8;
+ uint32_t eight_bad_bytes[] = {negative_one_le * 256, 999};
+ data.data.as_uint32 = eight_bad_bytes;
+ data.size_bytes = sizeof(eight_bad_bytes);
EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), EINVAL);
EXPECT_STREQ(error.message,
- "Expected 0xFFFFFFFF at start of message but found 0x00000000");
+ "Expected 0xFFFFFFFF at start of message but found 0xFFFFFF00");
ArrowErrorInit(&error);
EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), EINVAL);
EXPECT_STREQ(error.message,
- "Expected 0xFFFFFFFF at start of message but found 0x00000000");
+ "Expected 0xFFFFFFFF at start of message but found 0xFFFFFF00");
eight_bad_bytes[0] = 0xFFFFFFFF;
eight_bad_bytes[1] = negative_one_le;
EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), EINVAL);
EXPECT_STREQ(error.message,
- "Expected message body size > 0 but found message body size of
-1 bytes");
+ "Expected message size > 0 but found message size of -1 bytes");
ArrowErrorInit(&error);
EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), EINVAL);
EXPECT_STREQ(error.message,
- "Expected message body size > 0 but found message body size of
-1 bytes");
+ "Expected message size > 0 but found message size of -1 bytes");
eight_bad_bytes[1] = one_le;
EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), ESPIPE);
@@ -169,6 +169,17 @@ TEST(NanoarrowIpcTest, NanoarrowIpcCheckHeader) {
EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), ENODATA);
EXPECT_STREQ(error.message, "End of Arrow stream");
+ uint32_t pre_continuation[] = {0, 0};
+ data.data.as_uint32 = pre_continuation;
+ data.size_bytes = sizeof(pre_continuation);
+ EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), ENODATA);
+ EXPECT_STREQ(error.message, "End of Arrow stream");
+
+ pre_continuation[0] = one_le << 3;
+ EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), ESPIPE);
+ EXPECT_STREQ(error.message,
+ "Expected >= 12 bytes of remaining data but found 8 bytes in
buffer");
+
ArrowIpcDecoderReset(&decoder);
}