This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new f7cf76cf feat(c/driver/postgresql): Binary COPY Writer (#1181)
f7cf76cf is described below

commit f7cf76cf6aa8c928f493662f8c41815e84d625a5
Author: William Ayd <[email protected]>
AuthorDate: Thu Oct 12 10:40:44 2023 -0400

    feat(c/driver/postgresql): Binary COPY Writer (#1181)
---
 c/driver/postgresql/postgres_copy_reader.h       |  9 +--
 c/driver/postgresql/postgres_copy_reader_test.cc | 92 ++++++++++++++++++++++++
 c/validation/adbc_validation.cc                  | 12 +++-
 c/validation/adbc_validation_util.h              | 14 ++++
 4 files changed, 121 insertions(+), 6 deletions(-)

diff --git a/c/driver/postgresql/postgres_copy_reader.h b/c/driver/postgresql/postgres_copy_reader.h
index 0c0f003b..d9a7cdfb 100644
--- a/c/driver/postgresql/postgres_copy_reader.h
+++ b/c/driver/postgresql/postgres_copy_reader.h
@@ -1273,11 +1273,11 @@ class PostgresCopyDurationFieldWriter : public PostgresCopyFieldWriter {
 class PostgresCopyBinaryFieldWriter : public PostgresCopyFieldWriter {
  public:
   ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
-    struct ArrowStringView string_view =
-        ArrowArrayViewGetStringUnsafe(array_view_, index);
-    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, string_view.size_bytes, error));
+    struct ArrowBufferView buffer_view =
+        ArrowArrayViewGetBytesUnsafe(array_view_, index);
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, buffer_view.size_bytes, error));
     NANOARROW_RETURN_NOT_OK(
-        ArrowBufferAppend(buffer, string_view.data, string_view.size_bytes));
+        ArrowBufferAppend(buffer, buffer_view.data.as_uint8, buffer_view.size_bytes));
 
     return ADBC_STATUS_OK;
   }
@@ -1362,6 +1362,7 @@ static inline ArrowErrorCode MakeCopyFieldWriter(
     case NANOARROW_TYPE_DOUBLE:
       *out = new PostgresCopyDoubleFieldWriter();
       return NANOARROW_OK;
+    case NANOARROW_TYPE_BINARY:
     case NANOARROW_TYPE_STRING:
     case NANOARROW_TYPE_LARGE_STRING:
       *out = new PostgresCopyBinaryFieldWriter();
diff --git a/c/driver/postgresql/postgres_copy_reader_test.cc b/c/driver/postgresql/postgres_copy_reader_test.cc
index 6404b21e..00ba1204 100644
--- a/c/driver/postgresql/postgres_copy_reader_test.cc
+++ b/c/driver/postgresql/postgres_copy_reader_test.cc
@@ -1052,6 +1052,98 @@ TEST(PostgresCopyUtilsTest, PostgresCopyWriteLargeString) {
   }
 }
 
+// COPY (SELECT CAST("col" AS BYTEA) AS "col" FROM (  VALUES (''), ('\x0001'),
+// ('\x01020304'), ('\xFEFF'), (NULL)) AS drvd("col")) TO STDOUT
+// WITH (FORMAT binary);
+static uint8_t kTestPgCopyBinary[] = {
+    0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00, 0x00, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00,
+    0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x01, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04,
+    0x01, 0x02, 0x03, 0x04, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0xfe, 0xff,
+    0x00, 0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};
+
+TEST(PostgresCopyUtilsTest, PostgresCopyReadBinary) {
+  ArrowBufferView data;
+  data.data.as_uint8 = kTestPgCopyBinary;
+  data.size_bytes = sizeof(kTestPgCopyBinary);
+
+  auto col_type = PostgresType(PostgresTypeId::kBytea);
+  PostgresType input_type(PostgresTypeId::kRecord);
+  input_type.AppendChild("col", col_type);
+
+  PostgresCopyStreamTester tester;
+  ASSERT_EQ(tester.Init(input_type), NANOARROW_OK);
+  ASSERT_EQ(tester.ReadAll(&data), ENODATA);
+  ASSERT_EQ(data.data.as_uint8 - kTestPgCopyBinary, sizeof(kTestPgCopyBinary));
+  ASSERT_EQ(data.size_bytes, 0);
+
+  nanoarrow::UniqueArray array;
+  ASSERT_EQ(tester.GetArray(array.get()), NANOARROW_OK);
+  ASSERT_EQ(array->length, 5);
+  ASSERT_EQ(array->n_children, 1);
+
+  auto validity = reinterpret_cast<const uint8_t*>(array->children[0]->buffers[0]);
+  auto offsets = reinterpret_cast<const int32_t*>(array->children[0]->buffers[1]);
+  auto data_buffer = reinterpret_cast<const uint8_t*>(array->children[0]->buffers[2]);
+  ASSERT_NE(validity, nullptr);
+  ASSERT_NE(data_buffer, nullptr);
+
+  ASSERT_TRUE(ArrowBitGet(validity, 0));
+  ASSERT_TRUE(ArrowBitGet(validity, 1));
+  ASSERT_TRUE(ArrowBitGet(validity, 2));
+  ASSERT_TRUE(ArrowBitGet(validity, 3));
+  ASSERT_FALSE(ArrowBitGet(validity, 4));
+
+  ASSERT_EQ(offsets[0], 0);
+  ASSERT_EQ(offsets[1], 0);
+  ASSERT_EQ(offsets[2], 2);
+  ASSERT_EQ(offsets[3], 6);
+  ASSERT_EQ(offsets[4], 8);
+  ASSERT_EQ(offsets[5], 8);
+
+  ASSERT_EQ(data_buffer[0], 0x00);
+  ASSERT_EQ(data_buffer[1], 0x01);
+  ASSERT_EQ(data_buffer[2], 0x01);
+  ASSERT_EQ(data_buffer[3], 0x02);
+  ASSERT_EQ(data_buffer[4], 0x03);
+  ASSERT_EQ(data_buffer[5], 0x04);
+  ASSERT_EQ(data_buffer[6], 0xfe);
+  ASSERT_EQ(data_buffer[7], 0xff);
+}
+
+TEST(PostgresCopyUtilsTest, PostgresCopyWriteBinary) {
+  adbc_validation::Handle<struct ArrowSchema> schema;
+  adbc_validation::Handle<struct ArrowArray> array;
+  struct ArrowError na_error;
+  ASSERT_EQ(adbc_validation::MakeSchema(&schema.value, {{"col", NANOARROW_TYPE_BINARY}}),
+            ADBC_STATUS_OK);
+  ASSERT_EQ(adbc_validation::MakeBatch<std::vector<std::byte>>(
+            &schema.value, &array.value, &na_error,
+            {
+              std::vector<std::byte>{},
+              std::vector<std::byte>{std::byte{0x00}, std::byte{0x01}},
+              std::vector<std::byte>{
+                std::byte{0x01}, std::byte{0x02}, std::byte{0x03}, std::byte{0x04}
+              },
+              std::vector<std::byte>{std::byte{0xfe}, std::byte{0xff}},
+              std::nullopt}),
+            ADBC_STATUS_OK);
+
+  PostgresCopyStreamWriteTester tester;
+  ASSERT_EQ(tester.Init(&schema.value, &array.value), NANOARROW_OK);
+  ASSERT_EQ(tester.WriteAll(nullptr), ENODATA);
+
+  const struct ArrowBuffer buf = tester.WriteBuffer();
+  // The last 2 bytes of a message can be transmitted via PQputCopyData
+  // so no need to test those bytes from the Writer
+  constexpr size_t buf_size = sizeof(kTestPgCopyBinary) - 2;
+  ASSERT_EQ(buf.size_bytes, buf_size);
+  for (size_t i = 0; i < buf_size; i++) {
+    ASSERT_EQ(buf.data[i], kTestPgCopyBinary[i]) << "failure at index " << i;
+  }
+}
+
+
 // COPY (SELECT CAST("col" AS INTEGER ARRAY) AS "col" FROM (  VALUES ('{-123, -1}'), ('{0,
 // 1, 123}'), (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT binary);
 static uint8_t kTestPgCopyIntegerArray[] = {
diff --git a/c/validation/adbc_validation.cc b/c/validation/adbc_validation.cc
index a8fb0af8..2afa0caf 100644
--- a/c/validation/adbc_validation.cc
+++ b/c/validation/adbc_validation.cc
@@ -1506,8 +1506,16 @@ void StatementTest::TestSqlIngestLargeString() {
 }
 
 void StatementTest::TestSqlIngestBinary() {
-  ASSERT_NO_FATAL_FAILURE(TestSqlIngestType<std::string>(
-      NANOARROW_TYPE_BINARY, {std::nullopt, "", "\x00\x01\x02\x04", "\xFE\xFF"}));
+  ASSERT_NO_FATAL_FAILURE(TestSqlIngestType<std::vector<std::byte>>(
+      NANOARROW_TYPE_BINARY,
+      {
+        std::nullopt, std::vector<std::byte>{},
+        std::vector<std::byte>{std::byte{0x00}, std::byte{0x01}},
+        std::vector<std::byte>{
+          std::byte{0x01}, std::byte{0x02}, std::byte{0x03}, std::byte{0x04}
+        },
+        std::vector<std::byte>{std::byte{0xfe}, std::byte{0xff}}
+      }));
 }
 
 void StatementTest::TestSqlIngestDate32() {
diff --git a/c/validation/adbc_validation_util.h b/c/validation/adbc_validation_util.h
index 0db6e491..da71e0d9 100644
--- a/c/validation/adbc_validation_util.h
+++ b/c/validation/adbc_validation_util.h
@@ -271,6 +271,14 @@ int MakeArray(struct ArrowArray* parent, struct ArrowArray* array,
         if (int errno_res = ArrowArrayAppendBytes(array, view); errno_res != 0) {
           return errno_res;
         }
+      } else if constexpr (std::is_same<T, std::vector<std::byte>>::value) {
+        static_assert(std::is_same_v<uint8_t, unsigned char>);
+        struct ArrowBufferView view;
+        view.data.as_uint8 = reinterpret_cast<const uint8_t*>(v->data());
+        view.size_bytes = v->size();
+        if (int errno_res = ArrowArrayAppendBytes(array, view); errno_res != 0) {
+          return errno_res;
+        }
       } else if constexpr (std::is_same<T, ArrowInterval*>::value) {
         if (int errno_res = ArrowArrayAppendInterval(array, *v); errno_res != 0) {
           return errno_res;
@@ -389,6 +397,12 @@ void CompareArray(struct ArrowArrayView* array,
         struct ArrowStringView view = ArrowArrayViewGetStringUnsafe(array, i);
         std::string str(view.data, view.size_bytes);
         ASSERT_EQ(*v, str);
+      } else if constexpr (std::is_same<T, std::vector<std::byte>>::value) {
+        struct ArrowBufferView view = ArrowArrayViewGetBytesUnsafe(array, i);
+        ASSERT_EQ(v->size(), view.size_bytes);
+        for (int64_t i = 0; i < view.size_bytes; i++) {
+          ASSERT_EQ((*v)[i], std::byte{view.data.as_uint8[i]});
+        }
       } else if constexpr (std::is_same<T, ArrowInterval*>::value) {
         ASSERT_NE(array->buffer_views[1].data.data, nullptr);
         struct ArrowInterval interval;

Reply via email to