leventov commented on code in PR #43632:
URL: https://github.com/apache/arrow/pull/43632#discussion_r1721537318


##########
cpp/src/arrow/c/abi.h:
##########
@@ -228,6 +228,65 @@ struct ArrowDeviceArrayStream {
 
 #endif  // ARROW_C_DEVICE_STREAM_INTERFACE
 
+#ifndef ARROW_C_ASYNC_STREAM_INTERFACE
+#define ARROW_C_ASYNC_STREAM_INTERFACE
+
+// Similar to ArrowDeviceArrayStream, except designed for an asynchronous
+// style of interaction. While ArrowDeviceArrayStream provides producer
+// defined callbacks, this is intended to be created by the consumer instead.
+// The consumer passes this handler to the producer, which in turn uses the
+// callbacks to inform the consumer of events in the stream.
+struct ArrowAsyncDeviceStreamHandler {
+  // Handler for receiving a schema. The passed in stream_schema should be
+  // released or moved by the handler (producer is giving ownership of it to
+  // the handler).
+  //
+  // The `extension_param` argument can be null or can be used by a producer
+  // to pass arbitrary extra information to the consumer (such as total number
+  // of rows, context info, or otherwise).
+  //
+  // Return value: 0 if successful, `errno`-compatible error otherwise

Review Comment:
   In general, I would suggest that ADBC's async APIs should be compatible with 
[Reactive Streams](https://en.wikipedia.org/wiki/Reactive_Streams) (RS) 
semantics in all places except for the semantics of the `request(n)` signal, 
which for Arrow shouldn't be tied to the number of calls to on_next(), but 
should rather indicate the number of rows. This avoids reinventing the wheel 
in aspects of the async model that the Reactive Streams working group has 
already thought about and debated long and hard.
   
   Also, users of ADBC's async APIs in JVM and C# will be able to leverage some 
very high quality code for these platforms that implements RS semantics, such as 
[j.u.c.SubmissionPublisher](https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/SubmissionPublisher.html),
[kotlinx.coroutines.reactive](https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-reactive/),
and [dotnet/reactive](https://github.com/dotnet/reactive), hopefully with 
minimal modifications, or perhaps no modifications at all (the different 
semantics of `request(n)` could perhaps piggy-back on existing libs because, 
per the RS spec, producers are allowed to send a complete signal after calling 
on_next fewer times than was requested). 
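   
   To make the row-based `request(n)` idea concrete, here is a minimal sketch 
in C. Every name in it (`RowsConsumer`, `rows_on_next`, `rows_on_complete`, 
`rows_produce`) is hypothetical and not part of the PR; batches are modeled 
only by their row counts. It assumes the producer slices batches so that it 
never delivers more rows than were requested, and that it signals completion 
as soon as it runs out of data, even if demand remains.

```c
#include <assert.h>
#include <stdint.h>

// Hypothetical consumer state, for illustration only.
typedef struct {
  int64_t batches_seen;  // number of on_next calls received
  int64_t rows_seen;     // total rows received across all batches
  int completed;         // set to 1 once the complete signal arrives
} RowsConsumer;

static void rows_on_next(RowsConsumer* c, int64_t batch_rows) {
  c->batches_seen++;
  c->rows_seen += batch_rows;
}

static void rows_on_complete(RowsConsumer* c) { c->completed = 1; }

// Producer side. `requested_rows` is demand counted in rows, not in
// on_next calls. Returns the number of rows still left to deliver.
static int64_t rows_produce(RowsConsumer* c, int64_t available_rows,
                            int64_t max_batch_rows, int64_t requested_rows) {
  assert(available_rows >= 0 && max_batch_rows > 0 && requested_rows >= 0);
  int64_t pending = requested_rows;  // outstanding demand, in rows
  int64_t left = available_rows;     // rows the producer can still emit
  while (pending > 0 && left > 0) {
    // Cap each batch by the batch size, the remaining data, and the demand.
    int64_t n = max_batch_rows;
    if (n > left) n = left;
    if (n > pending) n = pending;
    rows_on_next(c, n);
    pending -= n;
    left -= n;
  }
  // Completing with demand still outstanding mirrors what RS allows.
  if (left == 0) rows_on_complete(c);
  return left;
}
```

   With 150 rows available and a demand of 1000 rows, this producer makes two 
on_next calls and then completes; with 300 rows available and a demand of 150, 
it stops after 150 rows and waits for further demand.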
   
   Per [RS 
1.7](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.4/README.md#1.7),
 the producer is required to stop after an error from a consumer callback:
   > Once a [terminal 
state](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.4/README.md#term_terminal_state)
 has been signaled (onError, onComplete) it is REQUIRED that no further signals 
occur.
   
   Re:
   
   > the docs explicitly state that if an error is returned from the callback, 
the producer should only call release after that and not call further 
callbacks. though there isn't anything to functionally prevent it.
   
   Indeed, per [RS 
2.5](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.4/README.md#2.5),
 the consumer MUST go out of its way even to prevent buggy concurrent use by 
the producer, let alone refrain from calling any of its methods itself 
concurrently with the producer, or after the producer has signaled an error or 
completion.
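   
   The stop-on-error behaviour discussed above could look roughly like the 
following C sketch. `TermHandler` and the `term_*` functions are hypothetical 
and deliberately much simpler than the struct proposed in the PR. The producer 
loop stops at the first non-zero return and then only calls release, and the 
consumer additionally refuses any signal that arrives after release, as a 
defensive guard against a buggy producer.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

// Hypothetical handler, for illustration only.
typedef struct TermHandler {
  int (*on_next)(struct TermHandler* self, int batch_id);
  void (*release)(struct TermHandler* self);
  int fail_at;     // batch id at which on_next reports an error (-1: never)
  int next_calls;  // number of on_next calls that were accepted
  int released;    // set to 1 once release has run
} TermHandler;

static int term_on_next(TermHandler* self, int batch_id) {
  // Consumer-side guard: reject signals that arrive after release.
  if (self->released) return EINVAL;
  self->next_calls++;
  return batch_id == self->fail_at ? EIO : 0;
}

static void term_release(TermHandler* self) { self->released = 1; }

static TermHandler term_handler_make(int fail_at) {
  TermHandler h = {term_on_next, term_release, fail_at, 0, 0};
  return h;
}

// Producer side: after the first non-zero return, no further callbacks
// are made except release. Returns the last callback status.
static int term_run_producer(TermHandler* h, int n_batches) {
  assert(h != NULL);
  int status = 0;
  for (int i = 0; i < n_batches; i++) {
    status = h->on_next(h, i);
    if (status != 0) break;  // terminal state reached
  }
  h->release(h);
  return status;
}
```

   A handler built with `term_handler_make(2)` and driven for ten batches sees 
exactly three on_next calls (ids 0, 1, 2) before release; a later stray 
on_next call is rejected without changing the handler's counters.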



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
