This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 40b2fca474 GH-43631: [C][Format] Add ArrowAsyncDeviceStreamHandler interface (#43632)
40b2fca474 is described below
commit 40b2fca4742e2692a917755fd8db2939e10fa02d
Author: Matt Topol <[email protected]>
AuthorDate: Wed Nov 6 04:47:39 2024 -0500
GH-43631: [C][Format] Add ArrowAsyncDeviceStreamHandler interface (#43632)
### Rationale for this change
See https://github.com/apache/arrow-adbc/issues/811 and
https://github.com/apache/arrow/issues/43631
### What changes are included in this PR?
Definition of `ArrowAsyncDeviceStreamHandler` and addition of it to the
docs.
I've sent an [email to the mailing
list](https://lists.apache.org/thread/yfokmfkrmmp7tqvq0m3rshcvloq278cq) to
start a discussion on this topic, so this may change over time due to those
discussions.
* GitHub Issue: #43631
Lead-authored-by: Matt Topol <[email protected]>
Co-authored-by: Felipe Oliveira Carvalho <[email protected]>
Co-authored-by: Sutou Kouhei <[email protected]>
Co-authored-by: Raúl Cumplido <[email protected]>
Co-authored-by: Dane Pitkin <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Co-authored-by: David Li <[email protected]>
Co-authored-by: Ian Cook <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
---
cpp/src/arrow/c/abi.h | 206 ++++++++++++++++
docs/source/format/CDeviceDataInterface.rst | 363 ++++++++++++++++++++++++++++
2 files changed, 569 insertions(+)
diff --git a/cpp/src/arrow/c/abi.h b/cpp/src/arrow/c/abi.h
index db051fff5f..9dc142bd08 100644
--- a/cpp/src/arrow/c/abi.h
+++ b/cpp/src/arrow/c/abi.h
@@ -228,6 +228,212 @@ struct ArrowDeviceArrayStream {
#endif // ARROW_C_DEVICE_STREAM_INTERFACE
+#ifndef ARROW_C_ASYNC_STREAM_INTERFACE
+# define ARROW_C_ASYNC_STREAM_INTERFACE
+
+// EXPERIMENTAL: ArrowAsyncTask represents available data from a producer that was passed
+// to an invocation of `on_next_task` on the ArrowAsyncDeviceStreamHandler.
+//
+// The reason for this Task approach instead of the Async interface returning
+// the Array directly is to allow for more complex thread handling and reducing
+// context switching and data transfers between CPU cores (e.g. from one L1/L2
+// cache to another) if desired.
+//
+// For example, the `on_next_task` callback can be called when data is ready, while
+// the producer puts potential "decoding" logic in the `ArrowAsyncTask` object. This
+// allows for the producer to manage the I/O on one thread which calls `on_next_task`
+// and the consumer can determine when the decoding (producer logic in the `extract_data`
+// callback of the task) occurs and on which thread, to avoid a CPU core transfer
+// (data staying in the L2 cache).
+struct ArrowAsyncTask {
+ // This callback should populate the ArrowDeviceArray associated with this task.
+ // The order of ArrowAsyncTasks provided by the producer enables a consumer to
+ // ensure the order of data to process.
+ //
+ // This function is expected to be synchronous, but should not perform any blocking
+ // I/O. Ideally it should be as cheap as possible so as to not tie up the consumer
+ // thread unnecessarily.
+ //
+ // Returns: 0 if successful, errno-compatible error otherwise.
+ //
+ // If a non-0 value is returned then it should be followed by a call to `on_error`
+ // on the appropriate ArrowAsyncDeviceStreamHandler. This is because it's highly
+ // likely that whatever is calling this function may be entirely disconnected from
+ // the current control flow. Indicating an error here with a non-zero return allows
+ // the current flow to be aware of the error occurring, while still allowing any
+ // logging or error handling to still be centralized in the `on_error` callback of
+ // the original Async handler.
+ //
+ // Rather than a release callback, any required cleanup should be performed as part
+ // of the invocation of `extract_data`. Ownership of the Array is passed to the consumer
+ // calling this, and so it must be released separately.
+ //
+ // It is only valid to call this method exactly once.
+ int (*extract_data)(struct ArrowAsyncTask* self, struct ArrowDeviceArray* out);
+
+ // opaque task-specific data
+ void* private_data;
+};
+
+// EXPERIMENTAL: ArrowAsyncProducer represents a 1-to-1 relationship between an async
+// producer and consumer. This object allows the consumer to perform backpressure and flow
+// control on the asynchronous stream processing. This object must be owned by the
+// producer that creates it, which is thus responsible for cleaning it up.
+struct ArrowAsyncProducer {
+ // A consumer must call this function to start receiving on_next_task calls.
+ //
+ // It *must* be valid to call this synchronously from within `on_next_task` or
+ // `on_schema`, but this function *must not* immediately call `on_next_task` so as
+ // to avoid recursion and reentrant callbacks.
+ //
+ // After cancel has been called, additional calls to this function must be NOPs,
+ // but allowed. While not cancelled, calling this function must register the
+ // given number of additional arrays/batches to be produced with the producer.
+ // The producer should only call `on_next_task` at most the registered number
+ // of arrays before propagating backpressure.
+ //
+ // Any error encountered by calling request must be propagated by calling the `on_error`
+ // callback of the ArrowAsyncDeviceStreamHandler.
+ //
+ // While not cancelled, any subsequent calls to `on_next_task`, `on_error` or
+ // `release` should be scheduled by the producer to be called later.
+ //
+ // It is invalid for a consumer to call this with a value of n <= 0, producers should
+ // error if given such a value.
+ void (*request)(struct ArrowAsyncProducer* self, int64_t n);
+
+ // This cancel callback signals a producer that it must eventually stop making calls
+ // to on_next_task. It must be idempotent and thread-safe. After calling cancel once,
+ // subsequent calls must be NOPs. This must not call any consumer-side handlers other
+ // than `on_error`.
+ //
+ // It is not required that calling cancel affect the producer immediately, only that it
+ // must eventually stop calling on_next_task and subsequently call release on the
+ // async handler. As such, a consumer must be prepared to receive one or more calls to
+ // `on_next_task` even after calling cancel if there are still requested arrays pending.
+ //
+ // Successful cancellation should *not* result in the producer calling `on_error`, it
+ // should finish out any remaining tasks and eventually call `release`.
+ //
+ // Any error encountered during handling a call to cancel must be reported via the
+ // on_error callback on the async stream handler.
+ void (*cancel)(struct ArrowAsyncProducer* self);
+
+ // Any additional metadata tied to a specific stream of data. This must either be NULL
+ // or a valid pointer to metadata which is encoded in the same way schema metadata
+ // would be. Non-null metadata must be valid for the lifetime of this object. As an
+ // example a producer could use this to provide the total number of rows and/or batches
+ // in the stream if known.
+ const char* additional_metadata;
+
+ // producer-specific opaque data.
+ void* private_data;
+};
+
+// EXPERIMENTAL: Similar to ArrowDeviceArrayStream, except designed for an asynchronous
+// style of interaction. While ArrowDeviceArrayStream provides producer
+// defined callbacks, this is intended to be created by the consumer instead.
+// The consumer passes this handler to the producer, which in turn uses the
+// callbacks to inform the consumer of events in the stream.
+struct ArrowAsyncDeviceStreamHandler {
+ // Handler for receiving a schema. The passed in stream_schema must be
+ // released or moved by the handler (producer is giving ownership of the schema to
+ // the handler, but not ownership of the top level object itself).
+ //
+ // With the exception of an error occurring (on_error), this must be the first
+ // callback function which is called by a producer and must only be called exactly
+ // once. As such, the producer should provide a valid ArrowAsyncProducer instance
+ // so the consumer can control the flow. See the documentation on ArrowAsyncProducer
+ // for how it works. The ArrowAsyncProducer is owned by the producer who calls this
+ // function and thus the producer is responsible for cleaning it up when calling
+ // the release callback of this handler.
+ //
+ // If there is any additional metadata tied to this stream, it will be provided as
+ // a non-null value for the `additional_metadata` field of the ArrowAsyncProducer
+ // which will be valid at least until the release callback is called.
+ //
+ // Return value: 0 if successful, `errno`-compatible error otherwise
+ //
+ // A producer that receives a non-zero return here should stop producing and eventually
+ // call release instead.
+ int (*on_schema)(struct ArrowAsyncDeviceStreamHandler* self,
+ struct ArrowSchema* stream_schema);
+
+ // Handler for receiving data. This is called when data is available providing an
+ // ArrowAsyncTask struct to signify it. The producer indicates the end of the stream
+ // by passing NULL as the value for the task rather than a valid pointer to a task.
+ // The task object is only valid for the lifetime of this function call, if a consumer
+ // wants to utilize it after this function returns, it must copy or move the contents
+ // of it to a new ArrowAsyncTask object.
+ //
+ // The `request` callback of a provided ArrowAsyncProducer must be called in order
+ // to start receiving calls to this handler.
+ //
+ // The metadata argument can be null or can be used by a producer
+ // to pass arbitrary extra information to the consumer (such as total number
+ // of rows, context info, or otherwise). The data should be passed using the same
+ // encoding as the metadata within the ArrowSchema struct itself (defined in
+ // the spec at
+ // https://arrow.apache.org/docs/format/CDataInterface.html#c.ArrowSchema.metadata)
+ //
+ // If metadata is non-null then it only needs to exist for the lifetime of this call,
+ // a consumer who wants it to live after that must copy it to ensure lifetime.
+ //
+ // A producer *must not* call this concurrently from multiple different threads.
+ //
+ // A consumer must be prepared to receive one or more calls to this callback even
+ // after calling cancel on the corresponding ArrowAsyncProducer, as cancel does not
+ // guarantee it happens immediately.
+ //
+ // Return value: 0 if successful, `errno`-compatible error otherwise.
+ //
+ // If the consumer returns a non-zero return from this method, that indicates to the
+ // producer that it should stop propagating data as an error occurred. After receiving
+ // such a return, the only interaction with this object is for the producer to call
+ // the `release` callback.
+ int (*on_next_task)(struct ArrowAsyncDeviceStreamHandler* self,
+ struct ArrowAsyncTask* task, const char* metadata);
+
+ // Handler for encountering an error. The producer should call release after
+ // this returns to clean up any resources. The `code` passed in can be any error
+ // code that a producer wants, but should be errno-compatible for consistency.
+ //
+ // If the message or metadata are non-null, they will only last as long as this
+ // function call. The consumer would need to perform a copy of the data if it is
+ // necessary for them to live past the lifetime of this call.
+ //
+ // Error metadata should be encoded as with metadata in ArrowSchema, defined in
+ // the spec at
+ // https://arrow.apache.org/docs/format/CDataInterface.html#c.ArrowSchema.metadata
+ //
+ // It is valid for this to be called by a producer with or without a preceding call
+ // to ArrowAsyncProducer.request.
+ //
+ // This callback must not call any methods of an ArrowAsyncProducer object.
+ void (*on_error)(struct ArrowAsyncDeviceStreamHandler* self, int code,
+ const char* message, const char* metadata);
+
+ // Release callback to release any resources for the handler. Should always be
+ // called by a producer when it is done utilizing a handler. No callbacks should
+ // be called after this is called.
+ //
+ // It is valid for the release callback to be called by a producer with or without
+ // a preceding call to ArrowAsyncProducer.request.
+ //
+ // The release callback must not call any methods of an ArrowAsyncProducer object.
+ void (*release)(struct ArrowAsyncDeviceStreamHandler* self);
+
+ // MUST be populated by the producer BEFORE calling any callbacks other than release.
+ // This provides the connection between a handler and its producer, and must exist until
+ // the release callback is called.
+ struct ArrowAsyncProducer* producer;
+
+ // Opaque handler-specific data
+ void* private_data;
+};
+
+#endif // ARROW_C_ASYNC_STREAM_INTERFACE
+
#ifdef __cplusplus
}
#endif
diff --git a/docs/source/format/CDeviceDataInterface.rst b/docs/source/format/CDeviceDataInterface.rst
index 59433bae47..fbb2012c30 100644
--- a/docs/source/format/CDeviceDataInterface.rst
+++ b/docs/source/format/CDeviceDataInterface.rst
@@ -506,6 +506,8 @@ could be used for any device:
arr->array.release(&arr->array);
}
+.. _c-device-stream-interface:
+
Device Stream Interface
=======================
@@ -650,6 +652,367 @@ The stream source is not assumed to be thread-safe. Consumers wanting to
call ``get_next`` from several threads should ensure those calls are serialized.
+Async Device Stream Interface
+=============================
+
+.. warning::
+
+ Experimental: The Async C Device Stream interface is experimental in its current
+ form. Based on feedback and usage the protocol definition may change until
+ it is fully standardized.
+
+The :ref:`C device stream interface <c-device-stream-interface>` provides a synchronous
+API centered around the consumer calling the producer functions to retrieve
+the next record batch. For concurrent communication between producer and consumer,
+the ``ArrowAsyncDeviceStreamHandler`` can be used. This interface is non-opinionated
+and may fit into different concurrent communication models.
+
+Semantics
+---------
+
+Rather than the producer providing a structure of callbacks for a consumer to
+call and retrieve records, the Async interface is a structure allocated and populated by the consumer.
+The consumer-allocated struct provides handler callbacks for the producer to call
+when the schema and chunks of data are available.
+
+In addition to the ``ArrowAsyncDeviceStreamHandler``, there are also two additional
+structs used for the full data flow: ``ArrowAsyncTask`` and ``ArrowAsyncProducer``.
+
+Structure Definition
+--------------------
+
+The C device async stream interface consists of three ``struct`` definitions:
+
+.. code-block:: c
+
+ #ifndef ARROW_C_ASYNC_STREAM_INTERFACE
+ #define ARROW_C_ASYNC_STREAM_INTERFACE
+
+ struct ArrowAsyncTask {
+ int (*extract_data)(struct ArrowAsyncTask* self, struct ArrowDeviceArray* out);
+
+ void* private_data;
+ };
+
+ struct ArrowAsyncProducer {
+ void (*request)(struct ArrowAsyncProducer* self, int64_t n);
+ void (*cancel)(struct ArrowAsyncProducer* self);
+
+ void (*release)(struct ArrowAsyncProducer* self);
+ const char* additional_metadata;
+ void* private_data;
+ };
+
+ struct ArrowAsyncDeviceStreamHandler {
+ // consumer-specific handlers
+ int (*on_schema)(struct ArrowAsyncDeviceStreamHandler* self,
+ struct ArrowSchema* stream_schema);
+ int (*on_next_task)(struct ArrowAsyncDeviceStreamHandler* self,
+ struct ArrowAsyncTask* task, const char* metadata);
+ void (*on_error)(struct ArrowAsyncDeviceStreamHandler* self,
+ int code, const char* message, const char* metadata);
+
+ // release callback
+ void (*release)(struct ArrowAsyncDeviceStreamHandler* self);
+
+ // must be populated before calling any callbacks
+ struct ArrowAsyncProducer* producer;
+
+ // opaque handler-specific data
+ void* private_data;
+ };
+
+ #endif // ARROW_C_ASYNC_STREAM_INTERFACE
+
+.. note::
+ The canonical guard ``ARROW_C_ASYNC_STREAM_INTERFACE`` is meant to avoid
+ duplicate definitions if two projects copy the C async stream interface
+ definitions into their own headers, and a third-party project includes
+ from these two projects. It is therefore important that this guard is kept
+ exactly as-is when these definitions are copied.
+
+The ArrowAsyncDeviceStreamHandler structure
+'''''''''''''''''''''''''''''''''''''''''''
+
+The structure has the following fields:
+
+.. c:member:: int (*ArrowAsyncDeviceStreamHandler.on_schema)(struct ArrowAsyncDeviceStreamHandler*, struct ArrowSchema*)
+
+ *Mandatory.* Handler for receiving the schema of the stream. All incoming records should
+ match the provided schema. If successful, the function should return 0, otherwise
+ it should return an ``errno``-compatible error code.
+
+ If there is any extra contextual information that the producer wants to provide, it can set
+ :c:member:`ArrowAsyncProducer.additional_metadata` to a non-NULL value. This is encoded in the
+ same format as :c:member:`ArrowSchema.metadata`. The lifetime of this metadata, if not ``NULL``,
+ should be tied to the lifetime of the ``ArrowAsyncProducer`` object.
+
+ Unless the ``on_error`` handler is called, this will always get called exactly once and will be
+ the first method called on this object. As such the producer *MUST* populate the ``ArrowAsyncProducer``
+ member before calling this function to allow the consumer to apply back-pressure and control the flow of data.
+ The producer maintains ownership of the ``ArrowAsyncProducer`` and must clean it up *after*
+ calling the release callback on the ``ArrowAsyncDeviceStreamHandler``.
+
+ A producer that receives a non-zero result here must not subsequently call anything other than
+ the release callback on this object.
+
+.. c:member:: int (*ArrowAsyncDeviceStreamHandler.on_next_task)(struct ArrowAsyncDeviceStreamHandler*, struct ArrowAsyncTask*, const char*)
+
+ *Mandatory.* Handler to be called when a new record is available for processing. The
+ schema for each record should be the same as the schema that ``on_schema`` was called with.
+ If successfully handled, the function should return 0, otherwise it should return an
+ ``errno``-compatible error code.
+
+ Rather than passing the record itself it receives an ``ArrowAsyncTask`` instead to facilitate
+ better consumer-focused thread control as far as receiving the data. A call to this function
+ simply indicates that data is available via the provided task.
+
+ The producer signals the end of the stream by passing ``NULL`` for the ``ArrowAsyncTask``
+ pointer instead of a valid address. This task object is only valid during the lifetime of
+ this function call. If the consumer wants to use the task beyond the scope of this method, it
+ must copy or move its contents to a new ArrowAsyncTask object.
+
+ The ``const char*`` parameter exists for producers to provide any extra contextual information
+ they want. This is encoded in the same format as :c:member:`ArrowSchema.metadata`. If not ``NULL``,
+ the lifetime is only the scope of the call to this function. A consumer who wants to maintain
+ the additional metadata beyond the lifetime of this call *MUST* copy the value themselves.
+
+ A producer *MUST NOT* call this concurrently from multiple threads.
+
+ The :c:member:`ArrowAsyncProducer.request` callback must be called to start receiving calls to this
+ handler.
+
+.. c:member:: void (*ArrowAsyncDeviceStreamHandler.on_error)(struct ArrowAsyncDeviceStreamHandler*, int, const char*, const char*)
+
+ *Mandatory.* Handler to be called when an error is encountered by the producer. After calling
+ this, the ``release`` callback will be called as the last call on this struct. The parameters
+ are an ``errno``-compatible error code and an optional error message and metadata.
+
+ If the message and metadata are not ``NULL``, they are only valid during the scope
+ of this call. A consumer who wants to maintain these values past the return of this function
+ *MUST* copy the values themselves.
+
+ If the metadata parameter is not ``NULL``, to provide key-value error metadata, then it should
+ be encoded identically to the way that metadata is encoded in :c:member:`ArrowSchema.metadata`.
+
+ It is valid for this to be called by a producer with or without a preceding call to
+ :c:member:`ArrowAsyncProducer.request`. This callback *MUST NOT* call any methods of an
+ ``ArrowAsyncProducer`` object.
+
+.. c:member:: void (*ArrowAsyncDeviceStreamHandler.release)(struct ArrowAsyncDeviceStreamHandler*)
+
+ *Mandatory.* A pointer to a consumer-provided release callback for the handler.
+
+ It is valid for this to be called by a producer with or without a preceding call to
+ :c:member:`ArrowAsyncProducer.request`. This must not call any methods of an ``ArrowAsyncProducer``
+ object.
+
+.. c:member:: struct ArrowAsyncProducer* ArrowAsyncDeviceStreamHandler.producer
+
+ *Mandatory.* The producer object that the consumer will use to request additional data or cancel.
+
+ This object *MUST* be populated by the producer before calling the :c:member:`ArrowAsyncDeviceStreamHandler.on_schema`
+ callback. The producer maintains ownership of this object and must clean it up *after* calling
+ the release callback on the ``ArrowAsyncDeviceStreamHandler``.
+
+ The consumer *CANNOT* assume that this is valid until the ``on_schema`` callback is called.
+
+.. c:member:: void* ArrowAsyncDeviceStreamHandler.private_data
+
+ *Optional.* An opaque pointer to consumer-provided private data.
+
+ Producers *MUST NOT* process this member. Lifetime of this member is handled by
+ the consumer, and especially by the release callback.
+
+The ArrowAsyncTask structure
+''''''''''''''''''''''''''''
+
+The purpose of using a Task object rather than passing the array directly to the ``on_next_task``
+callback is to allow for more complex and efficient thread handling. Utilizing a Task
+object allows for a producer to separate the "decoding" logic from the I/O, enabling a
+consumer to avoid transferring data between CPU cores (e.g. from one L1/L2 cache to another).
+
+This producer-provided structure has the following fields:
+
+.. c:member:: int (*ArrowAsyncTask.extract_data)(struct ArrowAsyncTask*, struct ArrowDeviceArray*)
+
+ *Mandatory.* A callback to populate the provided ``ArrowDeviceArray`` with the available data.
+ The order of ``ArrowAsyncTask`` objects provided by the producer enables a consumer to know the order of
+ the data to process. If the consumer does not care about the data that is owned by this task,
+ it must still call ``extract_data`` so that the producer can perform any required cleanup. ``NULL``
+ should be passed as the device array pointer to indicate that the consumer doesn't want the
+ actual data, letting the task perform necessary cleanup.
+
+ If a non-zero value is returned from this, it should be followed only by the producer calling
+ the ``on_error`` callback of the ``ArrowAsyncDeviceStreamHandler``. Because calling this method
+ is likely to be separate from the current control flow, returning a non-zero value to signal
+ that an error occurred allows the current thread to handle the case accordingly, while still
+ allowing all error logging and handling to be centralized in the
+ :c:member:`ArrowAsyncDeviceStreamHandler.on_error` callback.
+
+ Rather than having a separate release callback, any required cleanup should be performed as part
+ of the invocation of this callback. Ownership of the Array is given to the pointer passed in as
+ a parameter, and this array must be released separately.
+
+ It is only valid to call this method exactly once.
+
+.. c:member:: void* ArrowAsyncTask.private_data
+
+ *Optional.* An opaque pointer to producer-provided private data.
+
+ Consumers *MUST NOT* process this member. Lifetime of this member is handled by
+ the producer who created this object, and should be cleaned up if necessary during
+ the call to :c:member:`ArrowAsyncTask.extract_data`.
+
+The ArrowAsyncProducer structure
+''''''''''''''''''''''''''''''''
+
+This producer-provided and managed object has the following fields:
+
+.. c:member:: void (*ArrowAsyncProducer.request)(struct ArrowAsyncProducer*, int64_t)
+
+ *Mandatory.* This function must be called by a consumer to start receiving calls to
+ :c:member:`ArrowAsyncDeviceStreamHandler.on_next_task`. It *MUST* be valid to call
+ this synchronously from within :c:member:`ArrowAsyncDeviceStreamHandler.on_next_task`
+ or :c:member:`ArrowAsyncDeviceStreamHandler.on_schema`. As a result, this function
+ *MUST NOT* synchronously call ``on_next_task`` or ``on_error`` to avoid recursive
+ and reentrant callbacks.
+
+ After ``cancel`` is called, additional calls to this function are allowed but must be NOPs.
+
+ While not cancelled, calling this function registers the given number of additional
+ arrays/batches to be produced by the producer. A producer should only call
+ the appropriate ``on_next_task`` callback up to a maximum of the total sum of the values
+ of ``n`` passed to this method before propagating back-pressure / waiting.
+
+ Any error encountered by calling request must be propagated by calling the ``on_error``
+ callback of the ``ArrowAsyncDeviceStreamHandler``.
+
+ It is invalid to call this function with a value of ``n`` that is ``<= 0``. Producers should
+ error (e.g. call ``on_error``) if receiving such a value for ``n``.
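The request/cancel rules above amount to a small credit counter on the producer side. The following is a minimal single-threaded sketch of that bookkeeping, not the Arrow implementation: `DemoProducerState`, `demo_request`, `demo_cancel`, `demo_try_take_credit` and `demo_producer_init` are hypothetical names, and the synchronization a real thread-safe `cancel` needs is left out. The sketch also chooses to stop delivering as soon as `cancel` is seen, which the text permits but does not require.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Copied from the struct definition in abi.h shown in this commit. */
struct ArrowAsyncProducer {
  void (*request)(struct ArrowAsyncProducer* self, int64_t n);
  void (*cancel)(struct ArrowAsyncProducer* self);
  const char* additional_metadata;
  void* private_data;
};

/* Hypothetical producer-side state; not part of the specification. */
struct DemoProducerState {
  int64_t pending;      /* arrays requested by the consumer, not yet delivered */
  bool cancelled;       /* set once cancel has been called */
  bool invalid_request; /* set when n <= 0; a real producer would schedule on_error */
};

static void demo_request(struct ArrowAsyncProducer* self, int64_t n) {
  struct DemoProducerState* st = (struct DemoProducerState*)self->private_data;
  if (st->cancelled) return; /* allowed after cancel, but a NOP */
  if (n <= 0) {
    st->invalid_request = true; /* report later through on_error, never from here */
    return;
  }
  st->pending += n; /* only registers credits; never calls on_next_task directly */
}

static void demo_cancel(struct ArrowAsyncProducer* self) {
  struct DemoProducerState* st = (struct DemoProducerState*)self->private_data;
  st->cancelled = true; /* idempotent: repeated calls change nothing */
}

/* Called by the producer's own delivery loop before each on_next_task call.
 * Returns true and consumes one credit if another task may be delivered. */
static bool demo_try_take_credit(struct DemoProducerState* st) {
  if (st->cancelled || st->pending == 0) return false;
  st->pending--;
  return true;
}

static void demo_producer_init(struct ArrowAsyncProducer* p,
                               struct DemoProducerState* st) {
  st->pending = 0;
  st->cancelled = false;
  st->invalid_request = false;
  p->request = demo_request;
  p->cancel = demo_cancel;
  p->additional_metadata = NULL;
  p->private_data = st;
}
```

Keeping `request` to a counter update is what makes it safe to call from inside `on_next_task` or `on_schema`: the delivery loop observes the new credits on its next iteration instead of being re-entered.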
+
+.. c:member:: void (*ArrowAsyncProducer.cancel)(struct ArrowAsyncProducer*)
+
+ *Mandatory.* This function signals to the producer that it must *eventually* stop calling
+ ``on_next_task``. Calls to ``cancel`` must be idempotent and thread-safe. After calling
+ it once, subsequent calls *MUST* be a NOP. This *MUST NOT* call any consumer-side handlers
+ other than ``on_error``.
+
+ It is not required that calling ``cancel`` affect the producer *immediately*, only that it
+ must eventually stop calling ``on_next_task`` and then subsequently call ``release``
+ on the async handler object. As such, a consumer *MUST* be prepared to receive one or more
+ calls to ``on_next_task`` or ``on_error`` even after calling ``cancel`` if there are still
+ requested arrays pending.
+
+ Successful cancelling *MUST NOT* result in a producer calling
+ :c:member:`ArrowAsyncDeviceStreamHandler.on_error`, instead it should finish out any remaining
+ tasks (calling ``on_next_task`` accordingly) and eventually just call ``release``.
+
+ Any error encountered during handling a call to cancel must be reported via the ``on_error``
+ callback on the async stream handler.
+
+.. c:member:: const char* ArrowAsyncProducer.additional_metadata
+
+ *Optional.* An additional metadata string to provide any extra context to the consumer. This *MUST*
+ either be ``NULL`` or a valid string that is encoded in the same way as :c:member:`ArrowSchema.metadata`.
+ As an example, a producer could utilize this metadata to provide the total number of rows and/or batches
+ in the stream if known.
+
+ If not ``NULL`` it *MUST* be valid for at least the lifetime of this object.
+
+.. c:member:: void* ArrowAsyncProducer.private_data
+
+ *Optional.* An opaque pointer to producer-provided specific data.
+
+ Consumers *MUST NOT* process this member; its lifetime is owned by the producer
+ that constructed this object.
+
+Error Handling
+''''''''''''''
+
+Unlike the regular C Stream interface, the Async interface allows for errors to flow in
+both directions. As a result, error handling can be slightly more complex. Thus this spec
+designates the following rules:
+
+* If the producer encounters an error during processing, it should call the ``on_error``
+ callback, and then call ``release`` after it returns.
+
+* If ``on_schema`` or ``on_next_task`` returns a non-zero integer value, the producer *should not*
+ call the ``on_error`` callback, but instead should eventually call ``release`` at some point
+ before or after any logging or processing of the error code.
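The two rules above can be sketched from the producer's side as follows. This is an illustration only: `demo_producer_failed`, `demo_deliver`, `DemoRecord`, `demo_handler_init` and the `rec_*` callbacks are hypothetical helpers, and the recording handler stands in for a real consumer so the behaviour can be observed.

```c
#include <errno.h>
#include <stddef.h>

struct ArrowSchema;        /* defined by the Arrow C data interface */
struct ArrowAsyncTask;     /* only used through pointers in this sketch */
struct ArrowAsyncProducer;

/* Copied from the struct definition in abi.h shown in this commit. */
struct ArrowAsyncDeviceStreamHandler {
  int (*on_schema)(struct ArrowAsyncDeviceStreamHandler* self,
                   struct ArrowSchema* stream_schema);
  int (*on_next_task)(struct ArrowAsyncDeviceStreamHandler* self,
                      struct ArrowAsyncTask* task, const char* metadata);
  void (*on_error)(struct ArrowAsyncDeviceStreamHandler* self, int code,
                   const char* message, const char* metadata);
  void (*release)(struct ArrowAsyncDeviceStreamHandler* self);
  struct ArrowAsyncProducer* producer;
  void* private_data;
};

/* Rule 1: a producer-side failure is reported through on_error, then release. */
static void demo_producer_failed(struct ArrowAsyncDeviceStreamHandler* h, int code,
                                 const char* message) {
  h->on_error(h, code, message, NULL);
  h->release(h);
}

/* Rule 2: a non-zero return from the consumer is NOT echoed back via on_error;
 * the producer stops and releases. Returns 0 if the stream may continue. */
static int demo_deliver(struct ArrowAsyncDeviceStreamHandler* h,
                        struct ArrowAsyncTask* task) {
  int rc = h->on_next_task(h, task, NULL);
  if (rc != 0) h->release(h); /* the handler must not be used after this */
  return rc;
}

/* Hypothetical consumer that records which callbacks were invoked. */
struct DemoRecord { int errors; int releases; int last_code; int fail_next; };

static int rec_on_schema(struct ArrowAsyncDeviceStreamHandler* h, struct ArrowSchema* s) {
  (void)h; (void)s;
  return 0;
}
static int rec_on_next_task(struct ArrowAsyncDeviceStreamHandler* h,
                            struct ArrowAsyncTask* t, const char* md) {
  (void)t; (void)md;
  return ((struct DemoRecord*)h->private_data)->fail_next ? EIO : 0;
}
static void rec_on_error(struct ArrowAsyncDeviceStreamHandler* h, int code,
                         const char* msg, const char* md) {
  struct DemoRecord* r = (struct DemoRecord*)h->private_data;
  (void)msg; (void)md;
  r->errors++;
  r->last_code = code;
}
static void rec_release(struct ArrowAsyncDeviceStreamHandler* h) {
  ((struct DemoRecord*)h->private_data)->releases++;
}

static void demo_handler_init(struct ArrowAsyncDeviceStreamHandler* h,
                              struct DemoRecord* r) {
  h->on_schema = rec_on_schema;
  h->on_next_task = rec_on_next_task;
  h->on_error = rec_on_error;
  h->release = rec_release;
  h->producer = NULL; /* a real producer populates this before any callback */
  h->private_data = r;
}
```

In both paths `release` is the last call made on the handler, which is the invariant the consumer relies on to free its resources.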
+
+Result lifetimes
+''''''''''''''''
+
+The ``ArrowSchema`` passed to the ``on_schema`` callback must be released independently,
+with the object itself needing to be moved to a consumer-owned ``ArrowSchema`` object. The
+``ArrowSchema*`` passed as a parameter to the callback *MUST NOT* be stored and kept.
+
+The ``ArrowAsyncTask`` object provided to ``on_next_task`` is owned by the producer and
+will be cleaned up during the invocation of ``extract_data`` on it. If the consumer
+doesn't care about the data, it should pass ``NULL`` instead of a valid ``ArrowDeviceArray*``.
+
+The ``const char*`` error ``message`` and ``metadata`` which are passed to ``on_error``
+are only valid within the scope of the ``on_error`` function itself. They must be copied
+if it is necessary for them to exist after it returns.
+
+Stream Handler Lifetime
+'''''''''''''''''''''''
+
+Lifetime of the async stream handler is managed using a release callback with similar
+usage as in :ref:`C data interface <c-data-interface-released>`.
+
+ArrowAsyncProducer Lifetime
+'''''''''''''''''''''''''''
+
+The lifetime of the ``ArrowAsyncProducer`` is owned by the producer itself and should
+be managed by it. It *MUST* be populated before calling any methods other than ``release``
+and *MUST* remain valid at least until just before calling ``release`` on the stream handler object.
+
+Thread safety
+'''''''''''''
+
+All handler functions on the ``ArrowAsyncDeviceStreamHandler`` should only be called in a
+serialized manner, but are not guaranteed to be called from the same thread every time. A
+producer should wait for handler callbacks to return before calling the next handler callback,
+and before calling the ``release`` callback.
+
+Back-pressure is managed by the consumer making calls to :c:member:`ArrowAsyncProducer.request`
+to indicate how many arrays it is ready to receive.
+
+The ``ArrowAsyncDeviceStreamHandler`` object should be able to handle callbacks as soon as
+it is passed to the producer; any initialization should be performed before it is provided.
+
+Possible Sequence Diagram
+-------------------------
+
+.. mermaid::
+
+ sequenceDiagram
+ Consumer->>+Producer: ArrowAsyncDeviceStreamHandler*
+ Producer-->>+Consumer: on_schema(ArrowAsyncProducer*, ArrowSchema*)
+ Consumer->>Producer: ArrowAsyncProducer->request(n)
+
+ par
+ loop up to n times
+ Producer-->>Consumer: on_next_task(ArrowAsyncTask*)
+ end
+ and for each task
+ Consumer-->>Producer: ArrowAsyncTask.extract_data(...)
+ Consumer-->>Producer: ArrowAsyncProducer->request(1)
+ end
+
+ break Optionally
+ Consumer->>-Producer: ArrowAsyncProducer->cancel()
+ end
+
+ loop possible remaining
+ Producer-->>Consumer: on_next_task(ArrowAsyncTask*)
+ end
+
+ Producer->>-Consumer: ArrowAsyncDeviceStreamHandler->release()
+
+
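The diagram above can be walked through on a single thread with a toy producer and consumer. This is a rough sketch under several assumptions: `Demo`, `demo_run` and every callback below are hypothetical, the consumer discards data by passing `NULL` to `extract_data` (so `ArrowDeviceArray` can stay an incomplete type), and the end-of-stream signal is treated as consuming one requested credit, which the text does not specify. A real producer would typically run its delivery loop on its own thread.

```c
#include <stddef.h>
#include <stdint.h>

struct ArrowDeviceArray; /* defined in abi.h; only used through pointers here */
struct ArrowSchema {     /* as defined by the Arrow C data interface */
  const char* format;
  const char* name;
  const char* metadata;
  int64_t flags;
  int64_t n_children;
  struct ArrowSchema** children;
  struct ArrowSchema* dictionary;
  void (*release)(struct ArrowSchema*);
  void* private_data;
};
struct ArrowAsyncTask {
  int (*extract_data)(struct ArrowAsyncTask* self, struct ArrowDeviceArray* out);
  void* private_data;
};
struct ArrowAsyncProducer {
  void (*request)(struct ArrowAsyncProducer* self, int64_t n);
  void (*cancel)(struct ArrowAsyncProducer* self);
  const char* additional_metadata;
  void* private_data;
};
struct ArrowAsyncDeviceStreamHandler {
  int (*on_schema)(struct ArrowAsyncDeviceStreamHandler* self, struct ArrowSchema* schema);
  int (*on_next_task)(struct ArrowAsyncDeviceStreamHandler* self,
                      struct ArrowAsyncTask* task, const char* metadata);
  void (*on_error)(struct ArrowAsyncDeviceStreamHandler* self, int code,
                   const char* message, const char* metadata);
  void (*release)(struct ArrowAsyncDeviceStreamHandler* self);
  struct ArrowAsyncProducer* producer;
  void* private_data;
};

/* Hypothetical state shared by the toy producer and consumer. */
struct Demo {
  int64_t credits; /* sum of request(n) values not yet used up */
  int extracted;   /* tasks on which the consumer called extract_data */
  int schema_seen, end_seen, released;
};

static void schema_release(struct ArrowSchema* s) { s->release = NULL; }
static void prod_request(struct ArrowAsyncProducer* p, int64_t n) {
  if (n > 0) ((struct Demo*)p->private_data)->credits += n; /* no callback from here */
}
static void prod_cancel(struct ArrowAsyncProducer* p) { (void)p; }
static int task_extract(struct ArrowAsyncTask* t, struct ArrowDeviceArray* out) {
  (void)t; (void)out; /* out is NULL in this demo: the consumer discards the data */
  return 0;
}
static int on_schema(struct ArrowAsyncDeviceStreamHandler* h, struct ArrowSchema* s) {
  ((struct Demo*)h->private_data)->schema_seen = 1;
  s->release(s);                        /* the handler owns the schema contents */
  h->producer->request(h->producer, 1); /* start the flow */
  return 0;
}
static int on_next_task(struct ArrowAsyncDeviceStreamHandler* h,
                        struct ArrowAsyncTask* t, const char* md) {
  struct Demo* d = (struct Demo*)h->private_data;
  (void)md;
  if (t == NULL) { d->end_seen = 1; return 0; } /* NULL task: end of stream */
  if (t->extract_data(t, NULL) == 0) d->extracted++;
  h->producer->request(h->producer, 1); /* ask for one more */
  return 0;
}
static void on_error(struct ArrowAsyncDeviceStreamHandler* h, int code,
                     const char* msg, const char* md) {
  (void)h; (void)code; (void)msg; (void)md;
}
static void on_release(struct ArrowAsyncDeviceStreamHandler* h) {
  ((struct Demo*)h->private_data)->released = 1;
}

/* Drive the whole exchange and return the final demo state. */
static struct Demo demo_run(int total_tasks) {
  struct Demo d = {0, 0, 0, 0, 0};
  struct ArrowAsyncProducer prod = {prod_request, prod_cancel, NULL, &d};
  struct ArrowAsyncDeviceStreamHandler h = {on_schema, on_next_task, on_error,
                                            on_release, NULL, &d};
  struct ArrowSchema schema = {"i", "", NULL, 0, 0, NULL, NULL, schema_release, NULL};
  int emitted = 0;
  h.producer = &prod; /* populated by the producer before any callback */
  if (h.on_schema(&h, &schema) == 0) {
    while (d.credits > 0) { /* deliver only while credits are registered */
      struct ArrowAsyncTask task = {task_extract, NULL};
      d.credits--;
      if (emitted == total_tasks) { h.on_next_task(&h, NULL, NULL); break; }
      emitted++;
      if (h.on_next_task(&h, &task, NULL) != 0) break;
    }
  }
  h.release(&h); /* always the last call on the handler */
  return d;
}
```

Calling `demo_run(3)` exercises the `on_schema`, `request`, `on_next_task`, `extract_data` and `release` steps in the order the diagram shows, with `request` only ever adding credits that the delivery loop picks up afterwards.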
Interoperability with other interchange formats
===============================================