This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 40b2fca474 GH-43631: [C][Format] Add ArrowAsyncDeviceStreamHandler interface (#43632)
40b2fca474 is described below

commit 40b2fca4742e2692a917755fd8db2939e10fa02d
Author: Matt Topol <[email protected]>
AuthorDate: Wed Nov 6 04:47:39 2024 -0500

    GH-43631: [C][Format] Add ArrowAsyncDeviceStreamHandler interface (#43632)
    
    
    
    ### Rationale for this change
    See https://github.com/apache/arrow-adbc/issues/811 and https://github.com/apache/arrow/issues/43631
    
    ### What changes are included in this PR?
    Definition of `ArrowAsyncDeviceStreamHandler` and addition of it to the docs.
    
    I've sent an [email to the mailing list](https://lists.apache.org/thread/yfokmfkrmmp7tqvq0m3rshcvloq278cq) to start a discussion on this topic, so this may change over time due to those discussions.
    
    * GitHub Issue: #43631
    
    Lead-authored-by: Matt Topol <[email protected]>
    Co-authored-by: Felipe Oliveira Carvalho <[email protected]>
    Co-authored-by: Sutou Kouhei <[email protected]>
    Co-authored-by: Raúl Cumplido <[email protected]>
    Co-authored-by: Dane Pitkin <[email protected]>
    Co-authored-by: Antoine Pitrou <[email protected]>
    Co-authored-by: David Li <[email protected]>
    Co-authored-by: Ian Cook <[email protected]>
    Signed-off-by: Matt Topol <[email protected]>
---
 cpp/src/arrow/c/abi.h                       | 206 ++++++++++++++++
 docs/source/format/CDeviceDataInterface.rst | 363 ++++++++++++++++++++++++++++
 2 files changed, 569 insertions(+)
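
For orientation, the consumer side of the interface added in the diff below
could look roughly like the following. This is a minimal sketch, not code from
the commit. `ArrowSchema` and `ArrowDeviceArray` are simplified stand-ins for
the real definitions in arrow/c/abi.h, and `ConsumerState`, `ToyProducer`, and
`run_demo` are hypothetical names. The toy producer is single-threaded and only
exists to drive the handler.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Simplified stand-ins (assumption): real code would include arrow/c/abi.h. */
struct ArrowSchema { void (*release)(struct ArrowSchema*); };
struct ArrowDeviceArray { int64_t length; };

struct ArrowAsyncTask {
  int (*extract_data)(struct ArrowAsyncTask* self, struct ArrowDeviceArray* out);
  void* private_data;
};
struct ArrowAsyncProducer {
  void (*request)(struct ArrowAsyncProducer* self, int64_t n);
  void (*cancel)(struct ArrowAsyncProducer* self);
  const char* additional_metadata;
  void* private_data;
};
struct ArrowAsyncDeviceStreamHandler {
  int (*on_schema)(struct ArrowAsyncDeviceStreamHandler* self,
                   struct ArrowSchema* stream_schema);
  int (*on_next_task)(struct ArrowAsyncDeviceStreamHandler* self,
                      struct ArrowAsyncTask* task, const char* metadata);
  void (*on_error)(struct ArrowAsyncDeviceStreamHandler* self, int code,
                   const char* message, const char* metadata);
  void (*release)(struct ArrowAsyncDeviceStreamHandler* self);
  struct ArrowAsyncProducer* producer;
  void* private_data;
};

/* Hypothetical consumer bookkeeping, reached through handler->private_data. */
struct ConsumerState { int64_t rows; int batches; int finished; int error; int released; };

static int consumer_on_schema(struct ArrowAsyncDeviceStreamHandler* self,
                              struct ArrowSchema* schema) {
  assert(self->producer != NULL);  /* producer must be populated before on_schema */
  if (schema->release != NULL) schema->release(schema);  /* we own the contents now */
  self->producer->request(self->producer, 1);            /* start the flow */
  return 0;
}
static int consumer_on_next_task(struct ArrowAsyncDeviceStreamHandler* self,
                                 struct ArrowAsyncTask* task, const char* metadata) {
  struct ConsumerState* st = (struct ConsumerState*)self->private_data;
  struct ArrowDeviceArray batch = {0};
  int rc;
  (void)metadata;
  if (task == NULL) { st->finished = 1; return 0; }  /* NULL task: end of stream */
  rc = task->extract_data(task, &batch);             /* exactly once per task */
  if (rc != 0) return rc;                            /* non-zero asks the producer to stop */
  st->rows += batch.length;  /* a real consumer would also release the array */
  st->batches++;
  self->producer->request(self->producer, 1);        /* ready for one more */
  return 0;
}
static void consumer_on_error(struct ArrowAsyncDeviceStreamHandler* self, int code,
                              const char* message, const char* metadata) {
  (void)message; (void)metadata;  /* only valid during this call; copy if needed */
  ((struct ConsumerState*)self->private_data)->error = code;
}
static void consumer_release(struct ArrowAsyncDeviceStreamHandler* self) {
  ((struct ConsumerState*)self->private_data)->released = 1;
}

/* Toy single-threaded producer, present only to drive the handler. */
struct ToyProducer { int64_t demand; int cancelled; };

static void toy_request(struct ArrowAsyncProducer* self, int64_t n) {
  struct ToyProducer* p = (struct ToyProducer*)self->private_data;
  if (!p->cancelled && n > 0) p->demand += n;  /* records demand, never calls back */
}
static void toy_cancel(struct ArrowAsyncProducer* self) {
  ((struct ToyProducer*)self->private_data)->cancelled = 1;
}
static int toy_extract(struct ArrowAsyncTask* self, struct ArrowDeviceArray* out) {
  if (out != NULL) out->length = *(int64_t*)self->private_data;
  return 0;
}
static void toy_schema_release(struct ArrowSchema* schema) { schema->release = NULL; }

static struct ConsumerState run_demo(int n_batches, int64_t rows_per_batch) {
  struct ConsumerState st = {0, 0, 0, 0, 0};
  struct ToyProducer toy = {0, 0};
  struct ArrowAsyncProducer producer = {toy_request, toy_cancel, NULL, &toy};
  struct ArrowSchema schema = {toy_schema_release};
  struct ArrowAsyncDeviceStreamHandler h = {consumer_on_schema, consumer_on_next_task,
                                            consumer_on_error, consumer_release, NULL, &st};
  int sent = 0, rc;
  h.producer = &producer;  /* set before any callback other than release */
  rc = h.on_schema(&h, &schema);
  while (rc == 0 && !toy.cancelled && toy.demand > 0 && sent < n_batches) {
    struct ArrowAsyncTask task = {toy_extract, &rows_per_batch};
    toy.demand--;
    sent++;
    rc = h.on_next_task(&h, &task, NULL);
  }
  if (rc == 0 && sent == n_batches) h.on_next_task(&h, NULL, NULL);  /* end of stream */
  h.release(&h);
  return st;
}
```

Calling `run_demo(3, 10)` drives three tasks through the handler, then the
NULL end-of-stream task, then `release`. The consumer asks for one array at a
time, so the producer never has more than one delivery outstanding. That is the
simplest form of the back-pressure the interface is designed around.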

diff --git a/cpp/src/arrow/c/abi.h b/cpp/src/arrow/c/abi.h
index db051fff5f..9dc142bd08 100644
--- a/cpp/src/arrow/c/abi.h
+++ b/cpp/src/arrow/c/abi.h
@@ -228,6 +228,212 @@ struct ArrowDeviceArrayStream {
 
 #endif  // ARROW_C_DEVICE_STREAM_INTERFACE
 
+#ifndef ARROW_C_ASYNC_STREAM_INTERFACE
+#  define ARROW_C_ASYNC_STREAM_INTERFACE
+
+// EXPERIMENTAL: ArrowAsyncTask represents available data from a producer that was passed
+// to an invocation of `on_next_task` on the ArrowAsyncDeviceStreamHandler.
+//
+// The reason for this Task approach instead of the Async interface returning
+// the Array directly is to allow for more complex thread handling and to reduce
+// context switching and data transfers between CPU cores (e.g. from one L1/L2
+// cache to another) if desired.
+//
+// For example, the `on_next_task` callback can be called when data is ready, while
+// the producer puts potential "decoding" logic in the `ArrowAsyncTask` object. This
+// allows for the producer to manage the I/O on one thread which calls `on_next_task`
+// and the consumer can determine when the decoding (producer logic in the `extract_data`
+// callback of the task) occurs and on which thread, to avoid a CPU core transfer
+// (data staying in the L2 cache).
+struct ArrowAsyncTask {
+  // This callback should populate the ArrowDeviceArray associated with this task.
+  // The order of ArrowAsyncTasks provided by the producer enables a consumer to
+  // ensure the order of data to process.
+  //
+  // This function is expected to be synchronous, but should not perform any blocking
+  // I/O. Ideally it should be as cheap as possible so as to not tie up the consumer
+  // thread unnecessarily.
+  //
+  // Returns: 0 if successful, errno-compatible error otherwise.
+  //
+  // If a non-0 value is returned then it should be followed by a call to `on_error`
+  // on the appropriate ArrowAsyncDeviceStreamHandler. This is because it's highly
+  // likely that whatever is calling this function may be entirely disconnected from
+  // the current control flow. Indicating an error here with a non-zero return allows
+  // the current flow to be aware of the error occurring, while still allowing any
+  // logging or error handling to be centralized in the `on_error` callback of
+  // the original Async handler.
+  //
+  // Rather than a release callback, any required cleanup should be performed as part
+  // of the invocation of `extract_data`. Ownership of the Array is passed to the consumer
+  // calling this, and so it must be released separately.
+  //
+  // It is only valid to call this method exactly once.
+  int (*extract_data)(struct ArrowAsyncTask* self, struct ArrowDeviceArray* out);
+
+  // opaque task-specific data
+  void* private_data;
+};
+
+// EXPERIMENTAL: ArrowAsyncProducer represents a 1-to-1 relationship between an async
+// producer and consumer. This object allows the consumer to perform backpressure and flow
+// control on the asynchronous stream processing. This object is owned by the
+// producer that creates it, which is therefore responsible for cleaning it up.
+struct ArrowAsyncProducer {
+  // A consumer must call this function to start receiving on_next_task calls.
+  //
+  // It *must* be valid to call this synchronously from within `on_next_task` or
+  // `on_schema`, but this function *must not* immediately call `on_next_task` so as
+  // to avoid recursion and reentrant callbacks.
+  //
+  // After cancel has been called, additional calls to this function are allowed
+  // but must be NOPs. While not cancelled, calling this function must register the
+  // given number of additional arrays/batches to be produced with the producer.
+  // The producer should call `on_next_task` at most the registered number
+  // of times before propagating backpressure.
+  //
+  // Any error encountered by calling request must be propagated by calling the `on_error`
+  // callback of the ArrowAsyncDeviceStreamHandler.
+  //
+  // While not cancelled, any subsequent calls to `on_next_task`, `on_error` or
+  // `release` should be scheduled by the producer to be called later.
+  //
+  // It is invalid for a consumer to call this with a value of n <= 0; producers should
+  // error if given such a value.
+  void (*request)(struct ArrowAsyncProducer* self, int64_t n);
+
+  // This cancel callback signals a producer that it must eventually stop making calls
+  // to on_next_task. It must be idempotent and thread-safe. After calling cancel once,
+  // subsequent calls must be NOPs. This must not call any consumer-side handlers other
+  // than `on_error`.
+  //
+  // It is not required that calling cancel affect the producer immediately, only that it
+  // must eventually stop calling on_next_task and subsequently call release on the
+  // async handler. As such, a consumer must be prepared to receive one or more calls to
+  // `on_next_task` even after calling cancel if there are still requested arrays pending.
+  //
+  // Successful cancellation should *not* result in the producer calling `on_error`; it
+  // should finish out any remaining tasks and eventually call `release`.
+  //
+  // Any error encountered during handling a call to cancel must be reported via the
+  // on_error callback on the async stream handler.
+  void (*cancel)(struct ArrowAsyncProducer* self);
+
+  // Any additional metadata tied to a specific stream of data. This must either be NULL
+  // or a valid pointer to metadata which is encoded in the same way schema metadata
+  // would be. Non-null metadata must be valid for the lifetime of this object. As an
+  // example a producer could use this to provide the total number of rows and/or batches
+  // in the stream if known.
+  const char* additional_metadata;
+
+  // producer-specific opaque data.
+  void* private_data;
+};
+
+// EXPERIMENTAL: Similar to ArrowDeviceArrayStream, except designed for an asynchronous
+// style of interaction. While ArrowDeviceArrayStream provides producer
+// defined callbacks, this is intended to be created by the consumer instead.
+// The consumer passes this handler to the producer, which in turn uses the
+// callbacks to inform the consumer of events in the stream.
+struct ArrowAsyncDeviceStreamHandler {
+  // Handler for receiving a schema. The passed in stream_schema must be
+  // released or moved by the handler (producer is giving ownership of the schema to
+  // the handler, but not ownership of the top level object itself).
+  //
+  // With the exception of an error occurring (on_error), this must be the first
+  // callback function which is called by a producer and must only be called exactly
+  // once. As such, the producer should provide a valid ArrowAsyncProducer instance
+  // so the consumer can control the flow. See the documentation on ArrowAsyncProducer
+  // for how it works. The ArrowAsyncProducer is owned by the producer who calls this
+  // function and thus the producer is responsible for cleaning it up when calling
+  // the release callback of this handler.
+  //
+  // If there is any additional metadata tied to this stream, it will be provided as
+  // a non-null value for the `additional_metadata` field of the ArrowAsyncProducer
+  // which will be valid at least until the release callback is called.
+  //
+  // Return value: 0 if successful, `errno`-compatible error otherwise
+  //
+  // A producer that receives a non-zero return here should stop producing and eventually
+  // call release instead.
+  int (*on_schema)(struct ArrowAsyncDeviceStreamHandler* self,
+                   struct ArrowSchema* stream_schema);
+
+  // Handler for receiving data. This is called when data is available providing an
+  // ArrowAsyncTask struct to signify it. The producer indicates the end of the stream
+  // by passing NULL as the value for the task rather than a valid pointer to a task.
+  // The task object is only valid for the lifetime of this function call. If a consumer
+  // wants to utilize it after this function returns, it must copy or move the contents
+  // of it to a new ArrowAsyncTask object.
+  //
+  // The `request` callback of a provided ArrowAsyncProducer must be called in order
+  // to start receiving calls to this handler.
+  //
+  // The metadata argument can be null or can be used by a producer
+  // to pass arbitrary extra information to the consumer (such as total number
+  // of rows, context info, or otherwise). The data should be passed using the same
+  // encoding as the metadata within the ArrowSchema struct itself (defined in
+  // the spec at
+  // https://arrow.apache.org/docs/format/CDataInterface.html#c.ArrowSchema.metadata)
+  //
+  // If metadata is non-null then it only needs to exist for the lifetime of this call;
+  // a consumer who wants it to live after that must copy it to ensure lifetime.
+  //
+  // A producer *must not* call this concurrently from multiple different threads.
+  //
+  // A consumer must be prepared to receive one or more calls to this callback even
+  // after calling cancel on the corresponding ArrowAsyncProducer, as cancel is not
+  // guaranteed to take effect immediately.
+  //
+  // Return value: 0 if successful, `errno`-compatible error otherwise.
+  //
+  // If the consumer returns a non-zero return from this method, that indicates to the
+  // producer that it should stop propagating data as an error occurred. After receiving
+  // such a return, the only interaction with this object is for the producer to call
+  // the `release` callback.
+  int (*on_next_task)(struct ArrowAsyncDeviceStreamHandler* self,
+                      struct ArrowAsyncTask* task, const char* metadata);
+
+  // Handler for encountering an error. The producer should call release after
+  // this returns to clean up any resources. The `code` passed in can be any error
+  // code that a producer wants, but should be errno-compatible for consistency.
+  //
+  // If the message or metadata are non-null, they will only last as long as this
+  // function call. The consumer would need to perform a copy of the data if it is
+  // necessary for them to live past the lifetime of this call.
+  //
+  // Error metadata should be encoded as with metadata in ArrowSchema, defined in
+  // the spec at
+  // https://arrow.apache.org/docs/format/CDataInterface.html#c.ArrowSchema.metadata
+  //
+  // It is valid for this to be called by a producer with or without a preceding call
+  // to ArrowAsyncProducer.request.
+  //
+  // This callback must not call any methods of an ArrowAsyncProducer object.
+  void (*on_error)(struct ArrowAsyncDeviceStreamHandler* self, int code,
+                   const char* message, const char* metadata);
+
+  // Release callback to release any resources for the handler. Should always be
+  // called by a producer when it is done utilizing a handler. No callbacks should
+  // be called after this is called.
+  //
+  // It is valid for the release callback to be called by a producer with or without
+  // a preceding call to ArrowAsyncProducer.request.
+  //
+  // The release callback must not call any methods of an ArrowAsyncProducer object.
+  void (*release)(struct ArrowAsyncDeviceStreamHandler* self);
+
+  // MUST be populated by the producer BEFORE calling any callbacks other than release.
+  // This provides the connection between a handler and its producer, and must exist until
+  // the release callback is called.
+  struct ArrowAsyncProducer* producer;
+
+  // Opaque handler-specific data
+  void* private_data;
+};
+
+#endif  // ARROW_C_ASYNC_STREAM_INTERFACE
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/docs/source/format/CDeviceDataInterface.rst b/docs/source/format/CDeviceDataInterface.rst
index 59433bae47..fbb2012c30 100644
--- a/docs/source/format/CDeviceDataInterface.rst
+++ b/docs/source/format/CDeviceDataInterface.rst
@@ -506,6 +506,8 @@ could be used for any device:
         arr->array.release(&arr->array);
     }
 
+.. _c-device-stream-interface:
+
 Device Stream Interface
 =======================
 
@@ -650,6 +652,367 @@ The stream source is not assumed to be thread-safe. Consumers wanting to
 call ``get_next`` from several threads should ensure those calls are
 serialized.
 
+Async Device Stream Interface
+=============================
+
+.. warning::
+
+    Experimental: The Async C Device Stream interface is experimental in its current
+    form. Based on feedback and usage the protocol definition may change until
+    it is fully standardized.
+
+The :ref:`C device stream interface <c-device-stream-interface>` provides a synchronous
+API centered around the consumer calling the producer functions to retrieve
+the next record batch. For concurrent communication between producer and consumer,
+the ``ArrowAsyncDeviceStreamHandler`` can be used. This interface is non-opinionated
+and may fit into different concurrent communication models.
+
+Semantics
+---------
+
+Rather than the producer providing a structure of callbacks for a consumer to
+call and retrieve records, the Async interface is a structure allocated and populated by the consumer.
+The consumer-allocated struct provides handler callbacks for the producer to call
+when the schema and chunks of data are available.
+
+In addition to the ``ArrowAsyncDeviceStreamHandler``, two further
+structs are used for the full data flow: ``ArrowAsyncTask`` and ``ArrowAsyncProducer``.
+
+Structure Definition
+--------------------
+
+The C device async stream interface consists of three ``struct`` definitions:
+
+.. code-block:: c
+
+    #ifndef ARROW_C_ASYNC_STREAM_INTERFACE
+    #define ARROW_C_ASYNC_STREAM_INTERFACE
+
+    struct ArrowAsyncTask {
+      int (*extract_data)(struct ArrowAsyncTask* self, struct ArrowDeviceArray* out);
+
+      void* private_data;
+    };
+
+    struct ArrowAsyncProducer {
+      void (*request)(struct ArrowAsyncProducer* self, int64_t n);
+      void (*cancel)(struct ArrowAsyncProducer* self);
+
+      void (*release)(struct ArrowAsyncProducer* self);
+      const char* additional_metadata;
+      void* private_data;
+    };
+
+    struct ArrowAsyncDeviceStreamHandler {
+      // consumer-specific handlers
+      int (*on_schema)(struct ArrowAsyncDeviceStreamHandler* self,
+                       struct ArrowSchema* stream_schema);
+      int (*on_next_task)(struct ArrowAsyncDeviceStreamHandler* self,
+                          struct ArrowAsyncTask* task, const char* metadata);
+      void (*on_error)(struct ArrowAsyncDeviceStreamHandler* self,
+                       int code, const char* message, const char* metadata);
+
+      // release callback
+      void (*release)(struct ArrowAsyncDeviceStreamHandler* self);
+
+      // must be populated before calling any callbacks
+      struct ArrowAsyncProducer* producer;
+
+      // opaque handler-specific data
+      void* private_data;
+    };
+
+    #endif  // ARROW_C_ASYNC_STREAM_INTERFACE
+
+.. note::
+    The canonical guard ``ARROW_C_ASYNC_STREAM_INTERFACE`` is meant to avoid
+    duplicate definitions if two projects copy the C async stream interface
+    definitions into their own headers, and a third-party project includes
+    from these two projects. It is therefore important that this guard is kept
+    exactly as-is when these definitions are copied.
+
+The ArrowAsyncDeviceStreamHandler structure
+'''''''''''''''''''''''''''''''''''''''''''
+
+The structure has the following fields:
+
+.. c:member:: int (*ArrowAsyncDeviceStreamHandler.on_schema)(struct ArrowAsyncDeviceStreamHandler*, struct ArrowSchema*)
+
+    *Mandatory.* Handler for receiving the schema of the stream. All incoming records should
+    match the provided schema. If successful, the function should return 0, otherwise
+    it should return an ``errno``-compatible error code.
+
+    If there is any extra contextual information that the producer wants to provide, it can set
+    :c:member:`ArrowAsyncProducer.additional_metadata` to a non-NULL value. This is encoded in the
+    same format as :c:member:`ArrowSchema.metadata`. The lifetime of this metadata, if not ``NULL``,
+    should be tied to the lifetime of the ``ArrowAsyncProducer`` object.
+
+    Unless the ``on_error`` handler is called, this will always get called exactly once and will be
+    the first method called on this object. As such the producer *MUST* populate the ``ArrowAsyncProducer``
+    member before calling this function to allow the consumer to apply back-pressure and control the flow of data.
+    The producer maintains ownership of the ``ArrowAsyncProducer`` and must clean it up *after*
+    calling the release callback on the ``ArrowAsyncDeviceStreamHandler``.
+
+    A producer that receives a non-zero result here must not subsequently call anything other than
+    the release callback on this object.
+
+.. c:member:: int (*ArrowAsyncDeviceStreamHandler.on_next_task)(struct ArrowAsyncDeviceStreamHandler*, struct ArrowAsyncTask*, const char*)
+
+    *Mandatory.* Handler to be called when a new record is available for processing. The
+    schema for each record should be the same as the schema that ``on_schema`` was called with.
+    If successfully handled, the function should return 0, otherwise it should return an
+    ``errno``-compatible error code.
+
+    Rather than the record itself, this handler receives an ``ArrowAsyncTask``, which gives the
+    consumer control over when, and on which thread, the data is extracted. A call to this function
+    simply indicates that data is available via the provided task.
+
+    The producer signals the end of the stream by passing ``NULL`` for the ``ArrowAsyncTask``
+    pointer instead of a valid address. This task object is only valid during the lifetime of
+    this function call. If the consumer wants to use the task beyond the scope of this method, it
+    must copy or move its contents to a new ``ArrowAsyncTask`` object.
+
+    The ``const char*`` parameter exists for producers to provide any extra contextual information
+    they want. This is encoded in the same format as :c:member:`ArrowSchema.metadata`. If not ``NULL``,
+    the lifetime is only the scope of the call to this function. A consumer who wants to maintain
+    the additional metadata beyond the lifetime of this call *MUST* copy the value themselves.
+
+    A producer *MUST NOT* call this concurrently from multiple threads.
+
+    The :c:member:`ArrowAsyncProducer.request` callback must be called to start receiving calls to this
+    handler.
+
+.. c:member:: void (*ArrowAsyncDeviceStreamHandler.on_error)(struct ArrowAsyncDeviceStreamHandler*, int, const char*, const char*)
+
+    *Mandatory.* Handler to be called when an error is encountered by the producer. After calling
+    this, the ``release`` callback will be called as the last call on this struct. The parameters
+    are an ``errno``-compatible error code and an optional error message and metadata.
+
+    If the message and metadata are not ``NULL``, they are only valid during the scope
+    of this call. A consumer who wants to maintain these values past the return of this function
+    *MUST* copy the values themselves.
+
+    If the metadata parameter is not ``NULL``, to provide key-value error metadata, then it should
+    be encoded identically to the way that metadata is encoded in :c:member:`ArrowSchema.metadata`.
+
+    It is valid for this to be called by a producer with or without a preceding call to
+    :c:member:`ArrowAsyncProducer.request`. This callback *MUST NOT* call any methods of an
+    ``ArrowAsyncProducer`` object.
+
+.. c:member:: void (*ArrowAsyncDeviceStreamHandler.release)(struct ArrowAsyncDeviceStreamHandler*)
+
+    *Mandatory.* A pointer to a consumer-provided release callback for the handler.
+
+    It is valid for this to be called by a producer with or without a preceding call to
+    :c:member:`ArrowAsyncProducer.request`. This must not call any methods of an ``ArrowAsyncProducer``
+    object.
+
+.. c:member:: struct ArrowAsyncProducer* ArrowAsyncDeviceStreamHandler.producer
+
+    *Mandatory.* The producer object that the consumer will use to request additional data or cancel.
+
+    This object *MUST* be populated by the producer before calling the :c:member:`ArrowAsyncDeviceStreamHandler.on_schema`
+    callback. The producer maintains ownership of this object and must clean it up *after* calling
+    the release callback on the ``ArrowAsyncDeviceStreamHandler``.
+
+    The consumer *MUST NOT* assume that this is valid before the ``on_schema`` callback is called.
+
+.. c:member:: void* ArrowAsyncDeviceStreamHandler.private_data
+
+    *Optional.* An opaque pointer to consumer-provided private data.
+
+    Producers *MUST NOT* process this member. Lifetime of this member is handled by
+    the consumer, and especially by the release callback.
+
+The ArrowAsyncTask structure
+''''''''''''''''''''''''''''
+
+The purpose of using a Task object rather than passing the array directly to the ``on_next_task``
+callback is to allow for more complex and efficient thread handling. Utilizing a Task
+object allows for a producer to separate the "decoding" logic from the I/O, enabling a
+consumer to avoid transferring data between CPU cores (e.g. from one L1/L2 cache to another).
+
+This producer-provided structure has the following fields:
+
+.. c:member:: int (*ArrowAsyncTask.extract_data)(struct ArrowAsyncTask*, struct ArrowDeviceArray*)
+
+  *Mandatory.* A callback to populate the provided ``ArrowDeviceArray`` with the available data.
+  The order of ``ArrowAsyncTasks`` provided by the producer enables a consumer to know the order of
+  the data to process. If the consumer does not care about the data that is owned by this task,
+  it must still call ``extract_data`` so that the producer can perform any required cleanup. ``NULL``
+  should be passed as the device array pointer to indicate that the consumer doesn't want the
+  actual data, letting the task perform necessary cleanup.
+
+  If a non-zero value is returned from this, it should be followed only by the producer calling
+  the ``on_error`` callback of the ``ArrowAsyncDeviceStreamHandler``. Because calling this method
+  is likely to be separate from the current control flow, returning a non-zero value to signal
+  an error occurring allows the current thread to decide how to handle the case, while still
+  allowing all error logging and handling to be centralized in the
+  :c:member:`ArrowAsyncDeviceStreamHandler.on_error` callback.
+
+  Rather than having a separate release callback, any required cleanup should be performed as part
+  of the invocation of this callback. Ownership of the Array is given to the pointer passed in as
+  a parameter, and this array must be released separately.
+
+  It is only valid to call this method exactly once.
+
+.. c:member:: void* ArrowAsyncTask.private_data
+
+  *Optional.* An opaque pointer to producer-provided private data.
+
+  Consumers *MUST NOT* process this member. Lifetime of this member is handled by
+  the producer who created this object, and should be cleaned up if necessary during
+  the call to :c:member:`ArrowAsyncTask.extract_data`.
+
+The ArrowAsyncProducer structure
+''''''''''''''''''''''''''''''''
+
+This producer-provided and managed object has the following fields:
+
+.. c:member:: void (*ArrowAsyncProducer.request)(struct ArrowAsyncProducer*, int64_t)
+
+  *Mandatory.* This function must be called by a consumer to start receiving calls to
+  :c:member:`ArrowAsyncDeviceStreamHandler.on_next_task`. It *MUST* be valid to call
+  this synchronously from within :c:member:`ArrowAsyncDeviceStreamHandler.on_next_task`
+  or :c:member:`ArrowAsyncDeviceStreamHandler.on_schema`. As a result, this function
+  *MUST NOT* synchronously call ``on_next_task`` or ``on_error`` to avoid recursive
+  and reentrant callbacks.
+
+  After ``cancel`` is called, additional calls to this function are allowed but must be NOPs.
+
+  While not cancelled, calling this function registers the given number of additional
+  arrays/batches to be produced by the producer. A producer should only call
+  the appropriate ``on_next_task`` callback up to a maximum of the total number of arrays requested by
+  calls to this method before propagating back-pressure / waiting.
+
+  Any error encountered by calling request must be propagated by calling the ``on_error``
+  callback of the ``ArrowAsyncDeviceStreamHandler``.
+
+  It is invalid to call this function with a value of ``n`` that is ``<= 0``. Producers should
+  error (e.g. call ``on_error``) if receiving such a value for ``n``.
+
+.. c:member:: void (*ArrowAsyncProducer.cancel)(struct ArrowAsyncProducer*)
+
+  *Mandatory.* This function signals to the producer that it must *eventually* stop calling
+  ``on_next_task``. Calls to ``cancel`` must be idempotent and thread-safe. After calling
+  it once, subsequent calls *MUST* be NOPs. This *MUST NOT* call any consumer-side handlers
+  other than ``on_error``.
+
+  It is not required that calling ``cancel`` affect the producer *immediately*, only that it
+  must eventually stop calling ``on_next_task`` and then subsequently call ``release``
+  on the async handler object. As such, a consumer *MUST* be prepared to receive one or more
+  calls to ``on_next_task`` or ``on_error`` even after calling ``cancel`` if there are still
+  requested arrays pending.
+
+  Successful cancelling *MUST NOT* result in a producer calling
+  :c:member:`ArrowAsyncDeviceStreamHandler.on_error`; instead it should finish out any remaining
+  tasks (calling ``on_next_task`` accordingly) and eventually just call ``release``.
+
+  Any error encountered during handling a call to cancel must be reported via the ``on_error``
+  callback on the async stream handler.
+
+.. c:member:: const char* ArrowAsyncProducer.additional_metadata
+
+    *Optional.* An additional metadata string to provide any extra context to the consumer. This *MUST*
+    either be ``NULL`` or a valid string that is encoded in the same way as :c:member:`ArrowSchema.metadata`.
+    As an example, a producer could utilize this metadata to provide the total number of rows and/or batches
+    in the stream if known.
+
+    If not ``NULL`` it *MUST* be valid for at least the lifetime of this object.
+
+.. c:member:: void* ArrowAsyncProducer.private_data
+
+  *Optional.* An opaque pointer to producer-provided specific data.
+
+  Consumers *MUST NOT* process this member; its lifetime is owned by the producer
+  that constructed this object.
+
+Error Handling
+''''''''''''''
+
+Unlike the regular C Stream interface, the Async interface allows for errors to flow in
+both directions. As a result, error handling can be slightly more complex. Thus this spec
+designates the following rules:
+
+* If the producer encounters an error during processing, it should call the ``on_error``
+  callback, and then call ``release`` after it returns.
+
+* If ``on_schema`` or ``on_next_task`` returns a non-zero integer value, the producer *should not*
+  call the ``on_error`` callback, but instead should eventually call ``release`` at some point
+  before or after any logging or processing of the error code.
+
+Result lifetimes
+''''''''''''''''
+
+The ``ArrowSchema`` passed to the ``on_schema`` callback must be released independently,
+with the object itself needing to be moved to a consumer-owned ``ArrowSchema`` object. The
+``ArrowSchema*`` passed as a parameter to the callback *MUST NOT* be stored and kept.
+
+The ``ArrowAsyncTask`` object provided to ``on_next_task`` is owned by the producer and
+will be cleaned up during the invocation of ``extract_data`` on it. If the consumer
+doesn't care about the data, it should pass ``NULL`` instead of a valid ``ArrowDeviceArray*``.
+
+The ``const char*`` error ``message`` and ``metadata`` which are passed to ``on_error``
+are only valid within the scope of the ``on_error`` function itself. They must be copied
+if it is necessary for them to exist after it returns.
+
+Stream Handler Lifetime
+'''''''''''''''''''''''
+
+Lifetime of the async stream handler is managed using a release callback with similar
+usage as in the :ref:`C data interface <c-data-interface-released>`.
+
+ArrowAsyncProducer Lifetime
+'''''''''''''''''''''''''''
+
+The lifetime of the ``ArrowAsyncProducer`` is owned by the producer itself and should
+be managed by it. It *MUST* be populated before calling any methods other than ``release``
+and *MUST* remain valid at least until just before calling ``release`` on the stream handler object.
+
+Thread safety
+'''''''''''''
+
+All handler functions on the ``ArrowAsyncDeviceStreamHandler`` should only be called in a
+serialized manner, but are not guaranteed to be called from the same thread every time. A
+producer should wait for handler callbacks to return before calling the next handler callback,
+and before calling the ``release`` callback.
+
+Back-pressure is managed by the consumer making calls to :c:member:`ArrowAsyncProducer.request`
+to indicate how many arrays it is ready to receive.
+
+The ``ArrowAsyncDeviceStreamHandler`` object should be able to handle callbacks as soon as
+it is passed to the producer; any initialization should be performed before it is provided.
+
+Possible Sequence Diagram
+-------------------------
+
+.. mermaid::
+
+  sequenceDiagram
+    Consumer->>+Producer: ArrowAsyncDeviceStreamHandler*
+    Producer-->>+Consumer: on_schema(ArrowAsyncProducer*, ArrowSchema*)
+    Consumer->>Producer: ArrowAsyncProducer->request(n)
+
+    par
+        loop up to n times
+            Producer-->>Consumer: on_next_task(ArrowAsyncTask*)
+        end
+    and for each task
+        Consumer-->>Producer: ArrowAsyncTask.extract_data(...)
+        Consumer-->>Producer: ArrowAsyncProducer->request(1)
+    end
+
+    break Optionally
+        Consumer->>-Producer: ArrowAsyncProducer->cancel()
+    end
+
+    loop possible remaining
+        Producer-->>Consumer: on_next_task(ArrowAsyncTask*)
+    end
+
+    Producer->>-Consumer: ArrowAsyncDeviceStreamHandler->release()
+
+
 Interoperability with other interchange formats
 ===============================================
 

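The `request` and `cancel` rules in the ArrowAsyncProducer comments above
amount to demand bookkeeping on the producer side. The following is a minimal
sketch of that bookkeeping under stated assumptions, not the implementation
used by any Arrow library. `DemandState`, `demand_try_take`, and `demand_init`
are hypothetical names. The sketch uses C11 atomics so that `cancel` is safe to
call from any thread, and it chooses to stop delivering as soon as `cancel` is
observed, which the comments permit but do not require.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

struct ArrowAsyncProducer {
  void (*request)(struct ArrowAsyncProducer* self, int64_t n);
  void (*cancel)(struct ArrowAsyncProducer* self);
  const char* additional_metadata;
  void* private_data;
};

/* Hypothetical producer-side state stored in private_data. */
struct DemandState {
  atomic_int_fast64_t pending;  /* arrays requested but not yet delivered */
  atomic_bool cancelled;
  atomic_int rejected;          /* invalid request() calls seen */
};

static void demand_request(struct ArrowAsyncProducer* self, int64_t n) {
  struct DemandState* s = (struct DemandState*)self->private_data;
  if (atomic_load(&s->cancelled)) return;  /* allowed after cancel, but a NOP */
  if (n <= 0) {
    /* A real producer would schedule an on_error call for later here. */
    atomic_fetch_add(&s->rejected, 1);
    return;
  }
  atomic_fetch_add(&s->pending, n);  /* record demand only; no callback from here */
}

static void demand_cancel(struct ArrowAsyncProducer* self) {
  struct DemandState* s = (struct DemandState*)self->private_data;
  atomic_store(&s->cancelled, true);  /* same effect however often it is called */
}

/* Called by the producer's delivery loop before each on_next_task call. */
static bool demand_try_take(struct DemandState* s) {
  int_fast64_t cur = atomic_load(&s->pending);
  if (atomic_load(&s->cancelled)) return false;
  while (cur > 0) {
    /* On failure, cur is reloaded with the current value and we retry. */
    if (atomic_compare_exchange_weak(&s->pending, &cur, cur - 1)) return true;
  }
  return false;  /* no demand left: apply back-pressure and wait */
}

static void demand_init(struct DemandState* s, struct ArrowAsyncProducer* p) {
  atomic_init(&s->pending, 0);
  atomic_init(&s->cancelled, false);
  atomic_init(&s->rejected, 0);
  p->request = demand_request;
  p->cancel = demand_cancel;
  p->additional_metadata = NULL;
  p->private_data = s;
}
```

Because `demand_request` only updates a counter, it is safe to call
synchronously from inside `on_schema` or `on_next_task`. The delivery loop
picks up the new demand on its next `demand_try_take` call, which avoids the
recursive and reentrant callbacks that the spec forbids.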