WillAyd commented on code in PR #1110:
URL: https://github.com/apache/arrow-adbc/pull/1110#discussion_r1337776251


##########
c/driver/postgresql/postgres_copy_reader.h:
##########
@@ -1058,4 +1105,143 @@ class PostgresCopyStreamReader {
   int64_t array_size_approx_bytes_;
 };
 
+class PostgresCopyFieldWriter {
+ public:
+  void Init(struct ArrowArrayView* array_view) { array_view_ = array_view; };
+
+  virtual ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* 
error) {
+    return ENOTSUP;
+  }
+
+ protected:
+  struct ArrowArrayView* array_view_;
+  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+class PostgresCopyFieldTupleWriter : public PostgresCopyFieldWriter {
+ public:
+  void AppendChild(std::unique_ptr<PostgresCopyFieldWriter> child) {
+    int64_t child_i = static_cast<int64_t>(children_.size());
+    children_.push_back(std::move(child));
+    children_[child_i]->Init(array_view_->children[child_i]);
+  }
+
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) 
override {

Review Comment:
   The `Write` method calls accept an `index` argument, which is a little 
different from the Reader setup. Instead of accessing by index, the Readers 
always call `ArrowBufferAppend` on the array they are building. 
   
   I think we could still do that here, it's just a little bit more complicated 
by the fact that there is no generator `ArrowBufferGetNext` or similar, so I 
figured just using index access was easier to start. Could be something larger 
I am overlooking



##########
c/driver/postgresql/postgres_copy_reader.h:
##########
@@ -1058,4 +1105,143 @@ class PostgresCopyStreamReader {
   int64_t array_size_approx_bytes_;
 };
 
+class PostgresCopyFieldWriter {
+ public:
+  void Init(struct ArrowArrayView* array_view) { array_view_ = array_view; };
+
+  virtual ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* 
error) {
+    return ENOTSUP;
+  }
+
+ protected:
+  struct ArrowArrayView* array_view_;
+  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+class PostgresCopyFieldTupleWriter : public PostgresCopyFieldWriter {
+ public:
+  void AppendChild(std::unique_ptr<PostgresCopyFieldWriter> child) {
+    int64_t child_i = static_cast<int64_t>(children_.size());
+    children_.push_back(std::move(child));
+    children_[child_i]->Init(array_view_->children[child_i]);
+  }
+
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) 
override {
+    if (index >= array_view_->length) {
+      return ENODATA;
+    }
+
+    const int16_t n_fields = children_.size();
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int16_t>(buffer, n_fields, error));
+
+    for (int16_t i = 0; i < n_fields; i++) {
+      children_[i]->Write(buffer, index, error);
+    }
+
+    return NANOARROW_OK;
+  }
+
+ private:
+  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+class PostgresCopyBooleanFieldWriter : public PostgresCopyFieldWriter {
+ public:
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) 
override {
+    const int8_t is_null = ArrowArrayViewIsNull(array_view_, index);
+    const int32_t field_size_bytes = is_null ? -1 : 1;
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, field_size_bytes, 
error));
+    if (is_null) {
+      return ADBC_STATUS_OK;
+    }
+
+    const int8_t value =
+        static_cast<int8_t>(ArrowArrayViewGetIntUnsafe(array_view_, index));
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int8_t>(buffer, value, error));
+
+    return ADBC_STATUS_OK;
+  }
+};
+
+static inline ArrowErrorCode MakeCopyFieldWriter(const enum ArrowType 
arrow_type,
+                                                 PostgresCopyFieldWriter** out,
+                                                 ArrowError* error) {
+  switch (arrow_type) {
+    case NANOARROW_TYPE_BOOL:
+      *out = new PostgresCopyBooleanFieldWriter();
+      return NANOARROW_OK;
+    default:
+      return EINVAL;
+  }
+  return NANOARROW_OK;
+}
+
+class PostgresCopyStreamWriter {
+ public:
+  ~PostgresCopyStreamWriter() { ArrowArrayViewReset(array_view_.get()); }
+
+  ArrowErrorCode Init(struct ArrowSchema* schema, struct ArrowArray* array) {
+    schema_ = schema;
+    NANOARROW_RETURN_NOT_OK(
+        ArrowArrayViewInitFromSchema(array_view_.get(), schema, nullptr));
+    NANOARROW_RETURN_NOT_OK(ArrowArrayViewSetArray(array_view_.get(), array, 
nullptr));
+    root_writer_.Init(array_view_.get());
+    return NANOARROW_OK;
+  }
+
+  int64_t array_size_approx_bytes() const { return array_size_approx_bytes_; }

Review Comment:
   Not sure if this is necessary, just copied from the reader design



##########
c/driver/postgresql/postgres_copy_reader_test.cc:
##########
@@ -96,6 +127,32 @@ TEST(PostgresCopyUtilsTest, PostgresCopyReadBoolean) {
   ASSERT_FALSE(ArrowBitGet(data_buffer, 2));
 }
 
+TEST(PostgresCopyUtilsTest, PostgresCopyWriteBoolean) {
+  adbc_validation::Handle<struct ArrowSchema> schema;
+  adbc_validation::Handle<struct ArrowArray> array;
+  struct ArrowError na_error;
+  ASSERT_EQ(adbc_validation::MakeSchema(&schema.value, {{"col", 
NANOARROW_TYPE_BOOL}}),
+            ADBC_STATUS_OK);
+  ASSERT_EQ(adbc_validation::MakeBatch<bool>(&schema.value, &array.value, 
&na_error,
+                                             {true, false, std::nullopt}),
+            ADBC_STATUS_OK);
+
+  PostgresCopyStreamWriteTester tester;
+  ASSERT_EQ(tester.Init(&schema.value, &array.value), NANOARROW_OK);
+
+  struct ArrowBuffer buffer;
+  ArrowBufferInit(&buffer);
+  ArrowBufferReserve(&buffer, sizeof(kTestPgCopyBoolean));
+  ASSERT_EQ(tester.WriteAll(&buffer, nullptr), ENODATA);
+
+  // The last 4 bytes of a message can be transmitted via PQputCopyData
+  // so no need to test those bytes from the Writer
+  for (size_t i = 0; i < sizeof(kTestPgCopyBoolean) - 4; i++) {

Review Comment:
   Ultimately when we implement this in statement.cc I imagine we will build 
the buffer (maybe even in chunks) and send that via `PQputCopyData` . When all 
is said and done we would then do a `PQputCopyEnd` to send the last 4 bytes. 
Maybe we should make the end message a constant in the tests so it is clear 
what is part of the "data" versus the sentinel signaling the end of the buffer



##########
c/driver/postgresql/postgres_copy_reader.h:
##########
@@ -117,6 +117,53 @@ ArrowErrorCode ReadChecked(ArrowBufferView* data, T* out, 
ArrowError* error) {
   return NANOARROW_OK;
 }
 
+// Write a value to a buffer without checking the buffer size. Advances

Review Comment:
   I put all of this code into `postgres_copy_reader.h` because it re-uses a 
lot of the same patterns and constants. Maybe we should rename this 
`postgres_copy_io.h`?



##########
c/driver/postgresql/postgres_copy_reader.h:
##########
@@ -1058,4 +1105,143 @@ class PostgresCopyStreamReader {
   int64_t array_size_approx_bytes_;
 };
 
+class PostgresCopyFieldWriter {
+ public:
+  void Init(struct ArrowArrayView* array_view) { array_view_ = array_view; };
+
+  virtual ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* 
error) {
+    return ENOTSUP;
+  }
+
+ protected:
+  struct ArrowArrayView* array_view_;
+  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+class PostgresCopyFieldTupleWriter : public PostgresCopyFieldWriter {
+ public:
+  void AppendChild(std::unique_ptr<PostgresCopyFieldWriter> child) {
+    int64_t child_i = static_cast<int64_t>(children_.size());
+    children_.push_back(std::move(child));
+    children_[child_i]->Init(array_view_->children[child_i]);
+  }
+
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) 
override {
+    if (index >= array_view_->length) {
+      return ENODATA;
+    }
+
+    const int16_t n_fields = children_.size();
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int16_t>(buffer, n_fields, error));
+
+    for (int16_t i = 0; i < n_fields; i++) {
+      children_[i]->Write(buffer, index, error);
+    }
+
+    return NANOARROW_OK;
+  }
+
+ private:
+  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+class PostgresCopyBooleanFieldWriter : public PostgresCopyFieldWriter {
+ public:
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) 
override {
+    const int8_t is_null = ArrowArrayViewIsNull(array_view_, index);
+    const int32_t field_size_bytes = is_null ? -1 : 1;
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, field_size_bytes, 
error));
+    if (is_null) {
+      return ADBC_STATUS_OK;
+    }
+
+    const int8_t value =
+        static_cast<int8_t>(ArrowArrayViewGetIntUnsafe(array_view_, index));
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int8_t>(buffer, value, error));
+
+    return ADBC_STATUS_OK;
+  }
+};
+
+static inline ArrowErrorCode MakeCopyFieldWriter(const enum ArrowType 
arrow_type,
+                                                 PostgresCopyFieldWriter** out,
+                                                 ArrowError* error) {
+  switch (arrow_type) {
+    case NANOARROW_TYPE_BOOL:
+      *out = new PostgresCopyBooleanFieldWriter();
+      return NANOARROW_OK;
+    default:
+      return EINVAL;
+  }
+  return NANOARROW_OK;
+}
+
+class PostgresCopyStreamWriter {
+ public:
+  ~PostgresCopyStreamWriter() { ArrowArrayViewReset(array_view_.get()); }
+
+  ArrowErrorCode Init(struct ArrowSchema* schema, struct ArrowArray* array) {
+    schema_ = schema;
+    NANOARROW_RETURN_NOT_OK(
+        ArrowArrayViewInitFromSchema(array_view_.get(), schema, nullptr));
+    NANOARROW_RETURN_NOT_OK(ArrowArrayViewSetArray(array_view_.get(), array, 
nullptr));
+    root_writer_.Init(array_view_.get());
+    return NANOARROW_OK;
+  }
+
+  int64_t array_size_approx_bytes() const { return array_size_approx_bytes_; }
+
+  ArrowErrorCode WriteHeader(ArrowBuffer* buffer, ArrowError* error) {
+    ArrowBufferAppend(buffer, kPgCopyBinarySignature, 
sizeof(kPgCopyBinarySignature));
+
+    const uint32_t flag_fields = 0;
+    ArrowBufferAppend(buffer, &flag_fields, sizeof(flag_fields));
+
+    const uint32_t extension_bytes = 0;
+    ArrowBufferAppend(buffer, &extension_bytes, sizeof(extension_bytes));
+
+    const int64_t header_bytes =
+        sizeof(kPgCopyBinarySignature) + sizeof(flag_fields) + 
sizeof(extension_bytes);
+    buffer->data += header_bytes;
+    array_size_approx_bytes_ += header_bytes;
+
+    return NANOARROW_OK;
+  }
+
+  ArrowErrorCode WriteRecord(ArrowBuffer* buffer, ArrowError* error) {
+    const uint8_t* start = buffer->data;
+    NANOARROW_RETURN_NOT_OK(root_writer_.Write(buffer, records_written_, 
error));
+    records_written_++;
+    array_size_approx_bytes_ += buffer->data - start;
+    return NANOARROW_OK;
+  }
+
+  ArrowErrorCode InitFieldWriters(ArrowError* error) {
+    if (schema_->release == nullptr) {
+      return EINVAL;
+    }
+
+    for (int64_t i = 0; i < schema_->n_children; i++) {
+      struct ArrowSchemaView schema_view;
+      if (ArrowSchemaViewInit(&schema_view, schema_->children[i], error) !=
+          NANOARROW_OK) {
+        return ADBC_STATUS_INTERNAL;
+      }
+      const ArrowType arrow_type = schema_view.type;
+      PostgresCopyFieldWriter* child_writer;
+      NANOARROW_RETURN_NOT_OK(MakeCopyFieldWriter(arrow_type, &child_writer, 
error));
+      
root_writer_.AppendChild(std::unique_ptr<PostgresCopyFieldWriter>(child_writer));
+    }
+
+    return NANOARROW_OK;
+  }
+
+ private:
+  PostgresCopyFieldTupleWriter root_writer_;
+  struct ArrowSchema* schema_;
+  std::unique_ptr<struct ArrowArrayView> array_view_{new struct 
ArrowArrayView};

Review Comment:
   I'm a bit iffy on C++ constructs, but I think this is the easiest way to 
declare an pointer that owns data as a class member with C++11 compat. 
Apologies if I'm missing something easier



##########
c/driver/postgresql/postgres_copy_reader.h:
##########
@@ -1058,4 +1105,143 @@ class PostgresCopyStreamReader {
   int64_t array_size_approx_bytes_;
 };
 
+class PostgresCopyFieldWriter {
+ public:
+  void Init(struct ArrowArrayView* array_view) { array_view_ = array_view; };
+
+  virtual ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* 
error) {
+    return ENOTSUP;
+  }
+
+ protected:
+  struct ArrowArrayView* array_view_;
+  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+class PostgresCopyFieldTupleWriter : public PostgresCopyFieldWriter {
+ public:
+  void AppendChild(std::unique_ptr<PostgresCopyFieldWriter> child) {
+    int64_t child_i = static_cast<int64_t>(children_.size());
+    children_.push_back(std::move(child));
+    children_[child_i]->Init(array_view_->children[child_i]);
+  }
+
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) 
override {
+    if (index >= array_view_->length) {
+      return ENODATA;
+    }
+
+    const int16_t n_fields = children_.size();
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int16_t>(buffer, n_fields, error));
+
+    for (int16_t i = 0; i < n_fields; i++) {
+      children_[i]->Write(buffer, index, error);
+    }
+
+    return NANOARROW_OK;
+  }
+
+ private:
+  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+class PostgresCopyBooleanFieldWriter : public PostgresCopyFieldWriter {
+ public:
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) 
override {
+    const int8_t is_null = ArrowArrayViewIsNull(array_view_, index);
+    const int32_t field_size_bytes = is_null ? -1 : 1;
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, field_size_bytes, 
error));
+    if (is_null) {
+      return ADBC_STATUS_OK;
+    }
+
+    const int8_t value =
+        static_cast<int8_t>(ArrowArrayViewGetIntUnsafe(array_view_, index));
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int8_t>(buffer, value, error));
+
+    return ADBC_STATUS_OK;
+  }
+};
+
+static inline ArrowErrorCode MakeCopyFieldWriter(const enum ArrowType 
arrow_type,
+                                                 PostgresCopyFieldWriter** out,
+                                                 ArrowError* error) {
+  switch (arrow_type) {
+    case NANOARROW_TYPE_BOOL:
+      *out = new PostgresCopyBooleanFieldWriter();
+      return NANOARROW_OK;
+    default:
+      return EINVAL;
+  }
+  return NANOARROW_OK;
+}
+
+class PostgresCopyStreamWriter {
+ public:
+  ~PostgresCopyStreamWriter() { ArrowArrayViewReset(array_view_.get()); }
+
+  ArrowErrorCode Init(struct ArrowSchema* schema, struct ArrowArray* array) {
+    schema_ = schema;
+    NANOARROW_RETURN_NOT_OK(
+        ArrowArrayViewInitFromSchema(array_view_.get(), schema, nullptr));
+    NANOARROW_RETURN_NOT_OK(ArrowArrayViewSetArray(array_view_.get(), array, 
nullptr));
+    root_writer_.Init(array_view_.get());
+    return NANOARROW_OK;
+  }
+
+  int64_t array_size_approx_bytes() const { return array_size_approx_bytes_; }
+
+  ArrowErrorCode WriteHeader(ArrowBuffer* buffer, ArrowError* error) {
+    ArrowBufferAppend(buffer, kPgCopyBinarySignature, 
sizeof(kPgCopyBinarySignature));

Review Comment:
   I think we can move towards `ArrowBufferAppendUnsafe` if ensure a proper 
buffer size up front. I think in the current protocol you need 19 bytes for the 
header, 2 bytes for the number of columns in each row, 4 bytes for each record 
to indicate the record length, n bits for every non-null record to contain its 
actual bytes and finally 4 bytes for the end message. 
   
   Something to investigate futher



##########
c/driver/postgresql/postgres_copy_reader_test.cc:
##########
@@ -52,6 +55,34 @@ class PostgresCopyStreamTester {
   PostgresCopyStreamReader reader_;
 };
 
+class PostgresCopyStreamWriteTester {
+ public:
+  ArrowErrorCode Init(struct ArrowSchema* schema, struct ArrowArray* array,
+                      ArrowError* error = nullptr) {
+    NANOARROW_RETURN_NOT_OK(writer_.Init(schema, array));
+    NANOARROW_RETURN_NOT_OK(writer_.InitFieldWriters(error));
+    return NANOARROW_OK;
+  }
+
+  ArrowErrorCode WriteAll(struct ArrowBuffer* buffer, ArrowError* error = 
nullptr) {
+    NANOARROW_RETURN_NOT_OK(writer_.WriteHeader(buffer, error));
+
+    int result;
+    do {
+      result = writer_.WriteRecord(buffer, error);
+    } while (result == NANOARROW_OK);
+
+    // TODO: don't think we should do this here; the reader equivalent does

Review Comment:
   AFAICT the reader implementation just moves through the message buffer. I 
mirrored that as well for the writer, but that means that trying to read the 
buffer after the fact requires knowing how many bytes were traversed and moving 
back there. Probably a better way to do this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to