zeroshade commented on code in PR #43632:
URL: https://github.com/apache/arrow/pull/43632#discussion_r1771653564
##########
cpp/src/arrow/c/abi.h:
##########
@@ -228,6 +228,207 @@ struct ArrowDeviceArrayStream {
#endif // ARROW_C_DEVICE_STREAM_INTERFACE
+#ifndef ARROW_C_ASYNC_STREAM_INTERFACE
+#define ARROW_C_ASYNC_STREAM_INTERFACE
+
+// ArrowAsyncTask represents available data from a producer that was passed to
+// an invocation of `on_next_task` on the ArrowAsyncDeviceStreamHandler.
+//
+// The reason for this Task approach instead of the Async interface returning
+// the Array directly is to allow for more complex thread handling and to reduce
+// context switching and data transfers between CPU cores (e.g. from one L1/L2
+// cache to another) if desired.
+//
+// For example, the `on_next_task` callback can be called when data is ready, while
+// the producer puts potential "decoding" logic in the `ArrowAsyncTask` object. This
+// allows the producer to manage the I/O on one thread which calls `on_next_task`,
+// while the consumer determines when the decoding (producer logic in the `get_data`
+// callback of the task) occurs and on which thread, to avoid a CPU core transfer
+// (data staying in the L2 cache).
+struct ArrowAsyncTask {
+ // This callback should populate the ArrowDeviceArray associated with this task.
+ // The order of ArrowAsyncTasks provided by the producer enables a consumer to
+ // ensure the order of data to process.
+ //
+ // This function is expected to be synchronous, but should not perform any blocking
+ // I/O. Ideally it should be as cheap as possible so as to not tie up the consumer
+ // thread unnecessarily.
+ //
+ // Returns: 0 if successful, errno-compatible error otherwise.
+ //
+ // If a non-0 value is returned then it should be followed by a call to `on_error`
+ // on the appropriate ArrowAsyncDeviceStreamHandler. This is because it's highly
+ // likely that whatever is calling this function may be entirely disconnected from
+ // the current control flow. Indicating an error here with a non-zero return allows
+ // the current flow to be aware of the error occurring, while allowing any logging
+ // or error handling to remain centralized in the `on_error` callback of the
+ // original Async handler.
+ //
+ // Rather than a release callback, any required cleanup should be performed as part
+ // of the invocation of `get_data`. Ownership of the populated ArrowDeviceArray is
+ // passed to the consumer calling this, which is then responsible for releasing it
+ // separately.
+ //
+ // It is only valid to call this method exactly once.
+ //
+ // `self` is the task this callback belongs to (note: the task type itself, so that
+ // implementations receive a correctly-typed pointer to their own task object).
+ int (*get_data)(struct ArrowAsyncTask* self, struct ArrowDeviceArray* out);
+
+ // opaque task-specific data
+ void* private_data;
+};
+
+// ArrowAsyncProducer represents a 1-to-1 relationship between an async producer
+// and consumer. This object allows the consumer to perform backpressure and flow
+// control on the asynchronous stream processing. This object must be owned by the
+// producer who creates it, and that producer is thus responsible for cleaning it up.
+struct ArrowAsyncProducer {
+ // A consumer must call this function to start receiving on_next_task calls.
+ //
+ // It *must* be valid to call this synchronously from within `on_next_task` or
+ // `on_schema`, but this function *must not* immediately call `on_next_task` so as
+ // to avoid recursion and reentrant callbacks.
+ //
+ // After cancel has been called, additional calls to this function must be NOPs,
+ // but allowed. While not cancelled, calling this function must register the
+ // given number of additional arrays/batches to be produced with the producer.
+ // The producer should call `on_next_task` at most the registered number of times
+ // before propagating backpressure.
+ //
+ // Any error encountered by calling request must be propagated by calling the
+ // `on_error` callback of the ArrowAsyncDeviceStreamHandler.
+ //
+ // While not cancelled, any subsequent calls to `on_next_task`, `on_error` or
+ // `release` should be scheduled by the producer to be called later, rather than
+ // being invoked synchronously from within `request`.
+ //
+ // It is invalid for a consumer to call this with a value of n <= 0; producers
+ // should error if given such a value.
+ void (*request)(struct ArrowAsyncProducer* self, int64_t n);
+
+ // This cancel callback signals a producer that it must eventually stop making
+ // calls to on_next_task. It must be idempotent and thread-safe. After calling
+ // cancel once, subsequent calls must be NOPs. This must not call any consumer-side
+ // handlers other than `on_error`.
+ //
+ // It is not required that calling cancel affect the producer immediately, only
+ // that it must eventually stop calling on_next_task and subsequently call release
+ // on the async handler. As such, a consumer must be prepared to receive one or
+ // more calls to `on_next_task` even after calling cancel if there are still
+ // requested arrays pending.
+ //
+ // Successful cancellation should *not* result in the producer calling `on_error`;
+ // it should finish out any remaining tasks and eventually call `release`.
+ //
+ // Any error encountered during handling a call to cancel must be reported via the
+ // on_error callback on the async stream handler.
+ void (*cancel)(struct ArrowAsyncProducer* self);
+
+ // producer-specific opaque data.
+ void* private_data;
+};
+
+// Similar to ArrowDeviceArrayStream, except designed for an asynchronous
+// style of interaction. While ArrowDeviceArrayStream provides producer
+// defined callbacks, this is intended to be created by the consumer instead.
+// The consumer passes this handler to the producer, which in turn uses the
+// callbacks to inform the consumer of events in the stream.
+struct ArrowAsyncDeviceStreamHandler {
+ // Handler for receiving a schema. The passed in stream_schema must be
+ // released or moved by the handler (producer is giving ownership of the
schema to
+ // the handler, but not ownership of the top level object itself).
+ //
+ // With the exception of an error occurring (on_error), this must be the
first
+ // callback function which is called by a producer and must only be called
exactly
+ // once. As such, the producer should provide a valid ArrowAsyncProducer
instance
+ // so the consumer can control the flow. See the documentation on
ArrowAsyncProducer
+ // for how it works. The ArrowAsyncProducer is owned by the producer who
calls this
+ // function and thus the producer is responsible for cleaning it up when
calling
+ // the release callback of this handler.
+ //
+ // The addl_metadata argument can be null or can be used by a producer
+ // to pass arbitrary extra information to the consumer beyond the metadata
in the schema
+ // itself (such as total number of rows, context info, or otherwise). The
data should
+ // be passed using the same encoding as the metadata within the ArrowSchema
struct
+ // itself (defined in the spec at
+ //
https://arrow.apache.org/docs/format/CDataInterface.html#c.ArrowSchema.metadata)
+ //
+ // If addl_metadata is non-null then it only needs to exist for the lifetime
of this
+ // call, a consumer who wants it to live after that must copy it to ensure
lifetime.
+ //
+ // Return value: 0 if successful, `errno`-compatible error otherwise
+ //
+ // A producer that receives a non-zero return here should stop producing and
eventually
+ // call release instead.
+ int (*on_schema)(struct ArrowAsyncDeviceStreamHandler* self,
+ struct ArrowAsyncProducer* producer, struct ArrowSchema*
stream_schema,
+ const char* addl_metadata);
Review Comment:
I added the `addl_` prefix here to indicate that it is separate from the
metadata that would exist in the `ArrowSchema` object. I didn't want anyone to
start throwing metadata in here that should live in the schema itself. But if
we don't think that's a concern, I can rename it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]