This is an automated email from the ASF dual-hosted git repository.
bkietz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git
The following commit(s) were added to refs/heads/main by this push:
new 982e4750 feat: add ArrowIpcOutputStream (#570)
982e4750 is described below
commit 982e47500aafa873034ee3728d0381c6214ebe19
Author: Benjamin Kietzman <[email protected]>
AuthorDate: Wed Jul 31 11:19:26 2024 -0500
feat: add ArrowIpcOutputStream (#570)
This is a prerequisite for adding nanoarrow to the IPC integration tests
(since we'll need a full stream writer into which we'll push batches,
which will need an output stream into which it'll push bytes).
---
CMakeLists.txt | 3 +-
ci/scripts/bundle.py | 1 +
meson.build | 2 +
src/nanoarrow/ipc/encoder.c | 14 ++--
src/nanoarrow/ipc/ipc_hpp_test.cc | 26 +++++++
src/nanoarrow/ipc/writer.c | 152 ++++++++++++++++++++++++++++++++++++++
src/nanoarrow/ipc/writer_test.cc | 90 ++++++++++++++++++++++
src/nanoarrow/nanoarrow_ipc.h | 47 ++++++++++++
src/nanoarrow/nanoarrow_ipc.hpp | 22 ++++++
9 files changed, 350 insertions(+), 7 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 2cb8ad90..e4d51ffa 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -225,7 +225,7 @@ if(NANOARROW_IPC)
if(NOT NANOARROW_BUNDLE)
set(NANOARROW_IPC_BUILD_SOURCES
src/nanoarrow/ipc/decoder.c src/nanoarrow/ipc/encoder.c
- src/nanoarrow/ipc/reader.c)
+ src/nanoarrow/ipc/reader.c src/nanoarrow/ipc/writer.c)
endif()
add_library(nanoarrow_ipc ${NANOARROW_IPC_BUILD_SOURCES})
@@ -431,6 +431,7 @@ if(NANOARROW_BUILD_TESTS)
decoder
encoder
reader
+ writer
files
ipc_hpp)
add_executable(nanoarrow_ipc_${name}_test
src/nanoarrow/ipc/${name}_test.cc)
diff --git a/ci/scripts/bundle.py b/ci/scripts/bundle.py
index 7ee09437..3ce2f3c6 100644
--- a/ci/scripts/bundle.py
+++ b/ci/scripts/bundle.py
@@ -205,6 +205,7 @@ def bundle_nanoarrow_ipc(
src_dir / "ipc" / "decoder.c",
src_dir / "ipc" / "encoder.c",
src_dir / "ipc" / "reader.c",
+ src_dir / "ipc" / "writer.c",
]
)
nanoarrow_ipc_c = nanoarrow_ipc_c.replace(
diff --git a/meson.build b/meson.build
index ccab27d3..f402e511 100644
--- a/meson.build
+++ b/meson.build
@@ -82,7 +82,9 @@ if get_option('ipc')
nanoarrow_ipc_lib = library(
'nanoarrow_ipc',
'src/nanoarrow/ipc/decoder.c',
+ 'src/nanoarrow/ipc/encoder.c',
'src/nanoarrow/ipc/reader.c',
+ 'src/nanoarrow/ipc/writer.c',
dependencies: [nanoarrow_dep, flatcc_dep],
install: true,
)
diff --git a/src/nanoarrow/ipc/encoder.c b/src/nanoarrow/ipc/encoder.c
index 6813f61f..3108c92e 100644
--- a/src/nanoarrow/ipc/encoder.c
+++ b/src/nanoarrow/ipc/encoder.c
@@ -62,14 +62,16 @@ ArrowErrorCode ArrowIpcEncoderInit(struct ArrowIpcEncoder*
encoder) {
}
void ArrowIpcEncoderReset(struct ArrowIpcEncoder* encoder) {
- NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL);
+ NANOARROW_DCHECK(encoder != NULL);
struct ArrowIpcEncoderPrivate* private =
(struct ArrowIpcEncoderPrivate*)encoder->private_data;
- flatcc_builder_clear(&private->builder);
- ArrowBufferReset(&private->nodes);
- ArrowBufferReset(&private->buffers);
- ArrowFree(private);
- memset(encoder, 0, sizeof(struct ArrowIpcEncoder));
+ if (private != NULL) {
+ flatcc_builder_clear(&private->builder);
+ ArrowBufferReset(&private->nodes);
+ ArrowBufferReset(&private->buffers);
+ ArrowFree(private);
+ memset(encoder, 0, sizeof(struct ArrowIpcEncoder));
+ }
}
ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct ArrowIpcEncoder* encoder,
diff --git a/src/nanoarrow/ipc/ipc_hpp_test.cc
b/src/nanoarrow/ipc/ipc_hpp_test.cc
index ec3af84b..ab5dcb68 100644
--- a/src/nanoarrow/ipc/ipc_hpp_test.cc
+++ b/src/nanoarrow/ipc/ipc_hpp_test.cc
@@ -31,6 +31,18 @@ TEST(NanoarrowIpcHppTest, NanoarrowIpcHppTestUniqueDecoder) {
EXPECT_EQ(decoder->private_data, nullptr);
}
+TEST(NanoarrowIpcHppTest, NanoarrowIpcHppTestUniqueEncoder) {
+ nanoarrow::ipc::UniqueEncoder encoder;
+
+ EXPECT_EQ(encoder->private_data, nullptr);  // default-constructed wrapper holds no encoder state
+ ASSERT_EQ(ArrowIpcEncoderInit(encoder.get()), NANOARROW_OK);
+ EXPECT_NE(encoder->private_data, nullptr);
+
+ nanoarrow::ipc::UniqueEncoder encoder2 = std::move(encoder);  // move-construct from the initialized encoder
+ EXPECT_NE(encoder2->private_data, nullptr);
+ EXPECT_EQ(encoder->private_data, nullptr);  // deliberate read of the moved-from wrapper: it must be empty again
+}
+
TEST(NanoarrowIpcHppTest, NanoarrowIpcHppTestUniqueInputStream) {
nanoarrow::ipc::UniqueInputStream input;
nanoarrow::UniqueBuffer buf;
@@ -44,3 +56,17 @@ TEST(NanoarrowIpcHppTest,
NanoarrowIpcHppTestUniqueInputStream) {
EXPECT_NE(input2->release, nullptr);
EXPECT_EQ(input->release, nullptr);
}
+
+TEST(NanoarrowIpcHppTest, NanoarrowIpcHppTestUniqueOutputStream) {
+ nanoarrow::ipc::UniqueOutputStream output;
+ nanoarrow::UniqueBuffer buf;
+ ASSERT_EQ(ArrowBufferAppend(buf.get(), "abcdefg", 7), NANOARROW_OK);
+
+ EXPECT_EQ(output->release, nullptr);  // default-constructed wrapper has no release callback
+ ASSERT_EQ(ArrowIpcOutputStreamInitBuffer(output.get(), buf.get()),
NANOARROW_OK);
+ EXPECT_NE(output->release, nullptr);
+
+ nanoarrow::ipc::UniqueOutputStream output2 = std::move(output);  // move-construct from the initialized stream
+ EXPECT_NE(output2->release, nullptr);
+ EXPECT_EQ(output->release, nullptr);  // deliberate read of the moved-from wrapper: it must be released
+}
diff --git a/src/nanoarrow/ipc/writer.c b/src/nanoarrow/ipc/writer.c
new file mode 100644
index 00000000..3227d20a
--- /dev/null
+++ b/src/nanoarrow/ipc/writer.c
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <errno.h>
+#include <inttypes.h>
+#include <stdio.h>
+#include <string.h>
+
+#include "nanoarrow/nanoarrow.h"
+#include "nanoarrow/nanoarrow_ipc.h"
+
+// Transfer the callbacks and private data of src into dst, then mark src as
+// released by clearing its release callback.
+void ArrowIpcOutputStreamMove(struct ArrowIpcOutputStream* src,
+                              struct ArrowIpcOutputStream* dst) {
+  *dst = *src;
+  src->release = NULL;
+}
+
+struct ArrowIpcOutputStreamBufferPrivate {
+ struct ArrowBuffer* output;  // buffer that the write callback appends incoming bytes to
+};
+
+// Write callback for buffer-backed streams: append buf_size_bytes bytes from buf
+// to the wrapped ArrowBuffer. On success the whole request is reported as written;
+// if the append fails its error code is returned via
+// NANOARROW_RETURN_NOT_OK_WITH_ERROR and size_written_out is left untouched.
+static ArrowErrorCode ArrowIpcOutputStreamBufferWrite(struct ArrowIpcOutputStream* stream,
+                                                      const void* buf,
+                                                      int64_t buf_size_bytes,
+                                                      int64_t* size_written_out,
+                                                      struct ArrowError* error) {
+  struct ArrowBuffer* sink =
+      ((struct ArrowIpcOutputStreamBufferPrivate*)stream->private_data)->output;
+
+  NANOARROW_RETURN_NOT_OK_WITH_ERROR(ArrowBufferAppend(sink, buf, buf_size_bytes),
+                                     error);
+
+  *size_written_out = buf_size_bytes;
+  return NANOARROW_OK;
+}
+
+// Release callback for buffer-backed streams. Only the private wrapper struct is
+// freed; the ArrowBuffer it points to is not touched here.
+static void ArrowIpcOutputStreamBufferRelease(struct ArrowIpcOutputStream* stream) {
+  ArrowFree(stream->private_data);
+  stream->release = NULL;
+}
+
+// Initialize stream so that every write appends to output.
+// Returns ENOMEM if the private state cannot be allocated (stream is then left
+// unmodified), NANOARROW_OK otherwise.
+ArrowErrorCode ArrowIpcOutputStreamInitBuffer(struct ArrowIpcOutputStream* stream,
+                                              struct ArrowBuffer* output) {
+  struct ArrowIpcOutputStreamBufferPrivate* state =
+      (struct ArrowIpcOutputStreamBufferPrivate*)ArrowMalloc(sizeof(*state));
+  if (state == NULL) {
+    return ENOMEM;
+  }
+  state->output = output;
+
+  stream->private_data = state;
+  stream->release = &ArrowIpcOutputStreamBufferRelease;
+  stream->write = &ArrowIpcOutputStreamBufferWrite;
+  return NANOARROW_OK;
+}
+
+struct ArrowIpcOutputStreamFilePrivate {
+ FILE* file_ptr;  // destination stream; the write callback may close it early and reset this to NULL
+ int stream_finished;  // set to 1 after a short write; later writes then report 0 bytes written
+ int close_on_release;  // non-zero if this stream is responsible for fclose()-ing file_ptr
+};
+
+// Release callback for FILE-backed streams: close the file when this stream is
+// responsible for it and it is still open, then free the private state.
+// The return value of fclose() is ignored.
+static void ArrowIpcOutputStreamFileRelease(struct ArrowIpcOutputStream* stream) {
+  struct ArrowIpcOutputStreamFilePrivate* state =
+      (struct ArrowIpcOutputStreamFilePrivate*)stream->private_data;
+  FILE* fp = state->file_ptr;
+  const int should_close = state->close_on_release;
+
+  if (should_close && fp != NULL) {
+    fclose(fp);
+  }
+
+  ArrowFree(state);
+  stream->release = NULL;
+}
+
+// Write callback for FILE-backed streams.
+//
+// Writes buf_size_bytes bytes from buf with fwrite() and stores the number of bytes
+// actually written in size_written_out.
+//
+// Returns:
+//   NANOARROW_OK  all bytes were written, or the stream was already finished by an
+//                 earlier failed write (in which case 0 bytes are reported);
+//   EINVAL        buf_size_bytes is negative;
+//   EIO           fwrite() wrote fewer bytes than requested.
+//
+// After a short write the stream is marked finished and, if this stream is
+// responsible for the file, the file is closed immediately.
+static ArrowErrorCode ArrowIpcOutputStreamFileWrite(struct ArrowIpcOutputStream* stream,
+                                                    const void* buf,
+                                                    int64_t buf_size_bytes,
+                                                    int64_t* size_written_out,
+                                                    struct ArrowError* error) {
+  struct ArrowIpcOutputStreamFilePrivate* private_data =
+      (struct ArrowIpcOutputStreamFilePrivate*)stream->private_data;
+
+  *size_written_out = 0;
+
+  // A negative size would be converted to a huge size_t by fwrite()
+  if (buf_size_bytes < 0) {
+    ArrowErrorSet(error, "ArrowIpcOutputStreamFile buf_size_bytes must be >= 0");
+    return EINVAL;
+  }
+
+  if (private_data->stream_finished) {
+    return NANOARROW_OK;
+  }
+
+  // Do the write
+  int64_t bytes_written =
+      (int64_t)fwrite(buf, 1, (size_t)buf_size_bytes, private_data->file_ptr);
+  *size_written_out = bytes_written;
+
+  if (bytes_written == buf_size_bytes) {
+    return NANOARROW_OK;
+  }
+
+  // fwrite() returns a short count only when a write error occurred. The end-of-file
+  // indicator is an input-side concept and must not be allowed to mask the failure.
+  private_data->stream_finished = 1;
+
+  // Try to close the file now. Whether or not fclose() succeeds, the FILE is no
+  // longer usable afterwards, so always forget the pointer: otherwise the release
+  // callback would call fclose() a second time on the same stream.
+  if (private_data->close_on_release) {
+    fclose(private_data->file_ptr);
+    private_data->file_ptr = NULL;
+  }
+
+  ArrowErrorSet(error, "ArrowIpcOutputStreamFile IO error");
+  return EIO;
+}
+
+// Initialize stream so that every write goes to file_ptr (a FILE*).
+// A non-zero close_on_release makes the stream responsible for closing the file.
+// Returns EINVAL if file_ptr is NULL, ENOMEM if the private state cannot be
+// allocated, NANOARROW_OK otherwise. On failure stream is left unmodified.
+ArrowErrorCode ArrowIpcOutputStreamInitFile(struct ArrowIpcOutputStream* stream,
+                                            void* file_ptr, int close_on_release) {
+  if (file_ptr == NULL) {
+    return EINVAL;
+  }
+
+  struct ArrowIpcOutputStreamFilePrivate* state =
+      (struct ArrowIpcOutputStreamFilePrivate*)ArrowMalloc(sizeof(*state));
+  if (state == NULL) {
+    return ENOMEM;
+  }
+
+  state->stream_finished = 0;
+  state->close_on_release = close_on_release;
+  state->file_ptr = (FILE*)file_ptr;
+
+  stream->private_data = state;
+  stream->release = &ArrowIpcOutputStreamFileRelease;
+  stream->write = &ArrowIpcOutputStreamFileWrite;
+  return NANOARROW_OK;
+}
diff --git a/src/nanoarrow/ipc/writer_test.cc b/src/nanoarrow/ipc/writer_test.cc
new file mode 100644
index 00000000..d0c344f8
--- /dev/null
+++ b/src/nanoarrow/ipc/writer_test.cc
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <stdio.h>
+
+#include "nanoarrow/nanoarrow_ipc.hpp"
+
+using nanoarrow::literals::operator""_asv;
+
+// Verifies that a buffer-backed output stream appends to (rather than overwrites)
+// the bytes already present in the target ArrowBuffer.
+TEST(NanoarrowIpcWriter, OutputStreamBuffer) {
+  struct ArrowError error;
+
+  // The output buffer starts with some header
+  const std::string header = "HELLO WORLD";
+  nanoarrow::UniqueBuffer output;
+  ASSERT_EQ(ArrowBufferAppend(output.get(), header.data(), header.size()),
+            NANOARROW_OK);
+
+  // Then the stream starts appending to it
+  nanoarrow::ipc::UniqueOutputStream stream;
+  ASSERT_EQ(ArrowIpcOutputStreamInitBuffer(stream.get(), output.get()), NANOARROW_OK);
+
+  const std::string message = "\n-_-_";
+  for (int i = 0; i < 4; ++i) {
+    // Start from a sentinel so a callback that forgets to set the count is detected
+    int64_t actually_written = -1;
+    ASSERT_EQ(stream->write(stream.get(), message.data(), message.size(),
+                            &actually_written, &error),
+              NANOARROW_OK)
+        << error.message;
+    EXPECT_EQ(actually_written, static_cast<int64_t>(message.size()));
+  }
+
+  const std::string expected = header + message + message + message + message;
+  ASSERT_EQ(output->size_bytes, static_cast<int64_t>(expected.size()));
+
+  // Build the string directly from pointer + length: this needs neither memcpy()
+  // (whose header this file does not include) nor the C++17 non-const
+  // std::string::data().
+  const std::string output_str(reinterpret_cast<const char*>(output->data),
+                               static_cast<size_t>(output->size_bytes));
+  EXPECT_EQ(output_str, expected);
+}
+
+// Verifies that a FILE-backed output stream writes at the file's current position:
+// bytes before the position are preserved and bytes after it are overwritten.
+TEST(NanoarrowIpcWriter, OutputStreamFile) {
+  FILE* file_ptr = tmpfile();
+  ASSERT_NE(file_ptr, nullptr);
+
+  // Start by writing some header
+  const std::string header = "HELLO WORLD";
+  ASSERT_EQ(fwrite(header.data(), 1, header.size(), file_ptr), header.size());
+
+  // Then seek to test that we overwrite WORLD but not HELLO
+  ASSERT_EQ(fseek(file_ptr, 6, SEEK_SET), 0);
+
+  // close_on_release = 1: the stream closes file_ptr when it is released
+  nanoarrow::ipc::UniqueOutputStream stream;
+  ASSERT_EQ(ArrowIpcOutputStreamInitFile(stream.get(), file_ptr, 1), NANOARROW_OK);
+
+  struct ArrowError error;
+
+  // Start appending using the stream
+  const std::string message = "\n-_-_";
+  for (int i = 0; i < 4; ++i) {
+    // Start from a sentinel so a callback that forgets to set the count is detected
+    int64_t actually_written = -1;
+    ASSERT_EQ(stream->write(stream.get(), message.data(), message.size(),
+                            &actually_written, &error),
+              NANOARROW_OK)
+        << error.message;
+    EXPECT_EQ(actually_written, static_cast<int64_t>(message.size()));
+  }
+
+  // Read back the whole file. ftell() reports failure as -1, which must not be
+  // turned into a huge size_t allocation.
+  ASSERT_EQ(fseek(file_ptr, 0, SEEK_END), 0);
+  const long file_size = ftell(file_ptr);
+  ASSERT_GE(file_size, 0);
+  std::string buffer(static_cast<size_t>(file_size), '\0');
+  rewind(file_ptr);
+  ASSERT_EQ(fread(&buffer[0], 1, buffer.size(), file_ptr), buffer.size());
+
+  EXPECT_EQ(buffer.size(), 6 + 4 * message.size());
+  EXPECT_EQ(buffer, "HELLO " + message + message + message + message);
+}
diff --git a/src/nanoarrow/nanoarrow_ipc.h b/src/nanoarrow/nanoarrow_ipc.h
index c74e288a..2695ada9 100644
--- a/src/nanoarrow/nanoarrow_ipc.h
+++ b/src/nanoarrow/nanoarrow_ipc.h
@@ -63,6 +63,12 @@
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcEncoderFinalizeBuffer)
#define ArrowIpcEncoderEncodeSchema \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcEncoderEncodeSchema)
+#define ArrowIpcOutputStreamInitBuffer \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcOutputStreamInitBuffer)
+#define ArrowIpcOutputStreamInitFile \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcOutputStreamInitFile)
+#define ArrowIpcOutputStreamMove \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcOutputStreamMove)
#endif
@@ -357,6 +363,8 @@ void ArrowIpcInputStreamMove(struct ArrowIpcInputStream*
src,
struct ArrowIpcInputStream* dst);
/// \brief Create an input stream from an ArrowBuffer
+///
+/// The stream takes ownership of the buffer and reads bytes from it.
ArrowErrorCode ArrowIpcInputStreamInitBuffer(struct ArrowIpcInputStream*
stream,
struct ArrowBuffer* input);
@@ -454,6 +462,45 @@ ArrowErrorCode ArrowIpcEncoderEncodeSchema(struct
ArrowIpcEncoder* encoder,
const struct ArrowSchema* schema,
struct ArrowError* error);
+/// \brief A user-extensible output data sink
+struct ArrowIpcOutputStream {
+ /// \brief Write up to buf_size_bytes from buf into stream
+ ///
+ /// The actual number of bytes written is placed in the value pointed to by
+ /// size_written_out. Returns NANOARROW_OK on success.
+ ArrowErrorCode (*write)(struct ArrowIpcOutputStream* stream, const void* buf,
+ int64_t buf_size_bytes, int64_t* size_written_out,
+ struct ArrowError* error);
+
+ /// \brief Release the stream and any resources it may be holding
+ ///
+ /// Release callback implementations must set the release member to NULL.
+ /// Callers must check that the release callback is not NULL before calling
+ /// write() or release().
+ void (*release)(struct ArrowIpcOutputStream* stream);
+
+ /// \brief Private implementation-defined data
+ void* private_data;
+};
+
+/// \brief Transfer ownership of an ArrowIpcOutputStream
+void ArrowIpcOutputStreamMove(struct ArrowIpcOutputStream* src,
+ struct ArrowIpcOutputStream* dst);
+
+/// \brief Create an output stream from an ArrowBuffer
+///
+/// All bytes written to the stream will be appended to the buffer.
+/// The stream does not take ownership of the buffer.
+ArrowErrorCode ArrowIpcOutputStreamInitBuffer(struct ArrowIpcOutputStream*
stream,
+ struct ArrowBuffer* output);
+
+/// \brief Create an output stream from a C FILE* pointer
+///
+/// Note that the ArrowIpcOutputStream has no mechanism to communicate an error
+/// if file_ptr fails to close. If this behaviour is needed, pass false to
+/// close_on_release and handle closing the file independently from stream.
+ArrowErrorCode ArrowIpcOutputStreamInitFile(struct ArrowIpcOutputStream*
stream,
+ void* file_ptr, int
close_on_release);
/// @}
#ifdef __cplusplus
diff --git a/src/nanoarrow/nanoarrow_ipc.hpp b/src/nanoarrow/nanoarrow_ipc.hpp
index e5e0ee33..8000ca05 100644
--- a/src/nanoarrow/nanoarrow_ipc.hpp
+++ b/src/nanoarrow/nanoarrow_ipc.hpp
@@ -76,6 +76,25 @@ inline void release_pointer(struct ArrowIpcInputStream*
data) {
}
}
+template <>
+inline void init_pointer(struct ArrowIpcOutputStream* data) {
+ data->release = nullptr;  // a null release callback marks the stream as holding nothing to release
+}
+
+/// Move the stream held in src into dst, leaving src released.
+///
+/// Delegates to the C API so the move semantics are defined in exactly one place
+/// and this header does not depend on memcpy() being in scope.
+template <>
+inline void move_pointer(struct ArrowIpcOutputStream* src,
+                         struct ArrowIpcOutputStream* dst) {
+  ArrowIpcOutputStreamMove(src, dst);
+}
+
+/// Invoke the stream's release callback, unless the stream has already been
+/// released (or was never initialized) and therefore has a null callback.
+template <>
+inline void release_pointer(struct ArrowIpcOutputStream* data) {
+  if (data->release == nullptr) {
+    return;
+  }
+  data->release(data);
+}
+
} // namespace internal
} // namespace nanoarrow
@@ -99,6 +118,9 @@ using UniqueEncoder = internal::Unique<struct
ArrowIpcEncoder>;
/// \brief Class wrapping a unique struct ArrowIpcInputStream
using UniqueInputStream = internal::Unique<struct ArrowIpcInputStream>;
+/// \brief Class wrapping a unique struct ArrowIpcOutputStream
+using UniqueOutputStream = internal::Unique<struct ArrowIpcOutputStream>;
+
/// @}
} // namespace ipc