zeroshade commented on code in PR #43632:
URL: https://github.com/apache/arrow/pull/43632#discussion_r1750452544
##########
cpp/src/arrow/c/abi.h:
##########
@@ -228,6 +228,180 @@ struct ArrowDeviceArrayStream {
#endif // ARROW_C_DEVICE_STREAM_INTERFACE
+#ifndef ARROW_C_ASYNC_STREAM_INTERFACE
+#define ARROW_C_ASYNC_STREAM_INTERFACE
+
// ArrowAsyncTask represents available data from a producer that was passed to
// an invocation of `on_next_task` on the ArrowAsyncDeviceStreamHandler.
struct ArrowAsyncTask {
  // This callback should populate the ArrowDeviceArray associated with this task.
  // The order of ArrowAsyncTasks provided by the producer enables a consumer to
  // ensure the order of data to process.
  //
  // `self` is the ArrowAsyncTask whose `get_data` is being invoked, so it is typed
  // as a pointer to this very struct.
  //
  // Returns: 0 if successful, errno-compatible error otherwise.
  //
  // If a non-0 value is returned then it should be followed by a call to `on_error`
  // on the appropriate ArrowAsyncDeviceStreamHandler.
  //
  // Rather than a release callback, any required cleanup should be performed as
  // part of the invocation of `get_data`. Ownership of the Array is passed to the
  // consumer calling this, and so it must be released separately.
  //
  // It is only valid to call this method exactly once.
  int (*get_data)(struct ArrowAsyncTask* self, struct ArrowDeviceArray* out);

  // opaque task-specific data
  void* private_data;
};
+
+// ArrowAsyncProducer represents a 1-to-1 relationship between an async
producer
+// and consumer. This object allows the consumer to perform backpressure and
flow
+// control on the asynchronous stream processing. This object must be owned by
the
+// producer who creates it, and thus is responsible for cleaning it up.
+struct ArrowAsyncProducer {
+ // A consumer must call this function to start receiving on_next_task calls.
+ //
+ // It *must* be valid to call this synchronously from within on_next_task or
+ // on_schema, with a producer placing an upper bound on possible synchronous
+ // recursion between calls of on_next_task -> request -> on_next_task ->....
+ //
+ // After cancel has been called, additional calls to this function must be
NOPs,
+ // but allowed.
+ //
+ // While not cancelled, calling this function must register the given number
of
+ // additional arrays/batches to be produced with the producer. The producer
should
+ // only call `on_next_task` at most the registered number of arrays before
propagating
+ // backpressure.
+ //
+ // While not cancelled, calling request MAY synchronously call
`on_next_task`,
+ // `on_error`, or `release` on the ArrowAsyncDeviceStreamHandler.
+ //
+ // Any error encountered by calling request must be propagated by calling
the `on_error`
+ // callback of the ArrowAsyncDeviceStreamHandler.
+ //
+ // A producer must support an unbounded number of calls to request and must
support
+ // a total registered demand (sum requested - sum delivered) of up to
UINT64_MAX.
+ void (*request)(struct ArrowAsyncProducer* self, uint64_t n);
+
+ // This cancel callback signals a producer that it must eventually stop
making calls
+ // to on_next_task. It must be idempotent and thread-safe.
+ //
+ // After calling cancel once, subsequent calls must be NOPs.
+ //
+ // It is not required that calling cancel affect the producer immediately,
only that it
+ // must eventually stop calling on_next_task and subsequently call release
on the
+ // async handler. As such, a consumer must be prepared to receive one or
more calls to
+ // `on_next_task` even after calling cancel if there are still requested
arrays pending.
+ //
+ // Any error encountered during handling a call to cancel must be reported
via the on_error
+ // callback on the async stream handler.
+ void (*cancel)(struct ArrowAsyncProducer* self);
Review Comment:
Successful cancellation should lead only to a call to `release`. The producer
should call `on_error` only if an error occurs while processing `cancel`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]