leventov commented on code in PR #43632:
URL: https://github.com/apache/arrow/pull/43632#discussion_r1721409104


##########
cpp/src/arrow/c/abi.h:
##########
@@ -228,6 +228,65 @@ struct ArrowDeviceArrayStream {
 
 #endif  // ARROW_C_DEVICE_STREAM_INTERFACE
 
+#ifndef ARROW_C_ASYNC_STREAM_INTERFACE
+#define ARROW_C_ASYNC_STREAM_INTERFACE
+
+// Similar to ArrowDeviceArrayStream, except designed for an asynchronous
+// style of interaction. While ArrowDeviceArrayStream provides producer
+// defined callbacks, this is intended to be created by the consumer instead.
+// The consumer passes this handler to the producer, which in turn uses the
+// callbacks to inform the consumer of events in the stream.
+struct ArrowAsyncDeviceStreamHandler {
+  // Handler for receiving a schema. The passed in stream_schema should be
+  // released or moved by the handler (producer is giving ownership of it to
+  // the handler).
+  //
+  // The `extension_param` argument can be null or can be used by a producer
+  // to pass arbitrary extra information to the consumer (such as total number
+  // of rows, context info, or otherwise).
+  //
+  // Return value: 0 if successful, `errno`-compatible error otherwise
+  int (*on_schema)(struct ArrowAsyncDeviceStreamHandler* self,
+                   struct ArrowSchema* stream_schema, void* extension_param);
+
+  // Handler for receiving an array/record batch. Always called at least once
+  // unless an error is encountered (which would result in calling on_error).
+  // An empty/released array is passed to indicate the end of the stream if no
+  // errors have been encountered.
+  //
+  // The `extension_param` argument can be null or can be used by a producer
+  // to pass arbitrary extra information to the consumer.
+  //
+  // Return value: 0 if successful, `errno`-compatible error otherwise.
+  int (*on_next)(struct ArrowAsyncDeviceStreamHandler* self,
+                 struct ArrowDeviceArray* next, void* extension_param);
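
For illustration, here is a minimal consumer-side sketch of how the handler in this hunk might be wired up. The consumer fills in the callbacks and hands the struct to a producer, which calls `on_schema` once and then `on_next` until it passes a released array to signal the end of the stream. Several parts are assumptions. `ArrowSchema`, `ArrowArray` and `ArrowDeviceArray` are replaced by simplified stand-ins that carry only the fields used here, not the real `abi.h` layouts. The `private_data` member is hypothetical, because the hunk is cut off before any such field. `on_error` is omitted. `ConsumerState` and `mock_produce` are made-up names.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <stdint.h>

/* Simplified stand-ins (assumption): the real ArrowSchema, ArrowArray and
   ArrowDeviceArray in abi.h have many more fields. Not for real interop. */
struct ArrowSchema {
  const char* format;
  void (*release)(struct ArrowSchema*);
};

struct ArrowArray {
  int64_t length;
  void (*release)(struct ArrowArray*);
};

struct ArrowDeviceArray {
  struct ArrowArray array;
};

/* The two callbacks from the hunk, plus a hypothetical private_data slot
   for consumer state. on_error and any other members are omitted. */
struct ArrowAsyncDeviceStreamHandler {
  int (*on_schema)(struct ArrowAsyncDeviceStreamHandler* self,
                   struct ArrowSchema* stream_schema, void* extension_param);
  int (*on_next)(struct ArrowAsyncDeviceStreamHandler* self,
                 struct ArrowDeviceArray* next, void* extension_param);
  void* private_data;
};

/* Hypothetical consumer state reached through private_data. */
struct ConsumerState {
  int got_schema;
  int batches;
  int64_t total_rows;
  int finished;
};

static int consumer_on_schema(struct ArrowAsyncDeviceStreamHandler* self,
                              struct ArrowSchema* stream_schema,
                              void* extension_param) {
  (void)extension_param; /* producer-specific extras are ignored here */
  assert(self != NULL && stream_schema != NULL);
  struct ConsumerState* state = self->private_data;
  state->got_schema = 1;
  /* The handler now owns the schema, so it releases it when done. */
  if (stream_schema->release != NULL) stream_schema->release(stream_schema);
  return 0;
}

static int consumer_on_next(struct ArrowAsyncDeviceStreamHandler* self,
                            struct ArrowDeviceArray* next,
                            void* extension_param) {
  (void)extension_param;
  assert(self != NULL && next != NULL);
  struct ConsumerState* state = self->private_data;
  if (!state->got_schema) return EINVAL; /* errno-compatible error */
  if (next->array.release == NULL) {     /* released array: end of stream */
    state->finished = 1;
    return 0;
  }
  state->batches++;
  state->total_rows += next->array.length;
  /* Assumption: as with on_schema, ownership of the array moves to us. */
  next->array.release(&next->array);
  return 0;
}

/* Mock producer (hypothetical): one schema, three batches, then end. */
static void release_schema(struct ArrowSchema* schema) { schema->release = NULL; }
static void release_array(struct ArrowArray* array) { array->release = NULL; }

static int mock_produce(struct ArrowAsyncDeviceStreamHandler* handler) {
  struct ArrowSchema schema = {"+s", release_schema};
  int rc = handler->on_schema(handler, &schema, NULL);
  if (rc != 0) return rc;
  const int64_t lengths[] = {3, 5, 2};
  for (size_t i = 0; i < sizeof lengths / sizeof lengths[0]; ++i) {
    struct ArrowDeviceArray batch = {{lengths[i], release_array}};
    rc = handler->on_next(handler, &batch, NULL);
    if (rc != 0) return rc;
  }
  struct ArrowDeviceArray end_of_stream = {{0, NULL}};
  return handler->on_next(handler, &end_of_stream, NULL);
}
```

A consumer would create it as `struct ArrowAsyncDeviceStreamHandler h = {consumer_on_schema, consumer_on_next, &state};` and pass `&h` to the producer. The producer only ever sees the function pointers and never the consumer's own types.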

Review Comment:
   > Very sophisticated users should be able to implement advanced concurrency patterns (e.g. handling out-of-order pieces of the stream).
   
   @felipecrv IMO this API is the wrong level at which to assume out-of-order
delivery. Out-of-order arrival presupposes concurrency on the producer's side.
If the producer is capable of producing arrays concurrently, and the consumer
can consume them concurrently or out of order too, it's better to simply create
multiple streams and have multiple consumer threads/green threads/coroutines/async
tasks consume them, each on a single-producer/single-consumer basis. That would be
*much* less tricky to implement for both consumers and producers, and quite likely
more performant too, because there would be no write contention and no concurrent
queues on the producer's side. Cf. [single writer principle](https://mechanical-sympathy.blogspot.com/2011/09/single-writer-principle.html).
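
To make the suggestion concrete, here is a minimal sketch using plain POSIX threads, not any Arrow API. The work is split into `NUM_STREAMS` independent streams, and each thread runs one producer loop that feeds exactly one consumer. Every piece of consumer state therefore has a single writer, and no concurrent queue or lock is needed. All of the following are hypothetical: `StreamTask`, `consume_batch`, `run_partitioned`, the stream count, and the fake batch sizes. `consume_batch` stands in for a handler's `on_next`.

```c
#include <assert.h>
#include <pthread.h>
#include <stdint.h>

#define NUM_STREAMS 4
#define BATCHES_PER_STREAM 8

/* Hypothetical per-stream consumer state: written by exactly one thread. */
struct StreamConsumer {
  int batches_seen;
  int64_t rows_seen;
};

/* One stream = one producer loop feeding one consumer. */
struct StreamTask {
  int stream_id;
  struct StreamConsumer consumer;
};

/* Stands in for a handler's on_next; single writer, so no locks/atomics. */
static void consume_batch(struct StreamConsumer* consumer, int64_t rows) {
  consumer->batches_seen++;
  consumer->rows_seen += rows;
}

static void* run_stream(void* arg) {
  struct StreamTask* task = arg;
  assert(task != NULL);
  for (int b = 0; b < BATCHES_PER_STREAM; ++b) {
    /* Fake batch: stream i always yields (i + 1) * 10 rows. */
    consume_batch(&task->consumer, (int64_t)(task->stream_id + 1) * 10);
  }
  return NULL;
}

/* Runs NUM_STREAMS independent SPSC pipelines, then merges their totals.
   Returns -1 if not every thread could be started. */
static int64_t run_partitioned(struct StreamTask tasks[NUM_STREAMS]) {
  pthread_t threads[NUM_STREAMS];
  int started = 0;
  for (; started < NUM_STREAMS; ++started) {
    tasks[started].stream_id = started;
    tasks[started].consumer = (struct StreamConsumer){0, 0};
    if (pthread_create(&threads[started], NULL, run_stream, &tasks[started]) != 0) break;
  }
  int64_t total = 0;
  for (int i = 0; i < started; ++i) {
    pthread_join(threads[i], NULL); /* also makes the thread's writes visible */
    total += tasks[i].consumer.rows_seen;
  }
  return started == NUM_STREAMS ? total : -1;
}
```

Results are combined only after `pthread_join`. Under POSIX, the join already orders each stream's writes before the merge, so the merge step needs no atomics either.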



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
