This is an automated email from the ASF dual-hosted git repository.

bkietz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 982e4750 feat: add ArrowIpcOutputStream (#570)
982e4750 is described below

commit 982e47500aafa873034ee3728d0381c6214ebe19
Author: Benjamin Kietzman <[email protected]>
AuthorDate: Wed Jul 31 11:19:26 2024 -0500

    feat: add ArrowIpcOutputStream (#570)
    
    This is a prerequisite for adding nanoarrow to the IPC integration tests
    (since we'll need a full stream writer into which we'll push batches,
    which will need an output stream into which it'll push bytes).
---
 CMakeLists.txt                    |   3 +-
 ci/scripts/bundle.py              |   1 +
 meson.build                       |   2 +
 src/nanoarrow/ipc/encoder.c       |  14 ++--
 src/nanoarrow/ipc/ipc_hpp_test.cc |  26 +++++++
 src/nanoarrow/ipc/writer.c        | 152 ++++++++++++++++++++++++++++++++++++++
 src/nanoarrow/ipc/writer_test.cc  |  90 ++++++++++++++++++++++
 src/nanoarrow/nanoarrow_ipc.h     |  47 ++++++++++++
 src/nanoarrow/nanoarrow_ipc.hpp   |  22 ++++++
 9 files changed, 350 insertions(+), 7 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 2cb8ad90..e4d51ffa 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -225,7 +225,7 @@ if(NANOARROW_IPC)
   if(NOT NANOARROW_BUNDLE)
     set(NANOARROW_IPC_BUILD_SOURCES
         src/nanoarrow/ipc/decoder.c src/nanoarrow/ipc/encoder.c
-        src/nanoarrow/ipc/reader.c)
+        src/nanoarrow/ipc/reader.c src/nanoarrow/ipc/writer.c)
   endif()
 
   add_library(nanoarrow_ipc ${NANOARROW_IPC_BUILD_SOURCES})
@@ -431,6 +431,7 @@ if(NANOARROW_BUILD_TESTS)
             decoder
             encoder
             reader
+            writer
             files
             ipc_hpp)
       add_executable(nanoarrow_ipc_${name}_test src/nanoarrow/ipc/${name}_test.cc)
diff --git a/ci/scripts/bundle.py b/ci/scripts/bundle.py
index 7ee09437..3ce2f3c6 100644
--- a/ci/scripts/bundle.py
+++ b/ci/scripts/bundle.py
@@ -205,6 +205,7 @@ def bundle_nanoarrow_ipc(
             src_dir / "ipc" / "decoder.c",
             src_dir / "ipc" / "encoder.c",
             src_dir / "ipc" / "reader.c",
+            src_dir / "ipc" / "writer.c",
         ]
     )
     nanoarrow_ipc_c = nanoarrow_ipc_c.replace(
diff --git a/meson.build b/meson.build
index ccab27d3..f402e511 100644
--- a/meson.build
+++ b/meson.build
@@ -82,7 +82,9 @@ if get_option('ipc')
     nanoarrow_ipc_lib = library(
         'nanoarrow_ipc',
         'src/nanoarrow/ipc/decoder.c',
+        'src/nanoarrow/ipc/encoder.c',
         'src/nanoarrow/ipc/reader.c',
+        'src/nanoarrow/ipc/writer.c',
         dependencies: [nanoarrow_dep, flatcc_dep],
         install: true,
     )
diff --git a/src/nanoarrow/ipc/encoder.c b/src/nanoarrow/ipc/encoder.c
index 6813f61f..3108c92e 100644
--- a/src/nanoarrow/ipc/encoder.c
+++ b/src/nanoarrow/ipc/encoder.c
@@ -62,14 +62,16 @@ ArrowErrorCode ArrowIpcEncoderInit(struct ArrowIpcEncoder* encoder) {
 }
 
 void ArrowIpcEncoderReset(struct ArrowIpcEncoder* encoder) {
-  NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL);
+  NANOARROW_DCHECK(encoder != NULL);
   struct ArrowIpcEncoderPrivate* private =
       (struct ArrowIpcEncoderPrivate*)encoder->private_data;
-  flatcc_builder_clear(&private->builder);
-  ArrowBufferReset(&private->nodes);
-  ArrowBufferReset(&private->buffers);
-  ArrowFree(private);
-  memset(encoder, 0, sizeof(struct ArrowIpcEncoder));
+  if (private != NULL) {
+    flatcc_builder_clear(&private->builder);
+    ArrowBufferReset(&private->nodes);
+    ArrowBufferReset(&private->buffers);
+    ArrowFree(private);
+    memset(encoder, 0, sizeof(struct ArrowIpcEncoder));
+  }
 }
 
 ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct ArrowIpcEncoder* encoder,
diff --git a/src/nanoarrow/ipc/ipc_hpp_test.cc b/src/nanoarrow/ipc/ipc_hpp_test.cc
index ec3af84b..ab5dcb68 100644
--- a/src/nanoarrow/ipc/ipc_hpp_test.cc
+++ b/src/nanoarrow/ipc/ipc_hpp_test.cc
@@ -31,6 +31,18 @@ TEST(NanoarrowIpcHppTest, NanoarrowIpcHppTestUniqueDecoder) {
   EXPECT_EQ(decoder->private_data, nullptr);
 }
 
+TEST(NanoarrowIpcHppTest, NanoarrowIpcHppTestUniqueEncoder) {
+  nanoarrow::ipc::UniqueEncoder encoder;
+
+  EXPECT_EQ(encoder->private_data, nullptr);
+  ASSERT_EQ(ArrowIpcEncoderInit(encoder.get()), NANOARROW_OK);
+  EXPECT_NE(encoder->private_data, nullptr);
+
+  nanoarrow::ipc::UniqueEncoder encoder2 = std::move(encoder);
+  EXPECT_NE(encoder2->private_data, nullptr);
+  EXPECT_EQ(encoder->private_data, nullptr);
+}
+
 TEST(NanoarrowIpcHppTest, NanoarrowIpcHppTestUniqueInputStream) {
   nanoarrow::ipc::UniqueInputStream input;
   nanoarrow::UniqueBuffer buf;
@@ -44,3 +56,17 @@ TEST(NanoarrowIpcHppTest, NanoarrowIpcHppTestUniqueInputStream) {
   EXPECT_NE(input2->release, nullptr);
   EXPECT_EQ(input->release, nullptr);
 }
+
+TEST(NanoarrowIpcHppTest, NanoarrowIpcHppTestUniqueOutputStream) {
+  nanoarrow::ipc::UniqueOutputStream output;
+  nanoarrow::UniqueBuffer buf;
+  ASSERT_EQ(ArrowBufferAppend(buf.get(), "abcdefg", 7), NANOARROW_OK);
+
+  EXPECT_EQ(output->release, nullptr);
+  ASSERT_EQ(ArrowIpcOutputStreamInitBuffer(output.get(), buf.get()), NANOARROW_OK);
+  EXPECT_NE(output->release, nullptr);
+
+  nanoarrow::ipc::UniqueOutputStream output2 = std::move(output);
+  EXPECT_NE(output2->release, nullptr);
+  EXPECT_EQ(output->release, nullptr);
+}
diff --git a/src/nanoarrow/ipc/writer.c b/src/nanoarrow/ipc/writer.c
new file mode 100644
index 00000000..3227d20a
--- /dev/null
+++ b/src/nanoarrow/ipc/writer.c
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <errno.h>
+#include <inttypes.h>
+#include <stdio.h>
+#include <string.h>
+
+#include "nanoarrow/nanoarrow.h"
+#include "nanoarrow/nanoarrow_ipc.h"
+
+void ArrowIpcOutputStreamMove(struct ArrowIpcOutputStream* src,
+                              struct ArrowIpcOutputStream* dst) {
+  memcpy(dst, src, sizeof(struct ArrowIpcOutputStream));
+  src->release = NULL;
+}
+
+struct ArrowIpcOutputStreamBufferPrivate {
+  struct ArrowBuffer* output;
+};
+
+static ArrowErrorCode ArrowIpcOutputStreamBufferWrite(struct ArrowIpcOutputStream* stream,
+                                                      const void* buf,
+                                                      int64_t buf_size_bytes,
+                                                      int64_t* size_written_out,
+                                                      struct ArrowError* error) {
+  struct ArrowIpcOutputStreamBufferPrivate* private_data =
+      (struct ArrowIpcOutputStreamBufferPrivate*)stream->private_data;
+  NANOARROW_RETURN_NOT_OK_WITH_ERROR(
+      ArrowBufferAppend(private_data->output, buf, buf_size_bytes), error);
+  *size_written_out = buf_size_bytes;
+  return NANOARROW_OK;
+}
+
+static void ArrowIpcOutputStreamBufferRelease(struct ArrowIpcOutputStream* stream) {
+  struct ArrowIpcOutputStreamBufferPrivate* private_data =
+      (struct ArrowIpcOutputStreamBufferPrivate*)stream->private_data;
+  ArrowFree(private_data);
+  stream->release = NULL;
+}
+
+ArrowErrorCode ArrowIpcOutputStreamInitBuffer(struct ArrowIpcOutputStream* stream,
+                                              struct ArrowBuffer* output) {
+  struct ArrowIpcOutputStreamBufferPrivate* private_data =
+      (struct ArrowIpcOutputStreamBufferPrivate*)ArrowMalloc(
+          sizeof(struct ArrowIpcOutputStreamBufferPrivate));
+  if (private_data == NULL) {
+    return ENOMEM;
+  }
+
+  private_data->output = output;
+  stream->write = &ArrowIpcOutputStreamBufferWrite;
+  stream->release = &ArrowIpcOutputStreamBufferRelease;
+  stream->private_data = private_data;
+
+  return NANOARROW_OK;
+}
+
+struct ArrowIpcOutputStreamFilePrivate {
+  FILE* file_ptr;
+  int stream_finished;
+  int close_on_release;
+};
+
+static void ArrowIpcOutputStreamFileRelease(struct ArrowIpcOutputStream* stream) {
+  struct ArrowIpcOutputStreamFilePrivate* private_data =
+      (struct ArrowIpcOutputStreamFilePrivate*)stream->private_data;
+
+  if (private_data->file_ptr != NULL && private_data->close_on_release) {
+    fclose(private_data->file_ptr);
+  }
+
+  ArrowFree(private_data);
+  stream->release = NULL;
+}
+
+static ArrowErrorCode ArrowIpcOutputStreamFileWrite(struct ArrowIpcOutputStream* stream,
+                                                    const void* buf,
+                                                    int64_t buf_size_bytes,
+                                                    int64_t* size_written_out,
+                                                    struct ArrowError* error) {
+  struct ArrowIpcOutputStreamFilePrivate* private_data =
+      (struct ArrowIpcOutputStreamFilePrivate*)stream->private_data;
+
+  if (private_data->stream_finished) {
+    *size_written_out = 0;
+    return NANOARROW_OK;
+  }
+
+  // Do the write
+  int64_t bytes_written = (int64_t)fwrite(buf, 1, buf_size_bytes, private_data->file_ptr);
+  *size_written_out = bytes_written;
+
+  if (bytes_written != buf_size_bytes) {
+    private_data->stream_finished = 1;
+
+    // Inspect error
+    int has_error = !feof(private_data->file_ptr) && ferror(private_data->file_ptr);
+
+    // Try to close the file now
+    if (private_data->close_on_release) {
+      if (fclose(private_data->file_ptr) == 0) {
+        private_data->file_ptr = NULL;
+      }
+    }
+
+    // Maybe return error
+    if (has_error) {
+      ArrowErrorSet(error, "ArrowIpcOutputStreamFile IO error");
+      return EIO;
+    }
+  }
+
+  return NANOARROW_OK;
+}
+
+ArrowErrorCode ArrowIpcOutputStreamInitFile(struct ArrowIpcOutputStream* stream,
+                                            void* file_ptr, int close_on_release) {
+  if (file_ptr == NULL) {
+    return EINVAL;
+  }
+
+  struct ArrowIpcOutputStreamFilePrivate* private_data =
+      (struct ArrowIpcOutputStreamFilePrivate*)ArrowMalloc(
+          sizeof(struct ArrowIpcOutputStreamFilePrivate));
+  if (private_data == NULL) {
+    return ENOMEM;
+  }
+
+  private_data->file_ptr = (FILE*)file_ptr;
+  private_data->close_on_release = close_on_release;
+  private_data->stream_finished = 0;
+
+  stream->write = &ArrowIpcOutputStreamFileWrite;
+  stream->release = &ArrowIpcOutputStreamFileRelease;
+  stream->private_data = private_data;
+  return NANOARROW_OK;
+}
diff --git a/src/nanoarrow/ipc/writer_test.cc b/src/nanoarrow/ipc/writer_test.cc
new file mode 100644
index 00000000..d0c344f8
--- /dev/null
+++ b/src/nanoarrow/ipc/writer_test.cc
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <stdio.h>
+
+#include "nanoarrow/nanoarrow_ipc.hpp"
+
+using nanoarrow::literals::operator""_asv;
+
+TEST(NanoarrowIpcWriter, OutputStreamBuffer) {
+  struct ArrowError error;
+
+  // The output buffer starts with some header
+  std::string header = "HELLO WORLD";
+  nanoarrow::UniqueBuffer output;
+  ASSERT_EQ(ArrowBufferAppend(output.get(), header.data(), header.size()), NANOARROW_OK);
+
+  // Then the stream starts appending to it
+  nanoarrow::ipc::UniqueOutputStream stream;
+  ASSERT_EQ(ArrowIpcOutputStreamInitBuffer(stream.get(), output.get()), NANOARROW_OK);
+
+  std::string message = "\n-_-_";
+  for (int i = 0; i < 4; ++i) {
+    int64_t actually_written;
+    ASSERT_EQ(stream->write(stream.get(), message.data(), message.size(),
+                            &actually_written, &error),
+              NANOARROW_OK)
+        << error.message;
+    EXPECT_EQ(actually_written, message.size());
+  }
+
+  EXPECT_EQ(output->size_bytes, header.size() + 4 * message.size());
+
+  std::string output_str(output->size_bytes, '\0');
+  memcpy(output_str.data(), output->data, output->size_bytes);
+  EXPECT_EQ(output_str, header + message + message + message + message);
+}
+
+TEST(NanoarrowIpcWriter, OutputStreamFile) {
+  FILE* file_ptr = tmpfile();
+  ASSERT_NE(file_ptr, nullptr);
+
+  // Start by writing some header
+  std::string header = "HELLO WORLD";
+  ASSERT_EQ(fwrite(header.data(), 1, header.size(), file_ptr), header.size());
+
+  // Then seek to test that we overwrite WORLD but not HELLO
+  fseek(file_ptr, 6, SEEK_SET);
+
+  nanoarrow::ipc::UniqueOutputStream stream;
+  ASSERT_EQ(ArrowIpcOutputStreamInitFile(stream.get(), file_ptr, 1), NANOARROW_OK);
+
+  struct ArrowError error;
+
+  // Start appending using the stream
+  std::string message = "\n-_-_";
+  for (int i = 0; i < 4; ++i) {
+    int64_t actually_written;
+    ASSERT_EQ(stream->write(stream.get(), message.data(), message.size(),
+                            &actually_written, &error),
+              NANOARROW_OK)
+        << error.message;
+    EXPECT_EQ(actually_written, message.size());
+  }
+
+  // Read back the whole file
+  fseek(file_ptr, 0, SEEK_END);
+  std::string buffer(static_cast<size_t>(ftell(file_ptr)), '\0');
+  rewind(file_ptr);
+  ASSERT_EQ(fread(buffer.data(), 1, buffer.size(), file_ptr), buffer.size());
+
+  EXPECT_EQ(buffer.size(), 6 + 4 * message.size());
+  EXPECT_EQ(buffer, "HELLO " + message + message + message + message);
+}
diff --git a/src/nanoarrow/nanoarrow_ipc.h b/src/nanoarrow/nanoarrow_ipc.h
index c74e288a..2695ada9 100644
--- a/src/nanoarrow/nanoarrow_ipc.h
+++ b/src/nanoarrow/nanoarrow_ipc.h
@@ -63,6 +63,12 @@
   NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcEncoderFinalizeBuffer)
 #define ArrowIpcEncoderEncodeSchema \
   NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcEncoderEncodeSchema)
+#define ArrowIpcOutputStreamInitBuffer \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcOutputStreamInitBuffer)
+#define ArrowIpcOutputStreamInitFile \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcOutputStreamInitFile)
+#define ArrowIpcOutputStreamMove \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcOutputStreamMove)
 
 #endif
 
@@ -357,6 +363,8 @@ void ArrowIpcInputStreamMove(struct ArrowIpcInputStream* src,
                              struct ArrowIpcInputStream* dst);
 
 /// \brief Create an input stream from an ArrowBuffer
+///
+/// The stream takes ownership of the buffer and reads bytes from it.
 ArrowErrorCode ArrowIpcInputStreamInitBuffer(struct ArrowIpcInputStream* stream,
                                              struct ArrowBuffer* input);
 
@@ -454,6 +462,45 @@ ArrowErrorCode ArrowIpcEncoderEncodeSchema(struct ArrowIpcEncoder* encoder,
                                            const struct ArrowSchema* schema,
                                            struct ArrowError* error);
 
+/// \brief A user-extensible output data sink
+struct ArrowIpcOutputStream {
+  /// \brief Write up to buf_size_bytes from buf into stream
+  ///
+  /// The actual number of bytes written is placed in the value pointed to by
+  /// size_written_out. Returns NANOARROW_OK on success.
+  ArrowErrorCode (*write)(struct ArrowIpcOutputStream* stream, const void* buf,
+                          int64_t buf_size_bytes, int64_t* size_written_out,
+                          struct ArrowError* error);
+
+  /// \brief Release the stream and any resources it may be holding
+  ///
+  /// Release callback implementations must set the release member to NULL.
+  /// Callers must check that the release callback is not NULL before calling
+  /// write() or release().
+  void (*release)(struct ArrowIpcOutputStream* stream);
+
+  /// \brief Private implementation-defined data
+  void* private_data;
+};
+
+/// \brief Transfer ownership of an ArrowIpcOutputStream
+void ArrowIpcOutputStreamMove(struct ArrowIpcOutputStream* src,
+                              struct ArrowIpcOutputStream* dst);
+
+/// \brief Create an output stream from an ArrowBuffer
+///
+/// All bytes written to the stream will be appended to the buffer.
+/// The stream does not take ownership of the buffer.
+ArrowErrorCode ArrowIpcOutputStreamInitBuffer(struct ArrowIpcOutputStream* stream,
+                                              struct ArrowBuffer* output);
+
+/// \brief Create an output stream from a C FILE* pointer
+///
+/// Note that the ArrowIpcOutputStream has no mechanism to communicate an error
+/// if file_ptr fails to close. If this behaviour is needed, pass false to
+/// close_on_release and handle closing the file independently from stream.
+ArrowErrorCode ArrowIpcOutputStreamInitFile(struct ArrowIpcOutputStream* stream,
+                                            void* file_ptr, int close_on_release);
 /// @}
 
 #ifdef __cplusplus
diff --git a/src/nanoarrow/nanoarrow_ipc.hpp b/src/nanoarrow/nanoarrow_ipc.hpp
index e5e0ee33..8000ca05 100644
--- a/src/nanoarrow/nanoarrow_ipc.hpp
+++ b/src/nanoarrow/nanoarrow_ipc.hpp
@@ -76,6 +76,25 @@ inline void release_pointer(struct ArrowIpcInputStream* data) {
   }
 }
 
+template <>
+inline void init_pointer(struct ArrowIpcOutputStream* data) {
+  data->release = nullptr;
+}
+
+template <>
+inline void move_pointer(struct ArrowIpcOutputStream* src,
+                         struct ArrowIpcOutputStream* dst) {
+  memcpy(dst, src, sizeof(struct ArrowIpcOutputStream));
+  src->release = nullptr;
+}
+
+template <>
+inline void release_pointer(struct ArrowIpcOutputStream* data) {
+  if (data->release != nullptr) {
+    data->release(data);
+  }
+}
+
 }  // namespace internal
 }  // namespace nanoarrow
 
@@ -99,6 +118,9 @@ using UniqueEncoder = internal::Unique<struct ArrowIpcEncoder>;
 /// \brief Class wrapping a unique struct ArrowIpcInputStream
 using UniqueInputStream = internal::Unique<struct ArrowIpcInputStream>;
 
+/// \brief Class wrapping a unique struct ArrowIpcOutputStream
+using UniqueOutputStream = internal::Unique<struct ArrowIpcOutputStream>;
+
 /// @}
 
 }  // namespace ipc

Reply via email to