WillAyd commented on code in PR #1110:
URL: https://github.com/apache/arrow-adbc/pull/1110#discussion_r1339089572


##########
c/driver/postgresql/postgres_copy_reader.h:
##########
@@ -1058,4 +1105,143 @@ class PostgresCopyStreamReader {
   int64_t array_size_approx_bytes_;
 };
 
+/// Base class for the Postgres COPY field writers in this file.
+///
+/// Each writer reads values from an ArrowArrayView supplied via Init() and
+/// serializes the value at a given row index into an ArrowBuffer. The default
+/// Write() does nothing and reports ENOTSUP; subclasses override it.
+class PostgresCopyFieldWriter {
+ public:
+  // Writers are owned and destroyed through
+  // std::unique_ptr<PostgresCopyFieldWriter>, so the destructor must be virtual
+  // for derived-class destructors to run.
+  virtual ~PostgresCopyFieldWriter() = default;
+
+  /// Sets the (non-owning) array view that Write() reads from.
+  void Init(struct ArrowArrayView* array_view) { array_view_ = array_view; }
+
+  /// Writes the value at `index` to `buffer`.
+  ///
+  /// \return ENOTSUP in this base implementation.
+  virtual ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) {
+    return ENOTSUP;
+  }
+
+ protected:
+  // Null until Init() has been called.
+  struct ArrowArrayView* array_view_ = nullptr;
+  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+/// Writes one row: a 16-bit field count followed by the output of every child
+/// writer for that row, in the order the children were appended.
+///
+/// Children are stored in the `children_` member inherited from
+/// PostgresCopyFieldWriter.
+class PostgresCopyFieldTupleWriter : public PostgresCopyFieldWriter {
+ public:
+  /// Takes ownership of `child` and binds it to the child array view at the
+  /// same position. Init() must already have been called on this writer,
+  /// because `array_view_` is dereferenced here.
+  void AppendChild(std::unique_ptr<PostgresCopyFieldWriter> child) {
+    int64_t child_i = static_cast<int64_t>(children_.size());
+    children_.push_back(std::move(child));
+    children_[child_i]->Init(array_view_->children[child_i]);
+  }
+
+  /// Writes the row at `index` to `buffer`.
+  ///
+  /// \return ENODATA if `index` is at or past the end of the array, EINVAL if
+  ///   the number of children does not fit the int16_t field count, otherwise
+  ///   the first non-OK code from WriteChecked or a child writer, or
+  ///   NANOARROW_OK on success.
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
+    if (index >= array_view_->length) {
+      return ENODATA;
+    }
+
+    // The field count is written as int16_t; refuse to silently truncate it.
+    if (children_.size() > static_cast<size_t>(INT16_MAX)) {
+      return EINVAL;
+    }
+    const int16_t n_fields = static_cast<int16_t>(children_.size());
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int16_t>(buffer, n_fields, error));
+
+    for (int16_t i = 0; i < n_fields; i++) {
+      // Propagate child failures instead of reporting success for a partially
+      // written row.
+      NANOARROW_RETURN_NOT_OK(children_[i]->Write(buffer, index, error));
+    }
+
+    return NANOARROW_OK;
+  }
+};
+
+/// Writes one boolean value: a 32-bit length prefix (-1 when the value is
+/// null, otherwise 1) followed, for non-null values only, by a single byte
+/// holding the value.
+class PostgresCopyBooleanFieldWriter : public PostgresCopyFieldWriter {
+ public:
+  /// \return the first non-OK code from WriteChecked, or NANOARROW_OK.
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
+    const int8_t is_null = ArrowArrayViewIsNull(array_view_, index);
+    const int32_t field_size_bytes = is_null ? -1 : 1;
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, field_size_bytes, error));
+    if (is_null) {
+      // A null field consists of the length prefix alone.
+      return NANOARROW_OK;
+    }
+
+    const int8_t value =
+        static_cast<int8_t>(ArrowArrayViewGetIntUnsafe(array_view_, index));
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int8_t>(buffer, value, error));
+
+    // This function returns ArrowErrorCode, so use the nanoarrow status
+    // constant like the sibling writers rather than an ADBC status code.
+    return NANOARROW_OK;
+  }
+};
+
+/// Creates the field writer for `arrow_type` and stores it in `*out`.
+///
+/// On success the caller owns the object stored in `*out`. Only
+/// NANOARROW_TYPE_BOOL is handled here; every other type sets `*out` to
+/// nullptr and returns EINVAL.
+static inline ArrowErrorCode MakeCopyFieldWriter(const enum ArrowType arrow_type,
+                                                 PostgresCopyFieldWriter** out,
+                                                 ArrowError* error) {
+  switch (arrow_type) {
+    case NANOARROW_TYPE_BOOL:
+      *out = new PostgresCopyBooleanFieldWriter();
+      return NANOARROW_OK;
+    default:
+      // Never leave the out-parameter indeterminate on failure.
+      *out = nullptr;
+      return EINVAL;
+  }
+}
+
+class PostgresCopyStreamWriter {
+ public:
+  ~PostgresCopyStreamWriter() { ArrowArrayViewReset(array_view_.get()); }
+
+  ArrowErrorCode Init(struct ArrowSchema* schema, struct ArrowArray* array) {
+    schema_ = schema;
+    NANOARROW_RETURN_NOT_OK(
+        ArrowArrayViewInitFromSchema(array_view_.get(), schema, nullptr));
+    NANOARROW_RETURN_NOT_OK(ArrowArrayViewSetArray(array_view_.get(), array, 
nullptr));
+    root_writer_.Init(array_view_.get());
+    return NANOARROW_OK;
+  }
+
+  int64_t array_size_approx_bytes() const { return array_size_approx_bytes_; }
+
+  ArrowErrorCode WriteHeader(ArrowBuffer* buffer, ArrowError* error) {
+    ArrowBufferAppend(buffer, kPgCopyBinarySignature, 
sizeof(kPgCopyBinarySignature));
+
+    const uint32_t flag_fields = 0;
+    ArrowBufferAppend(buffer, &flag_fields, sizeof(flag_fields));
+
+    const uint32_t extension_bytes = 0;
+    ArrowBufferAppend(buffer, &extension_bytes, sizeof(extension_bytes));
+
+    const int64_t header_bytes =
+        sizeof(kPgCopyBinarySignature) + sizeof(flag_fields) + 
sizeof(extension_bytes);
+    buffer->data += header_bytes;
+    array_size_approx_bytes_ += header_bytes;
+
+    return NANOARROW_OK;
+  }
+
+  ArrowErrorCode WriteRecord(ArrowBuffer* buffer, ArrowError* error) {
+    const uint8_t* start = buffer->data;
+    NANOARROW_RETURN_NOT_OK(root_writer_.Write(buffer, records_written_, 
error));
+    records_written_++;
+    array_size_approx_bytes_ += buffer->data - start;
+    return NANOARROW_OK;
+  }
+
+  ArrowErrorCode InitFieldWriters(ArrowError* error) {
+    if (schema_->release == nullptr) {
+      return EINVAL;
+    }
+
+    for (int64_t i = 0; i < schema_->n_children; i++) {
+      struct ArrowSchemaView schema_view;
+      if (ArrowSchemaViewInit(&schema_view, schema_->children[i], error) !=
+          NANOARROW_OK) {
+        return ADBC_STATUS_INTERNAL;
+      }
+      const ArrowType arrow_type = schema_view.type;
+      PostgresCopyFieldWriter* child_writer;
+      NANOARROW_RETURN_NOT_OK(MakeCopyFieldWriter(arrow_type, &child_writer, 
error));
+      
root_writer_.AppendChild(std::unique_ptr<PostgresCopyFieldWriter>(child_writer));
+    }
+
+    return NANOARROW_OK;
+  }
+
+ private:
+  PostgresCopyFieldTupleWriter root_writer_;
+  struct ArrowSchema* schema_;
+  std::unique_ptr<struct ArrowArrayView> array_view_{new struct 
ArrowArrayView};

Review Comment:
   Cool, that is a great idea. I think we would have to refactor the Handle to
move it from statement.cc to postgres_util.h, so I will tackle that in another PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to