This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 1d5501a8 feat(c/driver/postgresql): String/Large String COPY Writers (#1172)
1d5501a8 is described below

commit 1d5501a83211ceccb9f2f19c13aa17dbed5a1e47
Author: William Ayd <[email protected]>
AuthorDate: Fri Oct 6 11:06:28 2023 -0400

    feat(c/driver/postgresql): String/Large String COPY Writers (#1172)
---
 c/driver/postgresql/postgres_copy_reader.h       | 24 ++++++++++++
 c/driver/postgresql/postgres_copy_reader_test.cc | 49 ++++++++++++++++++++++++
 2 files changed, 73 insertions(+)

diff --git a/c/driver/postgresql/postgres_copy_reader.h b/c/driver/postgresql/postgres_copy_reader.h
index fce5e104..d35789eb 100644
--- a/c/driver/postgresql/postgres_copy_reader.h
+++ b/c/driver/postgresql/postgres_copy_reader.h
@@ -1172,6 +1172,26 @@ class PostgresCopyNetworkEndianFieldWriter : public PostgresCopyFieldWriter {
   }
 };
 
+class PostgresCopyBinaryFieldWriter : public PostgresCopyFieldWriter {
+ public:
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
+    const int8_t is_null = ArrowArrayViewIsNull(array_view_, index);
+    if (is_null) {
+      constexpr int32_t field_size_bytes = -1;
+      NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, field_size_bytes, error));
+      return ADBC_STATUS_OK;
+    }
+
+    struct ArrowStringView string_view =
+        ArrowArrayViewGetStringUnsafe(array_view_, index);
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, string_view.size_bytes, error));
+    NANOARROW_RETURN_NOT_OK(
+        ArrowBufferAppend(buffer, string_view.data, string_view.size_bytes));
+
+    return ADBC_STATUS_OK;
+  }
+};
+
 static inline ArrowErrorCode MakeCopyFieldWriter(const enum ArrowType arrow_type,
                                                  PostgresCopyFieldWriter** out,
                                                  ArrowError* error) {
@@ -1188,6 +1208,10 @@ static inline ArrowErrorCode MakeCopyFieldWriter(const enum ArrowType arrow_type
     case NANOARROW_TYPE_INT64:
       *out = new PostgresCopyNetworkEndianFieldWriter<int64_t>();
       return NANOARROW_OK;
+    case NANOARROW_TYPE_STRING:
+    case NANOARROW_TYPE_LARGE_STRING:
+      *out = new PostgresCopyBinaryFieldWriter();
+      return NANOARROW_OK;
     default:
       return EINVAL;
   }
diff --git a/c/driver/postgresql/postgres_copy_reader_test.cc b/c/driver/postgresql/postgres_copy_reader_test.cc
index 9edb3977..e18a7efb 100644
--- a/c/driver/postgresql/postgres_copy_reader_test.cc
+++ b/c/driver/postgresql/postgres_copy_reader_test.cc
@@ -589,6 +589,55 @@ TEST(PostgresCopyUtilsTest, PostgresCopyReadText) {
   ASSERT_EQ(std::string(data_buffer + 3, 4), "1234");
 }
 
+TEST(PostgresCopyUtilsTest, PostgresCopyWriteString) {
+  adbc_validation::Handle<struct ArrowSchema> schema;
+  adbc_validation::Handle<struct ArrowArray> array;
+  struct ArrowError na_error;
+  ASSERT_EQ(adbc_validation::MakeSchema(&schema.value, {{"col", NANOARROW_TYPE_STRING}}),
+            ADBC_STATUS_OK);
+  ASSERT_EQ(adbc_validation::MakeBatch<std::string>(
+                &schema.value, &array.value, &na_error, {"abc", "1234", std::nullopt}),
+            ADBC_STATUS_OK);
+
+  PostgresCopyStreamWriteTester tester;
+  ASSERT_EQ(tester.Init(&schema.value, &array.value), NANOARROW_OK);
+  ASSERT_EQ(tester.WriteAll(nullptr), ENODATA);
+
+  const struct ArrowBuffer buf = tester.WriteBuffer();
+  // The last 2 bytes of a message can be transmitted via PQputCopyData
+  // so no need to test those bytes from the Writer
+  constexpr size_t buf_size = sizeof(kTestPgCopyText) - 2;
+  ASSERT_EQ(buf.size_bytes, buf_size);
+  for (size_t i = 0; i < buf_size; i++) {
+    ASSERT_EQ(buf.data[i], kTestPgCopyText[i]);
+  }
+}
+
+TEST(PostgresCopyUtilsTest, PostgresCopyWriteLargeString) {
+  adbc_validation::Handle<struct ArrowSchema> schema;
+  adbc_validation::Handle<struct ArrowArray> array;
+  struct ArrowError na_error;
+  ASSERT_EQ(
+      adbc_validation::MakeSchema(&schema.value, {{"col", NANOARROW_TYPE_LARGE_STRING}}),
+      ADBC_STATUS_OK);
+  ASSERT_EQ(adbc_validation::MakeBatch<std::string>(
+                &schema.value, &array.value, &na_error, {"abc", "1234", std::nullopt}),
+            ADBC_STATUS_OK);
+
+  PostgresCopyStreamWriteTester tester;
+  ASSERT_EQ(tester.Init(&schema.value, &array.value), NANOARROW_OK);
+  ASSERT_EQ(tester.WriteAll(nullptr), ENODATA);
+
+  const struct ArrowBuffer buf = tester.WriteBuffer();
+  // The last 2 bytes of a message can be transmitted via PQputCopyData
+  // so no need to test those bytes from the Writer
+  constexpr size_t buf_size = sizeof(kTestPgCopyText) - 2;
+  ASSERT_EQ(buf.size_bytes, buf_size);
+  for (size_t i = 0; i < buf_size; i++) {
+    ASSERT_EQ(buf.data[i], kTestPgCopyText[i]);
+  }
+}
+
 // COPY (SELECT CAST("col" AS INTEGER ARRAY) AS "col" FROM (  VALUES ('{-123, -1}'), ('{0,
 // 1, 123}'), (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT binary);
 static uint8_t kTestPgCopyIntegerArray[] = {

Reply via email to