This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 22c5696b8 feat(c/driver/postgresql): FIXED_SIZED_LIST Writer support
(#1975)
22c5696b8 is described below
commit 22c5696b850ca5fb4d3d92bac3e6a4fda9e0df5f
Author: William Ayd <[email protected]>
AuthorDate: Fri Jul 5 21:24:06 2024 -0400
feat(c/driver/postgresql): FIXED_SIZED_LIST Writer support (#1975)
Another enhancement for https://github.com/apache/arrow-adbc/issues/1882
---
.../postgresql/copy/postgres_copy_writer_test.cc | 60 ++++++++++++++++++++++
c/driver/postgresql/copy/writer.h | 39 ++++++++++----
2 files changed, 88 insertions(+), 11 deletions(-)
diff --git a/c/driver/postgresql/copy/postgres_copy_writer_test.cc
b/c/driver/postgresql/copy/postgres_copy_writer_test.cc
index b0124600d..618f27cf1 100644
--- a/c/driver/postgresql/copy/postgres_copy_writer_test.cc
+++ b/c/driver/postgresql/copy/postgres_copy_writer_test.cc
@@ -1052,6 +1052,66 @@ TEST_P(PostgresCopyListTest,
PostgresCopyWriteListVarchar) {
INSTANTIATE_TEST_SUITE_P(ArrowListTypes, PostgresCopyListTest,
testing::Values(NANOARROW_TYPE_LIST,
NANOARROW_TYPE_LARGE_LIST));
+// COPY (SELECT CAST("col" AS INTEGER ARRAY) AS "col" FROM ( VALUES ('{1,
2}'),
+// ('{-1, -2}'), (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT BINARY);
+static const uint8_t kTestPgCopyFixedSizeIntegerArray[] = {
+ 0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x24, 0x00,
0x00, 0x00,
+ 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x17, 0x00, 0x00, 0x00,
0x02, 0x00,
+ 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x01, 0x00,
0x00, 0x00,
+ 0x04, 0x00, 0x00, 0x00, 0x02, 0x00, 0x01, 0x00, 0x00, 0x00, 0x24, 0x00,
0x00, 0x00,
+ 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x17, 0x00, 0x00, 0x00,
0x02, 0x00,
+ 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0xff, 0xff, 0xff, 0xff, 0x00,
0x00, 0x00,
+ 0x04, 0xff, 0xff, 0xff, 0xfe, 0x00, 0x01, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff};
+TEST_F(PostgresCopyTest, PostgresCopyWriteFixedSizeListInteger) {
+ adbc_validation::Handle<struct ArrowSchema> schema;
+ adbc_validation::Handle<struct ArrowArray> array;
+ struct ArrowError na_error;
+
+ ASSERT_EQ(ArrowSchemaInitFromType(&schema.value, NANOARROW_TYPE_STRUCT),
NANOARROW_OK);
+ ASSERT_EQ(ArrowSchemaAllocateChildren(&schema.value, 1), NANOARROW_OK);
+
+ ArrowSchemaInit(schema->children[0]);
+ ASSERT_EQ(
+ ArrowSchemaSetTypeFixedSize(schema->children[0],
NANOARROW_TYPE_FIXED_SIZE_LIST, 2),
+ NANOARROW_OK);
+ ASSERT_EQ(ArrowSchemaSetName(schema->children[0], "col"), NANOARROW_OK);
+ ASSERT_EQ(ArrowSchemaSetType(schema->children[0]->children[0],
NANOARROW_TYPE_INT32),
+ NANOARROW_OK);
+
+ ASSERT_EQ(ArrowArrayInitFromSchema(&array.value, &schema.value, nullptr),
NANOARROW_OK);
+ ASSERT_EQ(ArrowArrayStartAppending(&array.value), NANOARROW_OK);
+
+ ASSERT_EQ(ArrowArrayAppendInt(array->children[0]->children[0], 1),
NANOARROW_OK);
+ ASSERT_EQ(ArrowArrayAppendInt(array->children[0]->children[0], 2),
NANOARROW_OK);
+ ASSERT_EQ(ArrowArrayFinishElement(array->children[0]), NANOARROW_OK);
+ ASSERT_EQ(ArrowArrayFinishElement(&array.value), NANOARROW_OK);
+
+ ASSERT_EQ(ArrowArrayAppendInt(array->children[0]->children[0], -1),
NANOARROW_OK);
+ ASSERT_EQ(ArrowArrayAppendInt(array->children[0]->children[0], -2),
NANOARROW_OK);
+ ASSERT_EQ(ArrowArrayFinishElement(array->children[0]), NANOARROW_OK);
+ ASSERT_EQ(ArrowArrayFinishElement(&array.value), NANOARROW_OK);
+
+ ASSERT_EQ(ArrowArrayAppendNull(array->children[0], 1), NANOARROW_OK);
+ ASSERT_EQ(ArrowArrayFinishElement(&array.value), NANOARROW_OK);
+
+ ASSERT_EQ(ArrowArrayFinishBuildingDefault(&array.value, &na_error),
NANOARROW_OK);
+
+ PostgresCopyStreamWriteTester tester;
+ ASSERT_EQ(tester.Init(&schema.value, &array.value, *type_resolver_),
NANOARROW_OK);
+ ASSERT_EQ(tester.WriteAll(nullptr), ENODATA);
+
+ const struct ArrowBuffer buf = tester.WriteBuffer();
+ // The last 2 bytes of a message can be transmitted via PQputCopyData
+ // so no need to test those bytes from the Writer
+ constexpr size_t buf_size = sizeof(kTestPgCopyFixedSizeIntegerArray) - 2;
+ ASSERT_EQ(buf.size_bytes, buf_size);
+ for (size_t i = 0; i < buf_size; i++) {
+ ASSERT_EQ(buf.data[i], kTestPgCopyFixedSizeIntegerArray[i])
+ << "failure at index " << i;
+ }
+}
+
TEST_F(PostgresCopyTest, PostgresCopyWriteMultiBatch) {
// Regression test for https://github.com/apache/arrow-adbc/issues/1310
adbc_validation::Handle<struct ArrowSchema> schema;
diff --git a/c/driver/postgresql/copy/writer.h
b/c/driver/postgresql/copy/writer.h
index 0730db110..528e1f375 100644
--- a/c/driver/postgresql/copy/writer.h
+++ b/c/driver/postgresql/copy/writer.h
@@ -436,6 +436,7 @@ class PostgresCopyBinaryDictFieldWriter : public
PostgresCopyFieldWriter {
}
};
+template <bool IsFixedSize>
class PostgresCopyListFieldWriter : public PostgresCopyFieldWriter {
public:
explicit PostgresCopyListFieldWriter(uint32_t child_oid) :
child_oid_{child_oid} {}
@@ -452,13 +453,21 @@ class PostgresCopyListFieldWriter : public
PostgresCopyFieldWriter {
constexpr int32_t ndim = 1;
constexpr int32_t has_null_flags = 0;
- const int32_t start = ArrowArrayViewListChildOffset(array_view_, index);
- const int32_t end = ArrowArrayViewListChildOffset(array_view_, index + 1);
+ // TODO: the LARGE_LIST should use 64 bit indexes
+ int32_t start, end;
+ if constexpr (IsFixedSize) {
+ start = index * array_view_->layout.child_size_elements;
+ end = start + array_view_->layout.child_size_elements;
+ } else {
+ start = ArrowArrayViewListChildOffset(array_view_, index);
+ end = ArrowArrayViewListChildOffset(array_view_, index + 1);
+ }
+
const int32_t dim = end - start;
constexpr int32_t lb = 1;
- // for fixed size fields where we know the size of each record we would
write to
- // postgres T, we could avoid the use of a temporary buffer and just write
+ // for children of a fixed size T we could avoid the use of a temporary
buffer
+ /// and theoretically just write
//
// const int32_t field_size_bytes =
// sizeof(ndim) + sizeof(has_null_flags) + sizeof(child_oid_) +
sizeof(dim) * ndim
@@ -698,7 +707,8 @@ static inline ArrowErrorCode MakeCopyFieldWriter(
break;
}
case NANOARROW_TYPE_LIST:
- case NANOARROW_TYPE_LARGE_LIST: {
+ case NANOARROW_TYPE_LARGE_LIST:
+ case NANOARROW_TYPE_FIXED_SIZE_LIST: {
// For now our implementation only supports primitive children types
// See PostgresCopyListFieldWriter::Write for limtiations
struct ArrowSchemaView child_schema_view;
@@ -708,17 +718,24 @@ static inline ArrowErrorCode MakeCopyFieldWriter(
NANOARROW_RETURN_NOT_OK(PostgresType::FromSchema(type_resolver,
schema->children[0],
&child_type, error));
- auto list_writer =
std::make_unique<PostgresCopyListFieldWriter>(child_type.oid());
- list_writer->Init(array_view);
-
std::unique_ptr<PostgresCopyFieldWriter> child_writer;
NANOARROW_RETURN_NOT_OK(MakeCopyFieldWriter(schema->children[0],
array_view->children[0],
type_resolver,
&child_writer, error));
- list_writer->InitChild(std::move(child_writer));
-
- *out = std::move(list_writer);
+ if (schema_view.type == NANOARROW_TYPE_FIXED_SIZE_LIST) {
+ auto list_writer =
+
std::make_unique<PostgresCopyListFieldWriter<true>>(child_type.oid());
+ list_writer->Init(array_view);
+ list_writer->InitChild(std::move(child_writer));
+ *out = std::move(list_writer);
+ } else {
+ auto list_writer =
+
std::make_unique<PostgresCopyListFieldWriter<false>>(child_type.oid());
+ list_writer->Init(array_view);
+ list_writer->InitChild(std::move(child_writer));
+ *out = std::move(list_writer);
+ }
return NANOARROW_OK;
}
default: