This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new aa07ece7 feat(c/driver/postgresql): add integral COPY writers (#1130)
aa07ece7 is described below
commit aa07ece74bf73de0cfd364d053b7aba0a814d402
Author: William Ayd <[email protected]>
AuthorDate: Fri Sep 29 10:46:11 2023 -0400
feat(c/driver/postgresql): add integral COPY writers (#1130)
---
c/driver/postgresql/postgres_copy_reader.h | 28 ++++++++
c/driver/postgresql/postgres_copy_reader_test.cc | 90 ++++++++++++++++++++++++
2 files changed, 118 insertions(+)
diff --git a/c/driver/postgresql/postgres_copy_reader.h
b/c/driver/postgresql/postgres_copy_reader.h
index 0aab4690..98331087 100644
--- a/c/driver/postgresql/postgres_copy_reader.h
+++ b/c/driver/postgresql/postgres_copy_reader.h
@@ -1166,6 +1166,25 @@ class PostgresCopyBooleanFieldWriter : public
PostgresCopyFieldWriter {
}
};
+/// \brief COPY field writer for fixed-width integral Arrow values.
+///
+/// For every element this emits a 32-bit field length (-1 for a null element,
+/// sizeof(T) otherwise) and, for non-null elements, the element converted to T
+/// with kOffset subtracted. Both pieces are emitted through WriteChecked, which
+/// is declared elsewhere in this header.
+///
+/// \tparam T integral type written for each non-null element
+/// \tparam kOffset constant subtracted from each value before it is written
+template <typename T, T kOffset = 0>
+class PostgresCopyNetworkEndianFieldWriter : public PostgresCopyFieldWriter {
+ public:
+  /// \brief Write the element at `index` of array_view_ into `buffer`.
+  ///
+  /// \param buffer destination handed to WriteChecked
+  /// \param index position of the element to write
+  /// \param error error detail handed to WriteChecked
+  /// \return NANOARROW_OK on success; a failing WriteChecked code is propagated
+  ///   by NANOARROW_RETURN_NOT_OK
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
+    const int8_t is_null = ArrowArrayViewIsNull(array_view_, index);
+
+    // Keep both arms of the conditional as int32_t: pairing a bare -1 with the
+    // unsigned result of sizeof would convert -1 to an unsigned value first.
+    constexpr int32_t kNullFieldSize = -1;
+    const int32_t field_size_bytes =
+        is_null ? kNullFieldSize : static_cast<int32_t>(sizeof(T));
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, field_size_bytes, error));
+    if (is_null) {
+      // A null element consists of the length marker only.
+      return NANOARROW_OK;
+    }
+
+    // The subtraction is carried out after integer promotion for narrow T, so
+    // the result is converted back to T explicitly.
+    const T value = static_cast<T>(
+        static_cast<T>(ArrowArrayViewGetIntUnsafe(array_view_, index)) - kOffset);
+    NANOARROW_RETURN_NOT_OK(WriteChecked<T>(buffer, value, error));
+
+    return NANOARROW_OK;
+  }
+};
+
+
static inline ArrowErrorCode MakeCopyFieldWriter(const enum ArrowType
arrow_type,
PostgresCopyFieldWriter** out,
ArrowError* error) {
@@ -1173,6 +1192,15 @@ static inline ArrowErrorCode MakeCopyFieldWriter(const
enum ArrowType arrow_type
case NANOARROW_TYPE_BOOL:
*out = new PostgresCopyBooleanFieldWriter();
return NANOARROW_OK;
+ case NANOARROW_TYPE_INT16:
+ *out = new PostgresCopyNetworkEndianFieldWriter<int16_t>();
+ return NANOARROW_OK;
+ case NANOARROW_TYPE_INT32:
+ *out = new PostgresCopyNetworkEndianFieldWriter<int32_t>();
+ return NANOARROW_OK;
+ case NANOARROW_TYPE_INT64:
+ *out = new PostgresCopyNetworkEndianFieldWriter<int64_t>();
+ return NANOARROW_OK;
default:
return EINVAL;
}
diff --git a/c/driver/postgresql/postgres_copy_reader_test.cc
b/c/driver/postgresql/postgres_copy_reader_test.cc
index f389d2a6..d520271a 100644
--- a/c/driver/postgresql/postgres_copy_reader_test.cc
+++ b/c/driver/postgresql/postgres_copy_reader_test.cc
@@ -200,6 +200,36 @@ TEST(PostgresCopyUtilsTest, PostgresCopyReadSmallInt) {
ASSERT_EQ(data_buffer[4], 0);
}
+// Writes the int16 column {-123, -1, 1, 123, NULL} through the COPY stream
+// write tester and compares the output with kTestPgCopySmallInt, leaving out
+// the final 4 bytes of the expected message.
+TEST(PostgresCopyUtilsTest, PostgresCopyWriteInt16) {
+  adbc_validation::Handle<struct ArrowSchema> schema;
+  adbc_validation::Handle<struct ArrowArray> array;
+  struct ArrowError na_error;
+  ASSERT_EQ(adbc_validation::MakeSchema(&schema.value, {{"col", NANOARROW_TYPE_INT16}}),
+            ADBC_STATUS_OK);
+  ASSERT_EQ(adbc_validation::MakeBatch<int16_t>(&schema.value, &array.value, &na_error,
+                                                {-123, -1, 1, 123, std::nullopt}),
+            ADBC_STATUS_OK);
+
+  PostgresCopyStreamWriteTester tester;
+  ASSERT_EQ(tester.Init(&schema.value, &array.value), NANOARROW_OK);
+
+  struct ArrowBuffer buffer;
+  ArrowBufferInit(&buffer);
+  // Stop immediately if the reservation fails: the writer would otherwise be
+  // handed a buffer without the storage this test expects it to fill.
+  ASSERT_EQ(ArrowBufferReserve(&buffer, sizeof(kTestPgCopySmallInt)), NANOARROW_OK);
+  // Remember where the reserved storage starts; it is put back before the reset.
+  uint8_t* cursor = buffer.data;
+
+  ASSERT_EQ(tester.WriteAll(&buffer, nullptr), ENODATA);
+
+  // The last 4 bytes of a message can be transmitted via PQputCopyData
+  // so no need to test those bytes from the Writer
+  for (size_t i = 0; i < sizeof(kTestPgCopySmallInt) - 4; i++) {
+    EXPECT_EQ(cursor[i], kTestPgCopySmallInt[i]);
+  }
+
+  buffer.data = cursor;
+  ArrowBufferReset(&buffer);
+}
+
+
// COPY (SELECT CAST("col" AS INTEGER) AS "col" FROM ( VALUES (-123), (-1), (1), (123),
// (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT binary);
static uint8_t kTestPgCopyInteger[] = {
@@ -247,6 +277,36 @@ TEST(PostgresCopyUtilsTest, PostgresCopyReadInteger) {
ASSERT_EQ(data_buffer[4], 0);
}
+// Writes the int32 column {-123, -1, 1, 123, NULL} through the COPY stream
+// write tester and compares the output with kTestPgCopyInteger, leaving out
+// the final 4 bytes of the expected message.
+TEST(PostgresCopyUtilsTest, PostgresCopyWriteInt32) {
+  adbc_validation::Handle<struct ArrowSchema> schema;
+  adbc_validation::Handle<struct ArrowArray> array;
+  struct ArrowError na_error;
+  ASSERT_EQ(adbc_validation::MakeSchema(&schema.value, {{"col", NANOARROW_TYPE_INT32}}),
+            ADBC_STATUS_OK);
+  ASSERT_EQ(adbc_validation::MakeBatch<int32_t>(&schema.value, &array.value, &na_error,
+                                                {-123, -1, 1, 123, std::nullopt}),
+            ADBC_STATUS_OK);
+
+  PostgresCopyStreamWriteTester tester;
+  ASSERT_EQ(tester.Init(&schema.value, &array.value), NANOARROW_OK);
+
+  struct ArrowBuffer buffer;
+  ArrowBufferInit(&buffer);
+  // Stop immediately if the reservation fails: the writer would otherwise be
+  // handed a buffer without the storage this test expects it to fill.
+  ASSERT_EQ(ArrowBufferReserve(&buffer, sizeof(kTestPgCopyInteger)), NANOARROW_OK);
+  // Remember where the reserved storage starts; it is put back before the reset.
+  uint8_t* cursor = buffer.data;
+
+  ASSERT_EQ(tester.WriteAll(&buffer, nullptr), ENODATA);
+
+  // The last 4 bytes of a message can be transmitted via PQputCopyData
+  // so no need to test those bytes from the Writer
+  for (size_t i = 0; i < sizeof(kTestPgCopyInteger) - 4; i++) {
+    EXPECT_EQ(cursor[i], kTestPgCopyInteger[i]);
+  }
+
+  buffer.data = cursor;
+  ArrowBufferReset(&buffer);
+}
+
+
// COPY (SELECT CAST("col" AS BIGINT) AS "col" FROM ( VALUES (-123), (-1), (1), (123),
// (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT binary);
static uint8_t kTestPgCopyBigInt[] = {
@@ -295,6 +355,36 @@ TEST(PostgresCopyUtilsTest, PostgresCopyReadBigInt) {
ASSERT_EQ(data_buffer[4], 0);
}
+// Writes the int64 column {-123, -1, 1, 123, NULL} through the COPY stream
+// write tester and compares the output with kTestPgCopyBigInt, leaving out
+// the final 4 bytes of the expected message.
+TEST(PostgresCopyUtilsTest, PostgresCopyWriteInt64) {
+  adbc_validation::Handle<struct ArrowSchema> schema;
+  adbc_validation::Handle<struct ArrowArray> array;
+  struct ArrowError na_error;
+  ASSERT_EQ(adbc_validation::MakeSchema(&schema.value, {{"col", NANOARROW_TYPE_INT64}}),
+            ADBC_STATUS_OK);
+  ASSERT_EQ(adbc_validation::MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
+                                                {-123, -1, 1, 123, std::nullopt}),
+            ADBC_STATUS_OK);
+
+  PostgresCopyStreamWriteTester tester;
+  ASSERT_EQ(tester.Init(&schema.value, &array.value), NANOARROW_OK);
+
+  struct ArrowBuffer buffer;
+  ArrowBufferInit(&buffer);
+  // Stop immediately if the reservation fails: the writer would otherwise be
+  // handed a buffer without the storage this test expects it to fill.
+  ASSERT_EQ(ArrowBufferReserve(&buffer, sizeof(kTestPgCopyBigInt)), NANOARROW_OK);
+  // Remember where the reserved storage starts; it is put back before the reset.
+  uint8_t* cursor = buffer.data;
+
+  ASSERT_EQ(tester.WriteAll(&buffer, nullptr), ENODATA);
+
+  // The last 4 bytes of a message can be transmitted via PQputCopyData
+  // so no need to test those bytes from the Writer
+  for (size_t i = 0; i < sizeof(kTestPgCopyBigInt) - 4; i++) {
+    EXPECT_EQ(cursor[i], kTestPgCopyBigInt[i]);
+  }
+
+  buffer.data = cursor;
+  ArrowBufferReset(&buffer);
+}
+
+
// COPY (SELECT CAST("col" AS REAL) AS "col" FROM ( VALUES (-123.456), (-1), (1),
// (123.456), (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT binary);
static uint8_t kTestPgCopyReal[] = {