This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 1d5501a8 feat(c/driver/postgresql): String/Large String COPY Writers
(#1172)
1d5501a8 is described below
commit 1d5501a83211ceccb9f2f19c13aa17dbed5a1e47
Author: William Ayd <[email protected]>
AuthorDate: Fri Oct 6 11:06:28 2023 -0400
feat(c/driver/postgresql): String/Large String COPY Writers (#1172)
---
c/driver/postgresql/postgres_copy_reader.h | 24 ++++++++++++
c/driver/postgresql/postgres_copy_reader_test.cc | 49 ++++++++++++++++++++++++
2 files changed, 73 insertions(+)
diff --git a/c/driver/postgresql/postgres_copy_reader.h
b/c/driver/postgresql/postgres_copy_reader.h
index fce5e104..d35789eb 100644
--- a/c/driver/postgresql/postgres_copy_reader.h
+++ b/c/driver/postgresql/postgres_copy_reader.h
@@ -1172,6 +1172,26 @@ class PostgresCopyNetworkEndianFieldWriter : public
PostgresCopyFieldWriter {
}
};
+// Field writer for string and large string columns. Each value is emitted as
+// an int32 field length followed by the raw bytes of the string; a null value
+// is emitted as a field length of -1 with no payload.
+class PostgresCopyBinaryFieldWriter : public PostgresCopyFieldWriter {
+ public:
+  // Appends the element at `index` of array_view_ to `buffer`.
+  //
+  // Returns NANOARROW_OK on success, EINVAL (with `error` populated) when the
+  // value does not fit in the int32 length prefix, or the error code of the
+  // first buffer write that fails.
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
+    if (ArrowArrayViewIsNull(array_view_, index)) {
+      // -1 is the null marker; nothing else is written for this field.
+      constexpr int32_t field_size_bytes = -1;
+      NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, field_size_bytes, error));
+      return NANOARROW_OK;
+    }
+
+    const struct ArrowStringView string_view =
+        ArrowArrayViewGetStringUnsafe(array_view_, index);
+
+    // size_bytes is 64-bit, but the length prefix is a signed 32-bit integer.
+    // Narrowing silently would write a wrong (possibly negative) length in
+    // front of the full payload, so oversized values are rejected instead.
+    if (string_view.size_bytes > INT32_MAX) {
+      ArrowErrorSet(error,
+                    "String value at index %lld is %lld bytes, which does not fit in "
+                    "the 32-bit COPY field length",
+                    static_cast<long long>(index),
+                    static_cast<long long>(string_view.size_bytes));
+      return EINVAL;
+    }
+
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(
+        buffer, static_cast<int32_t>(string_view.size_bytes), error));
+    NANOARROW_RETURN_NOT_OK(
+        ArrowBufferAppend(buffer, string_view.data, string_view.size_bytes));
+
+    return NANOARROW_OK;
+  }
+};
+
static inline ArrowErrorCode MakeCopyFieldWriter(const enum ArrowType
arrow_type,
PostgresCopyFieldWriter** out,
ArrowError* error) {
@@ -1188,6 +1208,10 @@ static inline ArrowErrorCode MakeCopyFieldWriter(const
enum ArrowType arrow_type
case NANOARROW_TYPE_INT64:
*out = new PostgresCopyNetworkEndianFieldWriter<int64_t>();
return NANOARROW_OK;
+ case NANOARROW_TYPE_STRING:
+ case NANOARROW_TYPE_LARGE_STRING:
+ *out = new PostgresCopyBinaryFieldWriter();
+ return NANOARROW_OK;
default:
return EINVAL;
}
diff --git a/c/driver/postgresql/postgres_copy_reader_test.cc
b/c/driver/postgresql/postgres_copy_reader_test.cc
index 9edb3977..e18a7efb 100644
--- a/c/driver/postgresql/postgres_copy_reader_test.cc
+++ b/c/driver/postgresql/postgres_copy_reader_test.cc
@@ -589,6 +589,55 @@ TEST(PostgresCopyUtilsTest, PostgresCopyReadText) {
ASSERT_EQ(std::string(data_buffer + 3, 4), "1234");
}
+// Writes a STRING column holding {"abc", "1234", NULL} through the COPY stream
+// writer and compares the produced bytes against kTestPgCopyText.
+TEST(PostgresCopyUtilsTest, PostgresCopyWriteString) {
+  adbc_validation::Handle<struct ArrowSchema> schema;
+  adbc_validation::Handle<struct ArrowArray> array;
+  struct ArrowError na_error;
+
+  const auto schema_status =
+      adbc_validation::MakeSchema(&schema.value, {{"col", NANOARROW_TYPE_STRING}});
+  ASSERT_EQ(schema_status, ADBC_STATUS_OK);
+
+  const auto batch_status = adbc_validation::MakeBatch<std::string>(
+      &schema.value, &array.value, &na_error, {"abc", "1234", std::nullopt});
+  ASSERT_EQ(batch_status, ADBC_STATUS_OK);
+
+  PostgresCopyStreamWriteTester tester;
+  ASSERT_EQ(tester.Init(&schema.value, &array.value), NANOARROW_OK);
+  // Writing every row is expected to finish with ENODATA.
+  ASSERT_EQ(tester.WriteAll(nullptr), ENODATA);
+
+  // The final 2 bytes of the reference message are left out of the comparison:
+  // they can be sent separately via PQputCopyData, so the writer is not
+  // expected to produce them.
+  constexpr size_t expected_size = sizeof(kTestPgCopyText) - 2;
+  const struct ArrowBuffer written = tester.WriteBuffer();
+  ASSERT_EQ(written.size_bytes, expected_size);
+
+  size_t offset = 0;
+  while (offset < expected_size) {
+    ASSERT_EQ(written.data[offset], kTestPgCopyText[offset]);
+    ++offset;
+  }
+}
+
+// Writes a LARGE_STRING column holding {"abc", "1234", NULL} through the COPY
+// stream writer and compares the produced bytes against kTestPgCopyText.
+TEST(PostgresCopyUtilsTest, PostgresCopyWriteLargeString) {
+  adbc_validation::Handle<struct ArrowSchema> schema;
+  adbc_validation::Handle<struct ArrowArray> array;
+  struct ArrowError na_error;
+
+  const auto schema_status =
+      adbc_validation::MakeSchema(&schema.value, {{"col", NANOARROW_TYPE_LARGE_STRING}});
+  ASSERT_EQ(schema_status, ADBC_STATUS_OK);
+
+  const auto batch_status = adbc_validation::MakeBatch<std::string>(
+      &schema.value, &array.value, &na_error, {"abc", "1234", std::nullopt});
+  ASSERT_EQ(batch_status, ADBC_STATUS_OK);
+
+  PostgresCopyStreamWriteTester tester;
+  ASSERT_EQ(tester.Init(&schema.value, &array.value), NANOARROW_OK);
+  // Writing every row is expected to finish with ENODATA.
+  ASSERT_EQ(tester.WriteAll(nullptr), ENODATA);
+
+  // The final 2 bytes of the reference message are left out of the comparison:
+  // they can be sent separately via PQputCopyData, so the writer is not
+  // expected to produce them.
+  constexpr size_t expected_size = sizeof(kTestPgCopyText) - 2;
+  const struct ArrowBuffer written = tester.WriteBuffer();
+  ASSERT_EQ(written.size_bytes, expected_size);
+
+  size_t offset = 0;
+  while (offset < expected_size) {
+    ASSERT_EQ(written.data[offset], kTestPgCopyText[offset]);
+    ++offset;
+  }
+}
+
// COPY (SELECT CAST("col" AS INTEGER ARRAY) AS "col" FROM ( VALUES ('{-123, -1}'),
// ('{0, 1, 123}'), (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT binary);
static uint8_t kTestPgCopyIntegerArray[] = {