pitrou commented on code in PR #39455:
URL: https://github.com/apache/arrow/pull/39455#discussion_r1469772299


##########
cpp/src/arrow/c/bridge.cc:
##########
@@ -1979,13 +1979,49 @@ Result<std::shared_ptr<RecordBatch>> 
ImportDeviceRecordBatch(
 
 namespace {
 
+Status ExportStreamSchema(const std::shared_ptr<RecordBatchReader>& src,
+                          struct ArrowSchema* out_schema) {
+  return ExportSchema(*src->schema(), out_schema);
+}
+
+Status ExportStreamSchema(const std::shared_ptr<ChunkedArray>& src,
+                          struct ArrowSchema* out_schema) {
+  return ExportType(*src->type(), out_schema);
+}
+
+Status ExportStreamNext(const std::shared_ptr<RecordBatchReader>& src, int i,

Review Comment:
   There is no guarantee that the number of batches fits in 31 bits (the non-negative range of a 32-bit signed `int`), is there? (Though of course exceeding that is highly unlikely...)
   
   Let's just make these `int i` parameters `int64_t` for peace of mind :-)



##########
cpp/src/arrow/c/bridge.cc:
##########
@@ -1979,13 +1979,49 @@ Result<std::shared_ptr<RecordBatch>> 
ImportDeviceRecordBatch(
 
 namespace {
 
+Status ExportStreamSchema(const std::shared_ptr<RecordBatchReader>& src,
+                          struct ArrowSchema* out_schema) {
+  return ExportSchema(*src->schema(), out_schema);
+}
+
+Status ExportStreamSchema(const std::shared_ptr<ChunkedArray>& src,
+                          struct ArrowSchema* out_schema) {
+  return ExportType(*src->type(), out_schema);
+}
+
+Status ExportStreamNext(const std::shared_ptr<RecordBatchReader>& src, int i,
+                        struct ArrowArray* out_array) {
+  std::shared_ptr<RecordBatch> batch;
+  RETURN_NOT_OK(src->ReadNext(&batch));
+  if (batch == nullptr) {
+    // End of stream
+    ArrowArrayMarkReleased(out_array);
+    return Status::OK();
+  } else {
+    return ExportRecordBatch(*batch, out_array);
+  }
+}
+
+Status ExportStreamNext(const std::shared_ptr<ChunkedArray>& src, int i,
+                        struct ArrowArray* out_array) {
+  if (i >= src->num_chunks()) {
+    // End of stream
+    ArrowArrayMarkReleased(out_array);
+    return Status::OK();
+  } else {
+    return ExportArray(*src->chunk(i), out_array);
+  }
+}
+
+template <typename T>
 class ExportedArrayStream {
  public:
   struct PrivateData {
-    explicit PrivateData(std::shared_ptr<RecordBatchReader> reader)
-        : reader_(std::move(reader)) {}
+    explicit PrivateData(std::shared_ptr<T> reader)
+        : reader_(std::move(reader)), batch_num_(0) {}
 
-    std::shared_ptr<RecordBatchReader> reader_;
+    std::shared_ptr<T> reader_;
+    int batch_num_;

Review Comment:
   Should `batch_num_` be `int64_t` as well, for the same reason?



##########
cpp/src/arrow/c/bridge.cc:
##########
@@ -2180,15 +2211,119 @@ class ArrayStreamBatchReader : public 
RecordBatchReader {
     return {code, last_error ? std::string(last_error) : ""};
   }
 
+ private:
   mutable struct ArrowArrayStream stream_;
+};
+
+class ArrayStreamBatchReader : public RecordBatchReader, public 
ArrayStreamReader {
+ public:
+  explicit ArrayStreamBatchReader(struct ArrowArrayStream* stream)
+      : ArrayStreamReader(stream) {}
+
+  Status Init() {
+    ARROW_ASSIGN_OR_RAISE(schema_, ReadSchema());
+    return Status::OK();
+  }
+
+  std::shared_ptr<Schema> schema() const override { return schema_; }
+
+  Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
+    ARROW_RETURN_NOT_OK(CheckNotReleased());
+
+    struct ArrowArray c_array;
+    ARROW_RETURN_NOT_OK(ReadNextArrayInternal(&c_array));
+
+    if (ArrowArrayIsReleased(&c_array)) {
+      // End of stream
+      batch->reset();
+      return Status::OK();
+    } else {
+      return ImportRecordBatch(&c_array, schema_).Value(batch);
+    }
+  }
+
+  Status Close() override {
+    ReleaseStream();
+    return Status::OK();
+  }
+
+ private:
   std::shared_ptr<Schema> schema_;
 };
 
+class ArrayStreamArrayReader : public ArrayStreamReader {
+ public:
+  explicit ArrayStreamArrayReader(struct ArrowArrayStream* stream)
+      : ArrayStreamReader(stream) {}
+
+  Status Init() {
+    ARROW_ASSIGN_OR_RAISE(field_, ReadField());
+    return Status::OK();
+  }
+
+  std::shared_ptr<DataType> data_type() const { return field_->type(); }
+
+  Status ReadNext(std::shared_ptr<Array>* array) {
+    ARROW_RETURN_NOT_OK(CheckNotReleased());
+
+    struct ArrowArray c_array;
+    ARROW_RETURN_NOT_OK(ReadNextArrayInternal(&c_array));
+
+    if (ArrowArrayIsReleased(&c_array)) {
+      // End of stream
+      array->reset();
+      return Status::OK();
+    } else {
+      return ImportArray(&c_array, field_->type()).Value(array);
+    }
+  }
+
+  Status Close() {
+    ReleaseStream();
+    return Status::OK();
+  }
+
+ private:
+  std::shared_ptr<Field> field_;
+};
+
 }  // namespace
 
 Result<std::shared_ptr<RecordBatchReader>> ImportRecordBatchReader(
     struct ArrowArrayStream* stream) {
-  return ArrayStreamBatchReader::Make(stream);
+  if (ArrowArrayStreamIsReleased(stream)) {
+    return Status::Invalid("Cannot import released ArrowArrayStream");
+  }
+
+  auto reader = std::make_shared<ArrayStreamBatchReader>(stream);
+  ARROW_RETURN_NOT_OK(reader->Init());
+  return reader;
+}
+
+Result<std::shared_ptr<ChunkedArray>> ImportChunkedArray(
+    struct ArrowArrayStream* stream) {
+  if (ArrowArrayStreamIsReleased(stream)) {
+    return Status::Invalid("Cannot import released ArrowArrayStream");
+  }
+
+  auto reader = std::make_shared<ArrayStreamArrayReader>(stream);
+  ARROW_RETURN_NOT_OK(reader->Init());
+
+  std::shared_ptr<DataType> data_type = reader->data_type();
+
+  ArrayVector chunks;
+  std::shared_ptr<Array> chunk;
+  while (true) {
+    ARROW_RETURN_NOT_OK(reader->ReadNext(&chunk));
+    if (!chunk) {
+      break;
+    }
+
+    chunks.push_back(std::move(chunk));
+  }
+
+  ARROW_RETURN_NOT_OK(reader->Close());
+  return ChunkedArray::Make(chunks, data_type);

Review Comment:
   Nit: move `chunks` and `data_type` into `ChunkedArray::Make` instead of copying them:
   ```suggestion
     return ChunkedArray::Make(std::move(chunks), std::move(data_type));
   ```



##########
cpp/src/arrow/c/bridge_test.cc:
##########
@@ -4467,6 +4478,43 @@ TEST_F(TestArrayStreamExport, Errors) {
   ASSERT_EQ(EINVAL, c_stream.get_next(&c_stream, &c_array));
 }
 
+TEST_F(TestArrayStreamExport, ChunkedArrayExport) {
+  ASSERT_OK_AND_ASSIGN(auto chunked_array,
+                       ChunkedArray::Make({ArrayFromJSON(int32(), "[1, 2]"),
+                                           ArrayFromJSON(int32(), "[4, 5, 
null]")}));

Review Comment:
   Can you also test a chunked array with zero chunks at some point (both here and in the roundtrip tests)?



##########
cpp/src/arrow/c/bridge.cc:
##########
@@ -2180,15 +2211,119 @@ class ArrayStreamBatchReader : public 
RecordBatchReader {
     return {code, last_error ? std::string(last_error) : ""};
   }
 
+ private:
   mutable struct ArrowArrayStream stream_;
+};
+
+class ArrayStreamBatchReader : public RecordBatchReader, public 
ArrayStreamReader {
+ public:
+  explicit ArrayStreamBatchReader(struct ArrowArrayStream* stream)
+      : ArrayStreamReader(stream) {}
+
+  Status Init() {
+    ARROW_ASSIGN_OR_RAISE(schema_, ReadSchema());
+    return Status::OK();
+  }
+
+  std::shared_ptr<Schema> schema() const override { return schema_; }
+
+  Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
+    ARROW_RETURN_NOT_OK(CheckNotReleased());
+
+    struct ArrowArray c_array;
+    ARROW_RETURN_NOT_OK(ReadNextArrayInternal(&c_array));
+
+    if (ArrowArrayIsReleased(&c_array)) {
+      // End of stream
+      batch->reset();
+      return Status::OK();
+    } else {
+      return ImportRecordBatch(&c_array, schema_).Value(batch);
+    }
+  }
+
+  Status Close() override {
+    ReleaseStream();
+    return Status::OK();
+  }
+
+ private:
   std::shared_ptr<Schema> schema_;
 };
 
+class ArrayStreamArrayReader : public ArrayStreamReader {
+ public:
+  explicit ArrayStreamArrayReader(struct ArrowArrayStream* stream)
+      : ArrayStreamReader(stream) {}
+
+  Status Init() {
+    ARROW_ASSIGN_OR_RAISE(field_, ReadField());
+    return Status::OK();
+  }
+
+  std::shared_ptr<DataType> data_type() const { return field_->type(); }
+
+  Status ReadNext(std::shared_ptr<Array>* array) {
+    ARROW_RETURN_NOT_OK(CheckNotReleased());
+
+    struct ArrowArray c_array;
+    ARROW_RETURN_NOT_OK(ReadNextArrayInternal(&c_array));
+
+    if (ArrowArrayIsReleased(&c_array)) {
+      // End of stream
+      array->reset();
+      return Status::OK();
+    } else {
+      return ImportArray(&c_array, field_->type()).Value(array);
+    }
+  }
+
+  Status Close() {

Review Comment:
   Is this method useful, given that the `ArrayStreamReader` destructor already calls `ReleaseStream`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to