lidavidm commented on code in PR #44495:
URL: https://github.com/apache/arrow/pull/44495#discussion_r1809747368


##########
cpp/src/arrow/c/bridge.h:
##########
@@ -406,4 +407,75 @@ Result<std::shared_ptr<ChunkedArray>> 
ImportDeviceChunkedArray(
 
 /// @}
 
+/// \defgroup c-async-stream-interface Functions for working with the async C 
data
+/// interface.
+///
+/// @{
+
+class AsyncErrorDetail : public StatusDetail {
+ public:
+  AsyncErrorDetail(int code, std::string message, std::string metadata)
+      : code_(code), message_(std::move(message)), 
metadata_(std::move(metadata)) {}
+  const char* type_id() const override { return "AsyncErrorDetail"; }
+  std::string ToString() const override { return message_; }
+  int code() const { return code_; }
+  const std::string& ErrorMetadata() const { return metadata_; }
+
+ private:
+  int code_{0};
+  std::string message_;
+  std::string metadata_;
+};
+
+struct AsyncRecordBatchGenerator {
+  std::shared_ptr<Schema> schema;
+  DeviceAllocationType device_type;
+  AsyncGenerator<RecordBatchWithMetadata> generator;
+};
+
+namespace internal {
+class Executor;
+}
+
+/// \brief Create an AsyncRecordBatchReader and populate a corresponding 
handler to pass
+/// to a producer
+///
+/// The ArrowAsyncDeviceStreamHandler struct is intended to have its callbacks 
populated
+/// and then be passed to a producer to call the appropriate callbacks when 
data is ready.
+/// This inverts the traditional flow of control, and so we construct a 
corresponding
+/// AsyncRecordBatchReader to provide an interface for the consumer to 
retrieve data as it
+/// is pushed to the handler.
+///
+/// \param[in,out] handler C struct to be populated
+/// \param[in] executor the executor to use for waiting and populating record 
batches
+/// \param[in] queue_size initial number of record batches to request for 
queueing
+/// \param[in] mapper mapping from device type and ID to memory manager
+/// \return Future that resolves to either an error or 
AsyncRecordBatchGenerator once a
+/// schema is available or an error is received.
+ARROW_EXPORT
+Future<AsyncRecordBatchGenerator> CreateAsyncDeviceStreamHandler(
+    struct ArrowAsyncDeviceStreamHandler* handler, internal::Executor* 
executor,
+    uint64_t queue_size = 5, const DeviceMemoryMapper mapper = 
DefaultDeviceMemoryMapper);

Review Comment:
   ```suggestion
       uint64_t queue_size = 5, const DeviceMemoryMapper& mapper = DefaultDeviceMemoryMapper);
   ```
   
   Most functions seem to declare it this way



##########
cpp/src/arrow/c/bridge.h:
##########
@@ -406,4 +407,75 @@ Result<std::shared_ptr<ChunkedArray>> 
ImportDeviceChunkedArray(
 
 /// @}
 
+/// \defgroup c-async-stream-interface Functions for working with the async C 
data
+/// interface.
+///
+/// @{
+
+class AsyncErrorDetail : public StatusDetail {

Review Comment:
   nit: could this class get a docstring? (At least on the fields; presumably
   `code` is supposed to be an errno?)



##########
cpp/src/arrow/c/bridge_test.cc:
##########


Review Comment:
   Can we also add a test that propagates errors? (Both cases: an error raised
   before the schema is delivered, and one raised after the schema.)



##########
cpp/src/arrow/record_batch.h:
##########
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <chrono>

Review Comment:
   Did you mean to add this here?



##########
cpp/src/arrow/c/bridge.h:
##########
@@ -406,4 +407,75 @@ Result<std::shared_ptr<ChunkedArray>> 
ImportDeviceChunkedArray(
 
 /// @}
 
+/// \defgroup c-async-stream-interface Functions for working with the async C 
data
+/// interface.
+///
+/// @{
+
+class AsyncErrorDetail : public StatusDetail {
+ public:
+  AsyncErrorDetail(int code, std::string message, std::string metadata)
+      : code_(code), message_(std::move(message)), 
metadata_(std::move(metadata)) {}
+  const char* type_id() const override { return "AsyncErrorDetail"; }
+  std::string ToString() const override { return message_; }
+  int code() const { return code_; }
+  const std::string& ErrorMetadata() const { return metadata_; }
+
+ private:
+  int code_{0};
+  std::string message_;
+  std::string metadata_;
+};
+
+struct AsyncRecordBatchGenerator {
+  std::shared_ptr<Schema> schema;
+  DeviceAllocationType device_type;
+  AsyncGenerator<RecordBatchWithMetadata> generator;
+};
+
+namespace internal {
+class Executor;

Review Comment:
   This really needs to not be internal at some point



##########
cpp/src/arrow/c/bridge_test.cc:
##########
@@ -5311,4 +5313,71 @@ TEST_F(TestArrayDeviceStreamRoundtrip, 
ChunkedArrayRoundtripEmpty) {
   });
 }
 
+class TestAsyncDeviceArrayStreamRoundTrip : public BaseArrayStreamTest {
+ public:
+  static Result<std::shared_ptr<ArrayData>> ToDeviceData(
+      const std::shared_ptr<MemoryManager>& mm, const ArrayData& data) {
+    arrow::BufferVector buffers;
+    for (const auto& buf : data.buffers) {
+      if (buf) {
+        ARROW_ASSIGN_OR_RAISE(auto dest, mm->CopyBuffer(buf, mm));
+        buffers.push_back(dest);
+      } else {
+        buffers.push_back(nullptr);
+      }
+    }
+
+    arrow::ArrayDataVector children;
+    for (const auto& child : data.child_data) {
+      ARROW_ASSIGN_OR_RAISE(auto dest, ToDeviceData(mm, *child));
+      children.push_back(dest);
+    }
+
+    return ArrayData::Make(data.type, data.length, buffers, children, 
data.null_count,
+                           data.offset);
+  }
+
+  static Result<std::shared_ptr<Array>> ToDevice(const 
std::shared_ptr<MemoryManager>& mm,
+                                                 const ArrayData& data) {
+    ARROW_ASSIGN_OR_RAISE(auto result, ToDeviceData(mm, data));
+    return MakeArray(result);
+  }
+};
+
+TEST_F(TestAsyncDeviceArrayStreamRoundTrip, Simple) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  ASSERT_OK_AND_ASSIGN(auto arr1,
+                       ToDevice(mm, *ArrayFromJSON(int32(), "[1, 
2]")->data()));
+  ASSERT_EQ(device->device_type(), arr1->device_type());
+  ASSERT_OK_AND_ASSIGN(auto arr2,
+                       ToDevice(mm, *ArrayFromJSON(int32(), "[4, 5, 
null]")->data()));
+  ASSERT_EQ(device->device_type(), arr2->device_type());
+  auto orig_schema = arrow::schema({field("ints", int32())});
+  auto batches = MakeBatches(orig_schema, {arr1, arr2});
+
+  struct ArrowAsyncDeviceStreamHandler handler;
+  auto fut_gen = CreateAsyncDeviceStreamHandler(&handler, 
internal::GetCpuThreadPool(), 1,
+                                                
TestDeviceArrayRoundtrip::DeviceMapper);
+  ASSERT_FALSE(fut_gen.is_finished());
+
+  ASSERT_OK_AND_ASSIGN(auto fut, internal::GetCpuThreadPool()->Submit([&]() {
+    return ExportAsyncRecordBatchReader(orig_schema, 
MakeVectorGenerator(batches),
+                                        device->device_type(), &handler);
+  }));
+
+  ASSERT_OK_AND_ASSIGN(auto generator, fut_gen.result());
+  AssertSchemaEqual(*orig_schema, *generator.schema);
+
+  auto collect_fut = CollectAsyncGenerator(generator.generator);
+  ASSERT_OK_AND_ASSIGN(auto results, collect_fut.result());
+  ASSERT_OK(fut.status());

Review Comment:
   nit: I believe we have specific macros for futures that also make sure to
   time out (ASSERT_FINISHES_OK, etc.)



##########
cpp/src/arrow/c/bridge.h:
##########
@@ -406,4 +407,75 @@ Result<std::shared_ptr<ChunkedArray>> 
ImportDeviceChunkedArray(
 
 /// @}
 
+/// \defgroup c-async-stream-interface Functions for working with the async C 
data
+/// interface.
+///
+/// @{
+
+class AsyncErrorDetail : public StatusDetail {
+ public:
+  AsyncErrorDetail(int code, std::string message, std::string metadata)
+      : code_(code), message_(std::move(message)), 
metadata_(std::move(metadata)) {}
+  const char* type_id() const override { return "AsyncErrorDetail"; }
+  std::string ToString() const override { return message_; }
+  int code() const { return code_; }
+  const std::string& ErrorMetadata() const { return metadata_; }
+
+ private:
+  int code_{0};
+  std::string message_;
+  std::string metadata_;
+};
+
+struct AsyncRecordBatchGenerator {
+  std::shared_ptr<Schema> schema;
+  DeviceAllocationType device_type;
+  AsyncGenerator<RecordBatchWithMetadata> generator;
+};
+
+namespace internal {
+class Executor;
+}
+
+/// \brief Create an AsyncRecordBatchReader and populate a corresponding 
handler to pass
+/// to a producer
+///
+/// The ArrowAsyncDeviceStreamHandler struct is intended to have its callbacks 
populated
+/// and then be passed to a producer to call the appropriate callbacks when 
data is ready.
+/// This inverts the traditional flow of control, and so we construct a 
corresponding
+/// AsyncRecordBatchReader to provide an interface for the consumer to 
retrieve data as it

Review Comment:
   ```suggestion
   /// AsyncRecordBatchGenerator to provide an interface for the consumer to retrieve data as it
   ```
   ?



##########
cpp/src/arrow/c/bridge_test.cc:
##########
@@ -5311,4 +5313,71 @@ TEST_F(TestArrayDeviceStreamRoundtrip, 
ChunkedArrayRoundtripEmpty) {
   });
 }
 
+class TestAsyncDeviceArrayStreamRoundTrip : public BaseArrayStreamTest {
+ public:
+  static Result<std::shared_ptr<ArrayData>> ToDeviceData(
+      const std::shared_ptr<MemoryManager>& mm, const ArrayData& data) {
+    arrow::BufferVector buffers;
+    for (const auto& buf : data.buffers) {
+      if (buf) {
+        ARROW_ASSIGN_OR_RAISE(auto dest, mm->CopyBuffer(buf, mm));
+        buffers.push_back(dest);
+      } else {
+        buffers.push_back(nullptr);
+      }
+    }
+
+    arrow::ArrayDataVector children;
+    for (const auto& child : data.child_data) {
+      ARROW_ASSIGN_OR_RAISE(auto dest, ToDeviceData(mm, *child));
+      children.push_back(dest);
+    }
+
+    return ArrayData::Make(data.type, data.length, buffers, children, 
data.null_count,
+                           data.offset);
+  }
+
+  static Result<std::shared_ptr<Array>> ToDevice(const 
std::shared_ptr<MemoryManager>& mm,
+                                                 const ArrayData& data) {
+    ARROW_ASSIGN_OR_RAISE(auto result, ToDeviceData(mm, data));
+    return MakeArray(result);
+  }
+};
+
+TEST_F(TestAsyncDeviceArrayStreamRoundTrip, Simple) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  ASSERT_OK_AND_ASSIGN(auto arr1,
+                       ToDevice(mm, *ArrayFromJSON(int32(), "[1, 
2]")->data()));
+  ASSERT_EQ(device->device_type(), arr1->device_type());
+  ASSERT_OK_AND_ASSIGN(auto arr2,
+                       ToDevice(mm, *ArrayFromJSON(int32(), "[4, 5, 
null]")->data()));
+  ASSERT_EQ(device->device_type(), arr2->device_type());
+  auto orig_schema = arrow::schema({field("ints", int32())});
+  auto batches = MakeBatches(orig_schema, {arr1, arr2});
+
+  struct ArrowAsyncDeviceStreamHandler handler;
+  auto fut_gen = CreateAsyncDeviceStreamHandler(&handler, 
internal::GetCpuThreadPool(), 1,
+                                                
TestDeviceArrayRoundtrip::DeviceMapper);
+  ASSERT_FALSE(fut_gen.is_finished());
+
+  ASSERT_OK_AND_ASSIGN(auto fut, internal::GetCpuThreadPool()->Submit([&]() {
+    return ExportAsyncRecordBatchReader(orig_schema, 
MakeVectorGenerator(batches),
+                                        device->device_type(), &handler);
+  }));
+
+  ASSERT_OK_AND_ASSIGN(auto generator, fut_gen.result());
+  AssertSchemaEqual(*orig_schema, *generator.schema);

Review Comment:
   ```suggestion
     ASSERT_NO_FATAL_FAILURE(AssertSchemaEqual(*orig_schema, *generator.schema));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to