This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new f7cf76cf feat(c/driver/postgresql): Binary COPY Writer (#1181)
f7cf76cf is described below
commit f7cf76cf6aa8c928f493662f8c41815e84d625a5
Author: William Ayd <[email protected]>
AuthorDate: Thu Oct 12 10:40:44 2023 -0400
feat(c/driver/postgresql): Binary COPY Writer (#1181)
---
c/driver/postgresql/postgres_copy_reader.h | 9 +--
c/driver/postgresql/postgres_copy_reader_test.cc | 92 ++++++++++++++++++++++++
c/validation/adbc_validation.cc | 12 +++-
c/validation/adbc_validation_util.h | 14 ++++
4 files changed, 121 insertions(+), 6 deletions(-)
diff --git a/c/driver/postgresql/postgres_copy_reader.h b/c/driver/postgresql/postgres_copy_reader.h
index 0c0f003b..d9a7cdfb 100644
--- a/c/driver/postgresql/postgres_copy_reader.h
+++ b/c/driver/postgresql/postgres_copy_reader.h
@@ -1273,11 +1273,11 @@ class PostgresCopyDurationFieldWriter : public PostgresCopyFieldWriter {
class PostgresCopyBinaryFieldWriter : public PostgresCopyFieldWriter {
public:
ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
- struct ArrowStringView string_view =
- ArrowArrayViewGetStringUnsafe(array_view_, index);
- NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, string_view.size_bytes, error));
+ struct ArrowBufferView buffer_view =
+ ArrowArrayViewGetBytesUnsafe(array_view_, index);
+ NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, buffer_view.size_bytes, error));
NANOARROW_RETURN_NOT_OK(
- ArrowBufferAppend(buffer, string_view.data, string_view.size_bytes));
+ ArrowBufferAppend(buffer, buffer_view.data.as_uint8, buffer_view.size_bytes));
return ADBC_STATUS_OK;
}
@@ -1362,6 +1362,7 @@ static inline ArrowErrorCode MakeCopyFieldWriter(
case NANOARROW_TYPE_DOUBLE:
*out = new PostgresCopyDoubleFieldWriter();
return NANOARROW_OK;
+ case NANOARROW_TYPE_BINARY:
case NANOARROW_TYPE_STRING:
case NANOARROW_TYPE_LARGE_STRING:
*out = new PostgresCopyBinaryFieldWriter();
diff --git a/c/driver/postgresql/postgres_copy_reader_test.cc b/c/driver/postgresql/postgres_copy_reader_test.cc
index 6404b21e..00ba1204 100644
--- a/c/driver/postgresql/postgres_copy_reader_test.cc
+++ b/c/driver/postgresql/postgres_copy_reader_test.cc
@@ -1052,6 +1052,98 @@ TEST(PostgresCopyUtilsTest, PostgresCopyWriteLargeString) {
}
}
+// COPY (SELECT CAST("col" AS BYTEA) AS "col" FROM ( VALUES (''), ('\x0001'),
+// ('\x01020304'), ('\xFEFF'), (NULL)) AS drvd("col")) TO STDOUT
+// WITH (FORMAT binary);
+static uint8_t kTestPgCopyBinary[] = {
+ 0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00, 0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00,
+ 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x01, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04,
+ 0x01, 0x02, 0x03, 0x04, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0xfe, 0xff,
+ 0x00, 0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};
+
+TEST(PostgresCopyUtilsTest, PostgresCopyReadBinary) {
+ ArrowBufferView data;
+ data.data.as_uint8 = kTestPgCopyBinary;
+ data.size_bytes = sizeof(kTestPgCopyBinary);
+
+ auto col_type = PostgresType(PostgresTypeId::kBytea);
+ PostgresType input_type(PostgresTypeId::kRecord);
+ input_type.AppendChild("col", col_type);
+
+ PostgresCopyStreamTester tester;
+ ASSERT_EQ(tester.Init(input_type), NANOARROW_OK);
+ ASSERT_EQ(tester.ReadAll(&data), ENODATA);
+ ASSERT_EQ(data.data.as_uint8 - kTestPgCopyBinary, sizeof(kTestPgCopyBinary));
+ ASSERT_EQ(data.size_bytes, 0);
+
+ nanoarrow::UniqueArray array;
+ ASSERT_EQ(tester.GetArray(array.get()), NANOARROW_OK);
+ ASSERT_EQ(array->length, 5);
+ ASSERT_EQ(array->n_children, 1);
+
+ auto validity = reinterpret_cast<const uint8_t*>(array->children[0]->buffers[0]);
+ auto offsets = reinterpret_cast<const int32_t*>(array->children[0]->buffers[1]);
+ auto data_buffer = reinterpret_cast<const uint8_t*>(array->children[0]->buffers[2]);
+ ASSERT_NE(validity, nullptr);
+ ASSERT_NE(data_buffer, nullptr);
+
+ ASSERT_TRUE(ArrowBitGet(validity, 0));
+ ASSERT_TRUE(ArrowBitGet(validity, 1));
+ ASSERT_TRUE(ArrowBitGet(validity, 2));
+ ASSERT_TRUE(ArrowBitGet(validity, 3));
+ ASSERT_FALSE(ArrowBitGet(validity, 4));
+
+ ASSERT_EQ(offsets[0], 0);
+ ASSERT_EQ(offsets[1], 0);
+ ASSERT_EQ(offsets[2], 2);
+ ASSERT_EQ(offsets[3], 6);
+ ASSERT_EQ(offsets[4], 8);
+ ASSERT_EQ(offsets[5], 8);
+
+ ASSERT_EQ(data_buffer[0], 0x00);
+ ASSERT_EQ(data_buffer[1], 0x01);
+ ASSERT_EQ(data_buffer[2], 0x01);
+ ASSERT_EQ(data_buffer[3], 0x02);
+ ASSERT_EQ(data_buffer[4], 0x03);
+ ASSERT_EQ(data_buffer[5], 0x04);
+ ASSERT_EQ(data_buffer[6], 0xfe);
+ ASSERT_EQ(data_buffer[7], 0xff);
+}
+
+TEST(PostgresCopyUtilsTest, PostgresCopyWriteBinary) {
+ adbc_validation::Handle<struct ArrowSchema> schema;
+ adbc_validation::Handle<struct ArrowArray> array;
+ struct ArrowError na_error;
+ ASSERT_EQ(adbc_validation::MakeSchema(&schema.value, {{"col", NANOARROW_TYPE_BINARY}}),
+ ADBC_STATUS_OK);
+ ASSERT_EQ(adbc_validation::MakeBatch<std::vector<std::byte>>(
+ &schema.value, &array.value, &na_error,
+ {
+ std::vector<std::byte>{},
+ std::vector<std::byte>{std::byte{0x00}, std::byte{0x01}},
+ std::vector<std::byte>{
+ std::byte{0x01}, std::byte{0x02}, std::byte{0x03}, std::byte{0x04}
+ },
+ std::vector<std::byte>{std::byte{0xfe}, std::byte{0xff}},
+ std::nullopt}),
+ ADBC_STATUS_OK);
+
+ PostgresCopyStreamWriteTester tester;
+ ASSERT_EQ(tester.Init(&schema.value, &array.value), NANOARROW_OK);
+ ASSERT_EQ(tester.WriteAll(nullptr), ENODATA);
+
+ const struct ArrowBuffer buf = tester.WriteBuffer();
+ // The last 2 bytes of a message can be transmitted via PQputCopyData
+ // so no need to test those bytes from the Writer
+ constexpr size_t buf_size = sizeof(kTestPgCopyBinary) - 2;
+ ASSERT_EQ(buf.size_bytes, buf_size);
+ for (size_t i = 0; i < buf_size; i++) {
+ ASSERT_EQ(buf.data[i], kTestPgCopyBinary[i]) << "failure at index " << i;
+ }
+}
+
+
// COPY (SELECT CAST("col" AS INTEGER ARRAY) AS "col" FROM ( VALUES ('{-123, -1}'), ('{0,
// 1, 123}'), (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT binary);
static uint8_t kTestPgCopyIntegerArray[] = {
diff --git a/c/validation/adbc_validation.cc b/c/validation/adbc_validation.cc
index a8fb0af8..2afa0caf 100644
--- a/c/validation/adbc_validation.cc
+++ b/c/validation/adbc_validation.cc
@@ -1506,8 +1506,16 @@ void StatementTest::TestSqlIngestLargeString() {
}
void StatementTest::TestSqlIngestBinary() {
- ASSERT_NO_FATAL_FAILURE(TestSqlIngestType<std::string>(
- NANOARROW_TYPE_BINARY, {std::nullopt, "", "\x00\x01\x02\x04", "\xFE\xFF"}));
+ ASSERT_NO_FATAL_FAILURE(TestSqlIngestType<std::vector<std::byte>>(
+ NANOARROW_TYPE_BINARY,
+ {
+ std::nullopt, std::vector<std::byte>{},
+ std::vector<std::byte>{std::byte{0x00}, std::byte{0x01}},
+ std::vector<std::byte>{
+ std::byte{0x01}, std::byte{0x02}, std::byte{0x03}, std::byte{0x04}
+ },
+ std::vector<std::byte>{std::byte{0xfe}, std::byte{0xff}}
+ }));
}
void StatementTest::TestSqlIngestDate32() {
diff --git a/c/validation/adbc_validation_util.h b/c/validation/adbc_validation_util.h
index 0db6e491..da71e0d9 100644
--- a/c/validation/adbc_validation_util.h
+++ b/c/validation/adbc_validation_util.h
@@ -271,6 +271,14 @@ int MakeArray(struct ArrowArray* parent, struct ArrowArray* array,
if (int errno_res = ArrowArrayAppendBytes(array, view); errno_res != 0) {
return errno_res;
}
+ } else if constexpr (std::is_same<T, std::vector<std::byte>>::value) {
+ static_assert(std::is_same_v<uint8_t, unsigned char>);
+ struct ArrowBufferView view;
+ view.data.as_uint8 = reinterpret_cast<const uint8_t*>(v->data());
+ view.size_bytes = v->size();
+ if (int errno_res = ArrowArrayAppendBytes(array, view); errno_res != 0) {
+ return errno_res;
+ }
} else if constexpr (std::is_same<T, ArrowInterval*>::value) {
if (int errno_res = ArrowArrayAppendInterval(array, *v); errno_res != 0) {
return errno_res;
@@ -389,6 +397,12 @@ void CompareArray(struct ArrowArrayView* array,
struct ArrowStringView view = ArrowArrayViewGetStringUnsafe(array, i);
std::string str(view.data, view.size_bytes);
ASSERT_EQ(*v, str);
+ } else if constexpr (std::is_same<T, std::vector<std::byte>>::value) {
+ struct ArrowBufferView view = ArrowArrayViewGetBytesUnsafe(array, i);
+ ASSERT_EQ(v->size(), view.size_bytes);
+ for (int64_t i = 0; i < view.size_bytes; i++) {
+ ASSERT_EQ((*v)[i], std::byte{view.data.as_uint8[i]});
+ }
} else if constexpr (std::is_same<T, ArrowInterval*>::value) {
ASSERT_NE(array->buffer_views[1].data.data, nullptr);
struct ArrowInterval interval;