paleolimbot commented on code in PR #40807:
URL: https://github.com/apache/arrow/pull/40807#discussion_r1540286553


##########
cpp/src/arrow/c/bridge.cc:
##########
@@ -2151,53 +2188,96 @@ class ExportedArrayStream {
 
   int64_t next_batch_num() { return private_data()->batch_num_++; }
 
-  struct ArrowArrayStream* stream_;
+  StreamType* stream_;
 };
 
 }  // namespace
 
 Status ExportRecordBatchReader(std::shared_ptr<RecordBatchReader> reader,
                                struct ArrowArrayStream* out) {
-  return ExportedArrayStream<RecordBatchReader>::Make(std::move(reader), out);
+  return ExportedArrayStream<RecordBatchReader, struct ArrowArrayStream,
+                             struct ArrowArray>::Make(std::move(reader), out);
 }
 
 Status ExportChunkedArray(std::shared_ptr<ChunkedArray> chunked_array,
                           struct ArrowArrayStream* out) {
-  return ExportedArrayStream<ChunkedArray>::Make(std::move(chunked_array), 
out);
+  return ExportedArrayStream<ChunkedArray, struct ArrowArrayStream,
+                             struct 
ArrowArray>::Make(std::move(chunked_array), out);
+}
+
+Status ExportDeviceRecordBatchReader(std::shared_ptr<RecordBatchReader> reader,
+                                     struct ArrowDeviceArrayStream* out) {
+  out->device_type = static_cast<ArrowDeviceType>(reader->device_type());
+  return ExportedArrayStream<RecordBatchReader, struct ArrowDeviceArrayStream,
+                             struct ArrowDeviceArray>::Make(std::move(reader), 
out);
+}
+
+Status ExportDeviceChunkedArray(std::shared_ptr<ChunkedArray> chunked_array,
+                                DeviceAllocationType device_type,
+                                struct ArrowDeviceArrayStream* out) {
+  out->device_type = static_cast<ArrowDeviceType>(device_type);
+  return ExportedArrayStream<ChunkedArray, struct ArrowDeviceArrayStream,
+                             struct 
ArrowDeviceArray>::Make(std::move(chunked_array),
+                                                            out);
 }
 
 //////////////////////////////////////////////////////////////////////////
 // C stream import
 
 namespace {
 
+template <typename StreamTraits, typename ArrayTraits>
 class ArrayStreamReader {
+ protected:
+  using StreamType = typename StreamTraits::CType;
+  using ArrayType = typename ArrayTraits::CType;
+
  public:
-  explicit ArrayStreamReader(struct ArrowArrayStream* stream) {
-    ArrowArrayStreamMove(stream, &stream_);
-    DCHECK(!ArrowArrayStreamIsReleased(&stream_));
+  explicit ArrayStreamReader(StreamType* stream,
+                             const DeviceMemoryMapper& mapper = 
DefaultDeviceMapper)
+      : mapper_{mapper} {
+    StreamTraits::MoveFunc(stream, &stream_);
+    DCHECK(!StreamTraits::IsReleasedFunc(&stream_));
   }
 
   ~ArrayStreamReader() { ReleaseStream(); }
 
   void ReleaseStream() {
-    if (!ArrowArrayStreamIsReleased(&stream_)) {
-      ArrowArrayStreamRelease(&stream_);
-    }
-    DCHECK(ArrowArrayStreamIsReleased(&stream_));
+    // all our trait release funcs check IsReleased so we don't
+    // need to repeat it here
+    StreamTraits::ReleaseFunc(&stream_);
+    DCHECK(StreamTraits::IsReleasedFunc(&stream_));
   }
 
  protected:
-  Status ReadNextArrayInternal(struct ArrowArray* array) {
-    ArrowArrayMarkReleased(array);
+  Status ReadNextArrayInternal(ArrayType* array) {
+    ArrayTraits::MarkReleased(array);
     Status status = StatusFromCError(stream_.get_next(&stream_, array));
-    if (!status.ok() && !ArrowArrayIsReleased(array)) {
-      ArrowArrayRelease(array);
+    if (!status.ok() && !ArrayTraits::IsReleasedFunc(array)) {
+      ArrayTraits::ReleaseFunc(array);
     }
 
     return status;
   }
 
+  Result<std::shared_ptr<RecordBatch>> ImportRecordBatchInternal(
+      ArrayType* array, std::shared_ptr<Schema> schema) {

Review Comment:
   Would it be possible to do this as an overload instead of an `if constexpr`?



##########
cpp/src/arrow/c/bridge.h:
##########
@@ -321,6 +321,37 @@ ARROW_EXPORT
 Status ExportChunkedArray(std::shared_ptr<ChunkedArray> chunked_array,
                           struct ArrowArrayStream* out);
 
+/// \brief Export C++ RecordBatchReader using the C device stream interface
+///
+/// The resulting ArrowDeviceArrayStream struct keeps the record batch reader
+/// alive until its release callback is called by the consumer. The device
+/// type is determined by calling device_type() on the RecordBatchReader.
+///
+/// \note it is assumed that the output pointer has already be zeroed out 
before
+/// calling this function.

Review Comment:
   Is this requirement true for the other export methods? I don't think it is 
a general expectation for any exporting function I've seen, although it *is* 
safer (several months ago I responded to somebody who thought they had a memory 
leak, but it turned out they were accidentally reusing an `ArrowSchema`).
   
   If it is something worth adding here, it would be sufficient to require 
that the structure pointed to by the output pointer be marked as released (not 
necessarily completely zeroed out).



##########
cpp/src/arrow/record_batch.h:
##########
@@ -254,6 +255,16 @@ class ARROW_EXPORT RecordBatch {
   /// \return Status
   virtual Status ValidateFull() const;
 
+  /// \brief Return a top-level sync event object for this record batch
+  ///
+  /// If all of the data for this record batch is in host memory, then this
+  /// should return null (the default impl). If the data for this batch is
+  /// on a device, then if synchronization is needed before accessing the
+  /// data the returned sync event will allow for it.
+  ///
+  /// \return null or a Device::SyncEvent
+  virtual std::shared_ptr<Device::SyncEvent> GetSyncEvent();

Review Comment:
   Is it reasonable to expect that, when a member `Array` is accessed or 
returned, a user of the API will be able to walk back up to the `RecordBatch` 
and check the `SyncEvent`? (I don't have a better idea...giving each `Array` 
a copy of the shared pointer also seems like it might have problems).



##########
cpp/src/arrow/c/helpers.h:
##########
@@ -115,6 +147,14 @@ inline void ArrowArrayStreamMove(struct ArrowArrayStream* 
src,
   ArrowArrayStreamMarkReleased(src);
 }
 
+inline void ArrowDeviceArrayStreamMove(struct ArrowDeviceArrayStream* src,
+                                       struct ArrowDeviceArrayStream* dest) {
+  assert(dest != src);

Review Comment:
   I also see `ARROW_C_ASSERT()` and `DCHECK()`...is there a preferred one of 
these to use for consistency?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to