lidavidm commented on code in PR #43632:
URL: https://github.com/apache/arrow/pull/43632#discussion_r1755932219


##########
cpp/src/arrow/c/abi.h:
##########
@@ -228,6 +228,180 @@ struct ArrowDeviceArrayStream {
 
 #endif  // ARROW_C_DEVICE_STREAM_INTERFACE
 
+#ifndef ARROW_C_ASYNC_STREAM_INTERFACE
+#define ARROW_C_ASYNC_STREAM_INTERFACE
+
+// ArrowAsyncTask represents available data from a producer that was passed to
+// an invocation of `on_next_task` on the ArrowAsyncDeviceStreamHandler.
+struct ArrowAsyncTask {
+  // This callback should populate the ArrowDeviceArray associated with this 
task.
+  // The order of ArrowAsyncTasks provided by the producer enables a consumer 
to
+  // ensure the order of data to process.
+  //
+  // Returns: 0 if successful, errno-compatible error otherwise.
+  //
+  // If a non-0 value is returned then it should be followed by a call to 
`on_error`
+  // on the appropriate ArrowAsyncDeviceStreamHandler.

Review Comment:
   Actually: (1) do we want this to return an error at all? (2) If we do, would
it be better to deliver the error via a callback (which is potentially, even
probably, entirely disconnected from the current control flow) or via the
usual get_last_error method?



##########
cpp/src/arrow/c/abi.h:
##########
@@ -228,6 +228,180 @@ struct ArrowDeviceArrayStream {
 
 #endif  // ARROW_C_DEVICE_STREAM_INTERFACE
 
+#ifndef ARROW_C_ASYNC_STREAM_INTERFACE
+#define ARROW_C_ASYNC_STREAM_INTERFACE
+
+// ArrowAsyncTask represents available data from a producer that was passed to
+// an invocation of `on_next_task` on the ArrowAsyncDeviceStreamHandler.
+struct ArrowAsyncTask {
+  // This callback should populate the ArrowDeviceArray associated with this 
task.
+  // The order of ArrowAsyncTasks provided by the producer enables a consumer 
to
+  // ensure the order of data to process.
+  //
+  // Returns: 0 if successful, errno-compatible error otherwise.
+  //
+  // If a non-0 value is returned then it should be followed by a call to 
`on_error`
+  // on the appropriate ArrowAsyncDeviceStreamHandler.
+  //
+  // Rather than a release callback, any required cleanup should be performed 
as part
+  // of the invocation of `get_data`. Ownership of the Array is passed to the 
consumer
+  // calling this, and so it must be released separately.
+  //
+  // It is only valid to call this method exactly once.
+  int (*get_data)(struct ArrowArrayTask* self, struct ArrowDeviceArray* out);
+
+  // opaque task-specific data
+  void* private_data;
+};
+
+// ArrowAsyncProducer represents a 1-to-1 relationship between an async 
producer
+// and consumer. This object allows the consumer to perform backpressure and 
flow
+// control on the asynchronous stream processing. This object must be owned by 
the
+// producer who creates it, and thus is responsible for cleaning it up.
+struct ArrowAsyncProducer {
+  // A consumer must call this function to start receiving on_next_task calls.
+  //
+  // It *must* be valid to call this synchronously from within on_next_task or
+  // on_schema, with a producer placing an upper bound on possible synchronous
+  // recursion between calls of on_next_task -> request -> on_next_task ->....
+  //
+  // After cancel has been called, additional calls to this function must be 
NOPs,
+  // but allowed.
+  //
+  // While not cancelled, calling this function must register the given number 
of
+  // additional arrays/batches to be produced with the producer. The producer 
should
+  // only call `on_next_task` at most the registered number of arrays before 
propagating
+  // backpressure.
+  //
+  // While not cancelled, calling request MAY synchronously call 
`on_next_task`,
+  // `on_error`, or `release` on the ArrowAsyncDeviceStreamHandler.
+  //
+  // Any error encountered by calling request must be propagated by calling 
the `on_error`
+  // callback of the ArrowAsyncDeviceStreamHandler.
+  //
+  // A producer must support an unbounded number of calls to request and must 
support
+  // a total registered demand (sum requested - sum delivered) of up to 
UINT64_MAX.
+  void (*request)(struct ArrowAsyncProducer* self, uint64_t n);
+
+  // This cancel callback signals a producer that it must eventually stop 
making calls
+  // to on_next_task. It must be idempotent and thread-safe.
+  //
+  // After calling cancel once, subsequent calls must be NOPs.
+  //
+  // It is not required that calling cancel affect the producer immediately, 
only that it
+  // must eventually stop calling on_next_task and subsequently call release 
on the
+  // async handler. As such, a consumer must be prepared to receive one or 
more calls to
+  // `on_next_task` even after calling cancel if there are still requested 
arrays pending.
+  //
+  // Any error encountered during handling a call to cancel must be reported 
via the on_error
+  // callback on the async stream handler.
+  void (*cancel)(struct ArrowAsyncProducer* self);  
+
+  // producer-specific opaque data.
+  void* private_data;
+};
+
+// Similar to ArrowDeviceArrayStream, except designed for an asynchronous
+// style of interaction. While ArrowDeviceArrayStream provides producer
+// defined callbacks, this is intended to be created by the consumer instead.
+// The consumer passes this handler to the producer, which in turn uses the
+// callbacks to inform the consumer of events in the stream.
+struct ArrowAsyncDeviceStreamHandler {
+  // Handler for receiving a schema. The passed in stream_schema must be
+  // released or moved by the handler (producer is giving ownership of the 
schema to
+  // the handler, but not ownership of the top level object itself).
+  //
+  // With the exception of an error occurring (on_error), this must be the 
first
+  // callback function which is called by a producer and must only be called 
exactly
+  // once. As such, the producer should provide a valid ArrowAsyncProducer 
instance
+  // so the consumer can control the flow. See the documentation on 
ArrowAsyncProducer
+  // for how it works. The ArrowAsyncProducer is owned by the producer who 
calls this
+  // function and thus the producer is responsible for cleaning it up when 
calling
+  // the release callback of this handler.
+  //
+  // The addl_metadata argument can be null or can be used by a producer
+  // to pass arbitrary extra information to the consumer beyond the metadata 
in the schema
+  // itself (such as total number of rows, context info, or otherwise). The 
data should 
+  // be passed using the same encoding as the metadata within the ArrowSchema 
struct 
+  // itself (defined in the spec at
+  // 
https://arrow.apache.org/docs/format/CDataInterface.html#c.ArrowSchema.metadata)
+  //
+  // If addl_metadata is non-null then it only needs to exist for the lifetime 
of this call,
+  // a consumer who wants it to live after that must copy it to ensure 
lifetime.
+  //
+  // Return value: 0 if successful, `errno`-compatible error otherwise
+  //
+  // A producer that receives a non-zero return here should stop producing and 
eventually
+  // call release instead.
+  int (*on_schema)(struct ArrowAsyncDeviceStreamHandler* self,
+                   struct ArrowAsyncProducer* producer, struct ArrowSchema* 
stream_schema,
+                   const char* addl_metadata);
+
+  // Handler for receiving data. This is called when data is available 
providing an
+  // ArrowAsyncTask struct to signify it. The consumer is responsible for 
calling
+  // the release callback on the ArrowAsyncTask after retrieving the 
array/batch.
+  // An empty/released task is passed to indicate the end of the stream if no
+  // errors have been encountered.
+  //
+  // The `request` callback of a provided ArrowAsyncProducer must be called in 
order
+  // to start receiving calls to this handler.
+  //
+  // The metadata argument can be null or can be used by a producer
+  // to pass arbitrary extra information to the consumer (such as total number
+  // of rows, context info, or otherwise). The data should be passed using the 
same
+  // encoding as the metadata within the ArrowSchema struct itself (defined in
+  // the spec at
+  // 
https://arrow.apache.org/docs/format/CDataInterface.html#c.ArrowSchema.metadata)
+  //
+  // If metadata is non-null then it only needs to exist for the lifetime of 
this call,
+  // a consumer who wants it to live after that must copy it to ensure 
lifetime.
+  //
+  // A producer *must not* call this concurrently from multiple different 
threads.
+  //
+  // A consumer must be prepared to received one or more calls to this 
callback even
+  // after calling cancel on the corresponding ArrowAsyncProducer, as cancel 
does not
+  // guarantee it happens immediately.
+  //
+  // Return value: 0 if successful, `errno`-compatible error otherwise.
+  int (*on_next_task)(struct ArrowAsyncDeviceStreamHandler* self,
+                      struct ArrowAsyncTask* task, const char* metadata);

Review Comment:
   ping



##########
cpp/src/arrow/c/abi.h:
##########
@@ -228,6 +228,180 @@ struct ArrowDeviceArrayStream {
 
 #endif  // ARROW_C_DEVICE_STREAM_INTERFACE
 
+#ifndef ARROW_C_ASYNC_STREAM_INTERFACE
+#define ARROW_C_ASYNC_STREAM_INTERFACE
+
+// ArrowAsyncTask represents available data from a producer that was passed to

Review Comment:
   (By the way, I think you need `///` for Doxygen to pick this up as a doc
comment.)



##########
cpp/src/arrow/c/abi.h:
##########
@@ -228,6 +228,180 @@ struct ArrowDeviceArrayStream {
 
 #endif  // ARROW_C_DEVICE_STREAM_INTERFACE
 
+#ifndef ARROW_C_ASYNC_STREAM_INTERFACE
+#define ARROW_C_ASYNC_STREAM_INTERFACE
+
+// ArrowAsyncTask represents available data from a producer that was passed to
+// an invocation of `on_next_task` on the ArrowAsyncDeviceStreamHandler.
+struct ArrowAsyncTask {
+  // This callback should populate the ArrowDeviceArray associated with this 
task.
+  // The order of ArrowAsyncTasks provided by the producer enables a consumer 
to
+  // ensure the order of data to process.
+  //
+  // Returns: 0 if successful, errno-compatible error otherwise.
+  //
+  // If a non-0 value is returned then it should be followed by a call to 
`on_error`
+  // on the appropriate ArrowAsyncDeviceStreamHandler.
+  //
+  // Rather than a release callback, any required cleanup should be performed 
as part
+  // of the invocation of `get_data`. Ownership of the Array is passed to the 
consumer
+  // calling this, and so it must be released separately.
+  //
+  // It is only valid to call this method exactly once.
+  int (*get_data)(struct ArrowArrayTask* self, struct ArrowDeviceArray* out);
+
+  // opaque task-specific data
+  void* private_data;
+};
+
+// ArrowAsyncProducer represents a 1-to-1 relationship between an async 
producer
+// and consumer. This object allows the consumer to perform backpressure and 
flow
+// control on the asynchronous stream processing. This object must be owned by 
the
+// producer who creates it, and thus is responsible for cleaning it up.
+struct ArrowAsyncProducer {
+  // A consumer must call this function to start receiving on_next_task calls.
+  //
+  // It *must* be valid to call this synchronously from within on_next_task or
+  // on_schema, with a producer placing an upper bound on possible synchronous
+  // recursion between calls of on_next_task -> request -> on_next_task ->....

Review Comment:
   I'm not saying that the consumer isn't allowed to call it, just that the
producer should schedule the task to run later instead of recursing
immediately. Doing so would also avoid reentrant callbacks.



##########
cpp/src/arrow/c/abi.h:
##########
@@ -228,6 +228,180 @@ struct ArrowDeviceArrayStream {
 
 #endif  // ARROW_C_DEVICE_STREAM_INTERFACE
 
+#ifndef ARROW_C_ASYNC_STREAM_INTERFACE
+#define ARROW_C_ASYNC_STREAM_INTERFACE
+
+// ArrowAsyncTask represents available data from a producer that was passed to
+// an invocation of `on_next_task` on the ArrowAsyncDeviceStreamHandler.
+struct ArrowAsyncTask {
+  // This callback should populate the ArrowDeviceArray associated with this 
task.

Review Comment:
   Ping



##########
cpp/src/arrow/c/abi.h:
##########
@@ -228,6 +228,180 @@ struct ArrowDeviceArrayStream {
 
 #endif  // ARROW_C_DEVICE_STREAM_INTERFACE
 
+#ifndef ARROW_C_ASYNC_STREAM_INTERFACE
+#define ARROW_C_ASYNC_STREAM_INTERFACE
+
+// ArrowAsyncTask represents available data from a producer that was passed to
+// an invocation of `on_next_task` on the ArrowAsyncDeviceStreamHandler.
+struct ArrowAsyncTask {
+  // This callback should populate the ArrowDeviceArray associated with this 
task.
+  // The order of ArrowAsyncTasks provided by the producer enables a consumer 
to
+  // ensure the order of data to process.
+  //
+  // Returns: 0 if successful, errno-compatible error otherwise.
+  //
+  // If a non-0 value is returned then it should be followed by a call to 
`on_error`
+  // on the appropriate ArrowAsyncDeviceStreamHandler.
+  //
+  // Rather than a release callback, any required cleanup should be performed 
as part
+  // of the invocation of `get_data`. Ownership of the Array is passed to the 
consumer
+  // calling this, and so it must be released separately.
+  //
+  // It is only valid to call this method exactly once.
+  int (*get_data)(struct ArrowArrayTask* self, struct ArrowDeviceArray* out);
+
+  // opaque task-specific data
+  void* private_data;
+};
+
+// ArrowAsyncProducer represents a 1-to-1 relationship between an async 
producer
+// and consumer. This object allows the consumer to perform backpressure and 
flow
+// control on the asynchronous stream processing. This object must be owned by 
the
+// producer who creates it, and thus is responsible for cleaning it up.
+struct ArrowAsyncProducer {
+  // A consumer must call this function to start receiving on_next_task calls.
+  //
+  // It *must* be valid to call this synchronously from within on_next_task or
+  // on_schema, with a producer placing an upper bound on possible synchronous
+  // recursion between calls of on_next_task -> request -> on_next_task ->....
+  //
+  // After cancel has been called, additional calls to this function must be 
NOPs,
+  // but allowed.
+  //
+  // While not cancelled, calling this function must register the given number 
of
+  // additional arrays/batches to be produced with the producer. The producer 
should
+  // only call `on_next_task` at most the registered number of arrays before 
propagating
+  // backpressure.
+  //
+  // While not cancelled, calling request MAY synchronously call 
`on_next_task`,
+  // `on_error`, or `release` on the ArrowAsyncDeviceStreamHandler.
+  //
+  // Any error encountered by calling request must be propagated by calling 
the `on_error`
+  // callback of the ArrowAsyncDeviceStreamHandler.
+  //
+  // A producer must support an unbounded number of calls to request and must 
support
+  // a total registered demand (sum requested - sum delivered) of up to 
UINT64_MAX.
+  void (*request)(struct ArrowAsyncProducer* self, uint64_t n);
+
+  // This cancel callback signals a producer that it must eventually stop 
making calls
+  // to on_next_task. It must be idempotent and thread-safe.
+  //
+  // After calling cancel once, subsequent calls must be NOPs.
+  //
+  // It is not required that calling cancel affect the producer immediately, 
only that it
+  // must eventually stop calling on_next_task and subsequently call release 
on the
+  // async handler. As such, a consumer must be prepared to receive one or 
more calls to
+  // `on_next_task` even after calling cancel if there are still requested 
arrays pending.
+  //
+  // Any error encountered during handling a call to cancel must be reported 
via the on_error
+  // callback on the async stream handler.
+  void (*cancel)(struct ArrowAsyncProducer* self);  

Review Comment:
   I don't see this explicitly documented?



##########
cpp/src/arrow/c/abi.h:
##########
@@ -228,6 +228,180 @@ struct ArrowDeviceArrayStream {
 
 #endif  // ARROW_C_DEVICE_STREAM_INTERFACE
 
+#ifndef ARROW_C_ASYNC_STREAM_INTERFACE
+#define ARROW_C_ASYNC_STREAM_INTERFACE
+
+// ArrowAsyncTask represents available data from a producer that was passed to

Review Comment:
   Can we document the rationale for having a separate ArrowAsyncTask object
versus just passing the array to the callback directly?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to