leventov commented on code in PR #43632:
URL: https://github.com/apache/arrow/pull/43632#discussion_r1721537318
##########
cpp/src/arrow/c/abi.h:
##########
@@ -228,6 +228,65 @@ struct ArrowDeviceArrayStream {
#endif // ARROW_C_DEVICE_STREAM_INTERFACE
+#ifndef ARROW_C_ASYNC_STREAM_INTERFACE
+#define ARROW_C_ASYNC_STREAM_INTERFACE
+
+// Similar to ArrowDeviceArrayStream, except designed for an asynchronous
+// style of interaction. While ArrowDeviceArrayStream provides producer
+// defined callbacks, this is intended to be created by the consumer instead.
+// The consumer passes this handler to the producer, which in turn uses the
+// callbacks to inform the consumer of events in the stream.
+struct ArrowAsyncDeviceStreamHandler {
+ // Handler for receiving a schema. The passed in stream_schema should be
+ // released or moved by the handler (producer is giving ownership of it to
+ // the handler).
+ //
+ // The `extension_param` argument can be null or can be used by a producer
+ // to pass arbitrary extra information to the consumer (such as total number
+ // of rows, context info, or otherwise).
+ //
+ // Return value: 0 if successful, `errno`-compatible error otherwise
Review Comment:
In general, I would suggest that ADBC's async APIs be compatible with
[Reactive Streams](https://en.wikipedia.org/wiki/Reactive_Streams) (RS)
semantics everywhere except for the `request(n)` signal, which for Arrow
shouldn't be tied to the number of calls to `on_next()` but should instead
indicate the number of rows. This avoids reinventing aspects of the async
model that the Reactive Streams working group has already thought about and
debated at length.
Also, users of ADBC's async APIs on the JVM and in C# will be able to leverage
some very high-quality code for these platforms that implements RS semantics,
such as
[j.u.c.SubmissionPublisher](https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/SubmissionPublisher.html),
[kotlinx.coroutines.reactive](https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-reactive/),
[dotnet/reactive](https://github.com/dotnet/reactive), hopefully with minimal
modifications, or perhaps none at all (the different semantics of
`request(n)` could perhaps piggyback on existing libraries because, per the RS
spec, producers are allowed to send a complete signal after calling `on_next`
fewer times than was requested).
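To make the row-based reading of `request(n)` concrete, here is a minimal sketch of how a producer might account for demand in rows rather than in `on_next` calls. The `row_demand` struct and both functions are hypothetical names invented for illustration; they are not part of the Arrow C ABI or of this PR. Saturating the counter at `INT64_MAX` and slicing a batch to fit the remaining demand are assumptions of the sketch, not something the proposal specifies.

```c
#include <stdint.h>

/* Hypothetical demand counter (illustration only, not part of abi.h). */
struct row_demand {
  int64_t pending_rows; /* rows requested by the consumer but not yet sent */
};

/* Consumer side: request(n) adds n rows of demand.
 * Assumption: demand accumulates and saturates at INT64_MAX. */
static void row_demand_request(struct row_demand* d, int64_t n) {
  if (n <= 0) return; /* a real API would likely report this as an error */
  if (d->pending_rows > INT64_MAX - n) {
    d->pending_rows = INT64_MAX;
  } else {
    d->pending_rows += n;
  }
}

/* Producer side: given a batch holding `available` rows, return how many
 * of them may be delivered now and deduct them from the pending demand. */
static int64_t row_demand_take(struct row_demand* d, int64_t available) {
  int64_t n = available < d->pending_rows ? available : d->pending_rows;
  if (n < 0) n = 0;
  d->pending_rows -= n;
  return n;
}
```

With this accounting, a consumer that requests 100 rows could receive them as one batch of 64 rows and one of 36, or in any other split; the number of `on_next` calls is not what the demand counts.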
Per [RS
1.7](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.4/README.md#1.7),
the producer is required to stop after an error from a consumer callback:
> Once a [terminal
state](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.4/README.md#term_terminal_state)
has been signaled (onError, onComplete) it is REQUIRED that no further signals
occur.
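One way a producer could enforce this rule, rather than relying on documentation alone, is to latch a "terminated" flag the first time a callback reports an error. The sketch below assumes a simplified, hypothetical handler (`sketch_handler`, with only `on_next` and `release`) and a hypothetical `sketch_producer` wrapper; neither matches the actual `ArrowAsyncDeviceStreamHandler` layout. Calling `release` immediately after the failing callback is likewise an assumption of the sketch.

```c
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified consumer handler (illustration only). */
struct sketch_handler {
  int (*on_next)(struct sketch_handler* self, int batch_id);
  void (*release)(struct sketch_handler* self);
  void* private_data;
};

/* Hypothetical producer-side wrapper that remembers the terminal state. */
struct sketch_producer {
  struct sketch_handler* handler;
  bool terminated;
};

/* Deliver one batch. Returns true if the consumer accepted it. Once a
 * callback has returned an error, the handler is released exactly once
 * and no further callbacks are ever invoked. */
static bool producer_emit(struct sketch_producer* p, int batch_id) {
  if (p->terminated) return false;
  if (p->handler->on_next(p->handler, batch_id) != 0) {
    p->terminated = true;
    p->handler->release(p->handler);
    return false;
  }
  return true;
}

/* Example consumer: counts calls and fails on the `fail_at`-th batch. */
struct counting_state {
  int calls;
  int fail_at;
  int released;
};

static int counting_on_next(struct sketch_handler* self, int batch_id) {
  struct counting_state* s = (struct counting_state*)self->private_data;
  (void)batch_id;
  s->calls++;
  return s->calls == s->fail_at ? EIO : 0;
}

static void counting_release(struct sketch_handler* self) {
  struct counting_state* s = (struct counting_state*)self->private_data;
  s->released++;
}
```

With `fail_at` set to 2, the second `producer_emit` call fails and triggers `release`; any later call returns `false` without touching the handler again.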
Re:
> the docs explicitly state that if an error is returned from the callback,
the producer should only call release after that and not call further
callbacks. though there isn't anything to functionally prevent it.
Indeed, per [RS
2.5](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.4/README.md#2.5),
the consumer MUST go out of its way to guard even against buggy concurrent use
by the producer, let alone refrain from calling any of its methods itself
concurrently with the producer, or after the producer has signaled an error or
completion.