This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new aa07ece7 feat(c/driver/postgresql): add integral COPY writers (#1130)
aa07ece7 is described below

commit aa07ece74bf73de0cfd364d053b7aba0a814d402
Author: William Ayd <[email protected]>
AuthorDate: Fri Sep 29 10:46:11 2023 -0400

    feat(c/driver/postgresql): add integral COPY writers (#1130)
---
 c/driver/postgresql/postgres_copy_reader.h       | 28 ++++++++
 c/driver/postgresql/postgres_copy_reader_test.cc | 90 ++++++++++++++++++++++++
 2 files changed, 118 insertions(+)

diff --git a/c/driver/postgresql/postgres_copy_reader.h b/c/driver/postgresql/postgres_copy_reader.h
index 0aab4690..98331087 100644
--- a/c/driver/postgresql/postgres_copy_reader.h
+++ b/c/driver/postgresql/postgres_copy_reader.h
@@ -1166,6 +1166,25 @@ class PostgresCopyBooleanFieldWriter : public PostgresCopyFieldWriter {
   }
 };
 
+template <typename T, T kOffset = 0>
+class PostgresCopyNetworkEndianFieldWriter : public PostgresCopyFieldWriter {
+ public:
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
+    const int8_t is_null = ArrowArrayViewIsNull(array_view_, index);
+    const int32_t field_size_bytes = is_null ? -1 : sizeof(T);
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, field_size_bytes, error));
+    if (is_null) {
+      return ADBC_STATUS_OK;
+    }
+
+    const T value =
+        static_cast<T>(ArrowArrayViewGetIntUnsafe(array_view_, index)) - kOffset;
+    NANOARROW_RETURN_NOT_OK(WriteChecked<T>(buffer, value, error));
+
+    return ADBC_STATUS_OK;
+  }
+};
+
 static inline ArrowErrorCode MakeCopyFieldWriter(const enum ArrowType arrow_type,
                                                  PostgresCopyFieldWriter** out,
                                                  ArrowError* error) {
@@ -1173,6 +1192,15 @@ static inline ArrowErrorCode MakeCopyFieldWriter(const enum ArrowType arrow_type
     case NANOARROW_TYPE_BOOL:
       *out = new PostgresCopyBooleanFieldWriter();
       return NANOARROW_OK;
+    case NANOARROW_TYPE_INT16:
+      *out = new PostgresCopyNetworkEndianFieldWriter<int16_t>();
+      return NANOARROW_OK;
+    case NANOARROW_TYPE_INT32:
+      *out = new PostgresCopyNetworkEndianFieldWriter<int32_t>();
+      return NANOARROW_OK;
+    case NANOARROW_TYPE_INT64:
+      *out = new PostgresCopyNetworkEndianFieldWriter<int64_t>();
+      return NANOARROW_OK;
     default:
       return EINVAL;
   }
diff --git a/c/driver/postgresql/postgres_copy_reader_test.cc b/c/driver/postgresql/postgres_copy_reader_test.cc
index f389d2a6..d520271a 100644
--- a/c/driver/postgresql/postgres_copy_reader_test.cc
+++ b/c/driver/postgresql/postgres_copy_reader_test.cc
@@ -200,6 +200,36 @@ TEST(PostgresCopyUtilsTest, PostgresCopyReadSmallInt) {
   ASSERT_EQ(data_buffer[4], 0);
 }
 
+TEST(PostgresCopyUtilsTest, PostgresCopyWriteInt16) {
+  adbc_validation::Handle<struct ArrowSchema> schema;
+  adbc_validation::Handle<struct ArrowArray> array;
+  struct ArrowError na_error;
+  ASSERT_EQ(adbc_validation::MakeSchema(&schema.value, {{"col", NANOARROW_TYPE_INT16}}),
+            ADBC_STATUS_OK);
+  ASSERT_EQ(adbc_validation::MakeBatch<int16_t>(&schema.value, &array.value, &na_error,
+                                                {-123, -1, 1, 123, std::nullopt}),
+            ADBC_STATUS_OK);
+
+  PostgresCopyStreamWriteTester tester;
+  ASSERT_EQ(tester.Init(&schema.value, &array.value), NANOARROW_OK);
+
+  struct ArrowBuffer buffer;
+  ArrowBufferInit(&buffer);
+  ArrowBufferReserve(&buffer, sizeof(kTestPgCopySmallInt));
+  uint8_t* cursor = buffer.data;
+
+  ASSERT_EQ(tester.WriteAll(&buffer, nullptr), ENODATA);
+
+  // The last 4 bytes of a message can be transmitted via PQputCopyData
+  // so no need to test those bytes from the Writer
+  for (size_t i = 0; i < sizeof(kTestPgCopySmallInt) - 4; i++) {
+    EXPECT_EQ(cursor[i], kTestPgCopySmallInt[i]);
+  }
+
+  buffer.data = cursor;
+  ArrowBufferReset(&buffer);
+}
+
 // COPY (SELECT CAST("col" AS INTEGER) AS "col" FROM (  VALUES (-123), (-1), (1), (123),
 // (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT binary);
 static uint8_t kTestPgCopyInteger[] = {
@@ -247,6 +277,36 @@ TEST(PostgresCopyUtilsTest, PostgresCopyReadInteger) {
   ASSERT_EQ(data_buffer[4], 0);
 }
 
+TEST(PostgresCopyUtilsTest, PostgresCopyWriteInt32) {
+  adbc_validation::Handle<struct ArrowSchema> schema;
+  adbc_validation::Handle<struct ArrowArray> array;
+  struct ArrowError na_error;
+  ASSERT_EQ(adbc_validation::MakeSchema(&schema.value, {{"col", NANOARROW_TYPE_INT32}}),
+            ADBC_STATUS_OK);
+  ASSERT_EQ(adbc_validation::MakeBatch<int32_t>(&schema.value, &array.value, &na_error,
+                                                {-123, -1, 1, 123, std::nullopt}),
+            ADBC_STATUS_OK);
+
+  PostgresCopyStreamWriteTester tester;
+  ASSERT_EQ(tester.Init(&schema.value, &array.value), NANOARROW_OK);
+
+  struct ArrowBuffer buffer;
+  ArrowBufferInit(&buffer);
+  ArrowBufferReserve(&buffer, sizeof(kTestPgCopyInteger));
+  uint8_t* cursor = buffer.data;
+
+  ASSERT_EQ(tester.WriteAll(&buffer, nullptr), ENODATA);
+
+  // The last 4 bytes of a message can be transmitted via PQputCopyData
+  // so no need to test those bytes from the Writer
+  for (size_t i = 0; i < sizeof(kTestPgCopyInteger) - 4; i++) {
+    EXPECT_EQ(cursor[i], kTestPgCopyInteger[i]);
+  }
+
+  buffer.data = cursor;
+  ArrowBufferReset(&buffer);
+}
+
 // COPY (SELECT CAST("col" AS BIGINT) AS "col" FROM (  VALUES (-123), (-1), (1), (123),
 // (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT binary);
 static uint8_t kTestPgCopyBigInt[] = {
@@ -295,6 +355,36 @@ TEST(PostgresCopyUtilsTest, PostgresCopyReadBigInt) {
   ASSERT_EQ(data_buffer[4], 0);
 }
 
+TEST(PostgresCopyUtilsTest, PostgresCopyWriteInt64) {
+  adbc_validation::Handle<struct ArrowSchema> schema;
+  adbc_validation::Handle<struct ArrowArray> array;
+  struct ArrowError na_error;
+  ASSERT_EQ(adbc_validation::MakeSchema(&schema.value, {{"col", NANOARROW_TYPE_INT64}}),
+            ADBC_STATUS_OK);
+  ASSERT_EQ(adbc_validation::MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
+                                                {-123, -1, 1, 123, std::nullopt}),
+            ADBC_STATUS_OK);
+
+  PostgresCopyStreamWriteTester tester;
+  ASSERT_EQ(tester.Init(&schema.value, &array.value), NANOARROW_OK);
+
+  struct ArrowBuffer buffer;
+  ArrowBufferInit(&buffer);
+  ArrowBufferReserve(&buffer, sizeof(kTestPgCopyBigInt));
+  uint8_t* cursor = buffer.data;
+
+  ASSERT_EQ(tester.WriteAll(&buffer, nullptr), ENODATA);
+
+  // The last 4 bytes of a message can be transmitted via PQputCopyData
+  // so no need to test those bytes from the Writer
+  for (size_t i = 0; i < sizeof(kTestPgCopyBigInt) - 4; i++) {
+    EXPECT_EQ(cursor[i], kTestPgCopyBigInt[i]);
+  }
+
+  buffer.data = cursor;
+  ArrowBufferReset(&buffer);
+}
+
 // COPY (SELECT CAST("col" AS REAL) AS "col" FROM (  VALUES (-123.456), (-1), (1),
 // (123.456), (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT binary);
 static uint8_t kTestPgCopyReal[] = {

Reply via email to