zeroshade commented on code in PR #43632:
URL: https://github.com/apache/arrow/pull/43632#discussion_r1759129213


##########
docs/source/format/CDeviceDataInterface.rst:
##########
@@ -650,6 +652,331 @@ The stream source is not assumed to be thread-safe. Consumers wanting to
 call ``get_next`` from several threads should ensure those calls are
 serialized.
 
+Async Device Stream Interface
+=============================
+
+The :ref:`C stream interface <c-device-stream-interface>` provides a synchronous
+API centered around the consumer calling the callback functions to retrieve
+the next record batch. For some bindings, use cases, and interoperability scenarios, a more
+asynchronous, producer-focused interface may be required. These scenarios can use
+the ``ArrowAsyncDeviceStreamHandler``.
+
+Semantics
+---------
+
+Rather than the producer providing a structure of callbacks for a consumer to
+call and retrieve records, the Async interface is a consumer-allocated structure.
+The consumer-allocated struct provides handler callbacks for the producer to call
+when the schema and chunks of data are available, rather than the consumer using
+a blocking pull-style iteration.
+
+In addition to the ``ArrowAsyncDeviceStreamHandler``, there are two additional
+structs used for the full data flow: ``ArrowAsyncTask`` and ``ArrowAsyncProducer``.
+
+Structure Definition
+--------------------
+
+The C device async stream interface consists of three ``struct`` definitions:
+
+.. code-block:: c
+
+    #ifndef ARROW_C_ASYNC_STREAM_INTERFACE
+    #define ARROW_C_ASYNC_STREAM_INTERFACE
+
+    struct ArrowAsyncTask {
+      int (*get_data)(struct ArrowAsyncTask* self, struct ArrowDeviceArray* out);
+
+      void* private_data;
+    };
+
+    struct ArrowAsyncProducer {
+      void (*request)(struct ArrowAsyncProducer* self, uint64_t n);
+      void (*cancel)(struct ArrowAsyncProducer* self);
+
+      void (*release)(struct ArrowAsyncProducer* self);
+      void* private_data;
+    };
+
+    struct ArrowAsyncDeviceStreamHandler {
+      // handlers
+      int (*on_schema)(struct ArrowAsyncDeviceStreamHandler* self,
+                       struct ArrowAsyncProducer* producer,
+                       struct ArrowSchema* stream_schema, const char* addl_metadata);
+      int (*on_next_task)(struct ArrowAsyncDeviceStreamHandler* self,
+                          struct ArrowAsyncTask* task, const char* metadata);
+      void (*on_error)(struct ArrowAsyncDeviceStreamHandler* self,
+                       int code, const char* message, const char* metadata);
+
+      // release callback
+      void (*release)(struct ArrowAsyncDeviceStreamHandler* self);
+
+      // opaque handler-specific data
+      void* private_data;
+    };
+
+    #endif  // ARROW_C_ASYNC_STREAM_INTERFACE
+
+.. note::
+    The canonical guard ``ARROW_C_ASYNC_STREAM_INTERFACE`` is meant to avoid
+    duplicate definitions if two projects copy the C async stream interface
+    definitions into their own headers, and a third-party project includes
+    from these two projects. It is therefore important that this guard is kept
+    exactly as-is when these definitions are copied.
+
+The ArrowAsyncDeviceStreamHandler structure
+'''''''''''''''''''''''''''''''''''''''''''
+
+The structure has the following fields:
+
+.. c:member:: int (*ArrowAsyncDeviceStreamHandler.on_schema)(struct ArrowAsyncDeviceStreamHandler*, struct ArrowAsyncProducer*, struct ArrowSchema*, const char*)
+
+    *Mandatory.* Handler for receiving the schema of the stream. All records should
+    match the provided schema. If successful, the function should return 0, otherwise
+    it should return an ``errno``-compatible error code.
+
+    The ``const char*`` parameter exists for producers to provide any extra contextual information
+    they want, such as the total number of rows in the stream, statistics, or anything else. This is
+    encoded in the same format as :c:member:`ArrowSchema.metadata`. If not ``NULL``,
+    its lifetime is limited to the scope of the call to this function. A consumer who wants to maintain
+    the additional metadata beyond the lifetime of this call *MUST* copy the value themselves.
+
+    Unless the ``on_error`` handler is called, this will always get called exactly once and will be
+    the first method called on this object. As such, the producer *MUST* provide an ``ArrowAsyncProducer``
+    object when calling this function to allow the consumer to manage back-pressure and flow control.
+    The producer maintains ownership of the ``ArrowAsyncProducer`` and must clean it up before or after
+    calling the release callback on this object.
+
+    A producer that receives a non-zero result here must not subsequently call anything other than
+    the release callback on this object.
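+
+    As a hedged illustration, the following sketch shows one way a consumer might copy the
+    ``addl_metadata`` buffer. Because the :c:member:`ArrowSchema.metadata` encoding is a
+    length-prefixed binary format rather than a NUL-terminated string, ``strlen`` cannot be
+    used to size the copy. The helper names ``my_metadata_size`` and ``my_metadata_copy`` are
+    hypothetical and not part of this interface.
+
+    .. code-block:: c
+
+        #include <stdint.h>
+        #include <stdlib.h>
+        #include <string.h>
+
+        /* Size in bytes of a buffer in the ArrowSchema.metadata encoding: an int32
+           number of key/value pairs, followed by each key and each value as an
+           int32 length and that many bytes (native endianness). */
+        static size_t my_metadata_size(const char* metadata) {
+          int32_t n_pairs, len;
+          size_t pos = sizeof(int32_t);
+          memcpy(&n_pairs, metadata, sizeof(int32_t));
+          for (int32_t i = 0; i < 2 * n_pairs; ++i) {
+            memcpy(&len, metadata + pos, sizeof(int32_t));
+            pos += sizeof(int32_t) + (size_t)len;
+          }
+          return pos;
+        }
+
+        /* Returns a heap-allocated copy (release it with free()), or NULL if
+           `metadata` is NULL or the allocation fails. */
+        static char* my_metadata_copy(const char* metadata) {
+          if (metadata == NULL) return NULL;
+          size_t size = my_metadata_size(metadata);
+          char* copy = (char*)malloc(size);
+          if (copy != NULL) memcpy(copy, metadata, size);
+          return copy;
+        }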
+
+.. c:member:: int (*ArrowAsyncDeviceStreamHandler.on_next_task)(struct ArrowAsyncDeviceStreamHandler*, struct ArrowAsyncTask*, const char*)
+
+    *Mandatory.* Handler to be called when a new record is available for processing. The
+    schema for each record should be the same as the schema that ``on_schema`` was called with.
+    If successfully handled, the function should return 0, otherwise it should return an
+    ``errno``-compatible error code.
+
+    Rather than receiving the record itself, the handler receives an ``ArrowAsyncTask`` to give
+    the consumer better control over which thread actually retrieves the data. A call to this
+    function simply indicates that data is available via the provided task.
+
+    The ``const char*`` parameter exists for producers to provide any extra contextual information
+    they want. This is encoded in the same format as :c:member:`ArrowSchema.metadata`. If not ``NULL``,
+    its lifetime is limited to the scope of the call to this function. A consumer who wants to maintain
+    the additional metadata beyond the lifetime of this call *MUST* copy the value themselves.
+
+    A producer *MUST NOT* call this concurrently from multiple different threads.
+
+    The :c:member:`ArrowAsyncProducer.request` callback must be called to start receiving calls to this
+    handler.
+
+    This function *MUST* be able to be called re-entrantly on the same thread to allow for the
+    :c:member:`ArrowAsyncProducer.request` callback to potentially call this method recursively.
+
+.. c:member:: void (*ArrowAsyncDeviceStreamHandler.on_error)(struct ArrowAsyncDeviceStreamHandler*, int, const char*, const char*)
+
+    *Mandatory.* Handler to be called when an error is encountered by the producer. After calling
+    this, the ``release`` callback will be called as the last call on this struct. The parameters
+    are an ``errno``-compatible error code and an optional error message and metadata.
+
+    If the message and metadata are not ``NULL``, their lifetime is only valid during the scope
+    of this call. A consumer who wants to maintain these values past the return of this function
+    *MUST* copy the values themselves.
+
+    If the metadata parameter is not ``NULL``, it provides key-value error metadata and should
+    be encoded identically to the way that metadata is encoded in :c:member:`ArrowSchema.metadata`.
+
+    It is valid for this to be called by a producer with or without a preceding call to
+    :c:member:`ArrowAsyncProducer.request`. This callback *MUST NOT* call any methods of an
+    ``ArrowAsyncProducer`` object.
+
+.. c:member:: void (*ArrowAsyncDeviceStreamHandler.release)(struct ArrowAsyncDeviceStreamHandler*)
+
+    *Mandatory.* A pointer to a consumer-provided release callback for the handler.
+
+    It is valid for this to be called by a producer with or without a preceding call to
+    :c:member:`ArrowAsyncProducer.request`. This must not call any methods of an ``ArrowAsyncProducer``
+    object.
+
+.. c:member:: void* ArrowAsyncDeviceStreamHandler.private_data
+
+    *Optional.* An opaque pointer to consumer-provided private data.
+
+    Producers *MUST NOT* process this member. Lifetime of this member is handled by
+    the consumer, and especially by the release callback.
+
+The ArrowAsyncTask structure
+''''''''''''''''''''''''''''
+
+The purpose of using a Task object rather than passing the array directly to the ``on_next_task``
+callback is to allow for more complex and efficient thread handling. Utilizing a Task
+object allows for a producer to separate the "decoding" logic from the I/O, enabling a
+consumer to avoid transferring data between CPU cores (e.g. from one L1/L2 cache to another).
+
+This producer-provided structure has the following fields:
+
+.. c:member:: int (*ArrowAsyncTask.get_data)(struct ArrowAsyncTask*, struct ArrowDeviceArray*)
+
+  *Mandatory.* A callback to populate the provided ``ArrowDeviceArray`` with the available data.
+  The order in which the producer provides ``ArrowAsyncTask`` objects tells the consumer the
+  order in which to process the data.
+
+  If a non-zero value is returned from this, it should be followed only by the producer calling
+  the ``on_error`` callback of the ``ArrowAsyncDeviceStreamHandler``. Because calling this method
+  is likely to be separate from the current control flow, returning a non-zero value to signal
+  that an error occurred allows the current thread to decide how to handle the case, while still
+  allowing all error logging and handling to be centralized in the
+  :c:member:`ArrowAsyncDeviceStreamHandler.on_error` callback.
+
+  Rather than having a separate release callback, any required cleanup should be performed as part
+  of the invocation of this callback. Ownership of the array is transferred to the consumer-provided
+  ``ArrowDeviceArray`` parameter, which the consumer must release separately.
+
+  It is only valid to call this method exactly once.
+
+.. c:member:: void* ArrowAsyncTask.private_data
+
+  *Optional.* An opaque pointer to producer-provided private data.
+
+  Consumers *MUST NOT* process this member. Lifetime of this member is handled by
+  the producer who created this object, and should be cleaned up if necessary during
+  the call to :c:member:`ArrowAsyncTask.get_data`.
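+
+A producer could implement ``ArrowAsyncTask`` by embedding it in a heap-allocated wrapper that
+owns an already-produced batch, as in this sketch. It assumes the definitions above and the C
+device data interface definitions are in scope; ``struct my_task``, ``my_task_get_data``, and
+``my_make_task`` are hypothetical names.
+
+.. code-block:: c
+
+    #include <stdlib.h>
+
+    /* Hypothetical producer-side task: the public struct plus a batch that
+       has already been produced and is waiting to be handed over. */
+    struct my_task {
+      struct ArrowAsyncTask task;
+      struct ArrowDeviceArray batch;
+    };
+
+    static int my_task_get_data(struct ArrowAsyncTask* self,
+                                struct ArrowDeviceArray* out) {
+      struct my_task* t = (struct my_task*)self->private_data;
+      if (out != NULL) {
+        /* Move the array out: the consumer now owns it and must release it. */
+        *out = t->batch;
+        t->batch.array.release = NULL;
+      } else if (t->batch.array.release != NULL) {
+        /* The consumer passed NULL because it doesn't want the data. */
+        t->batch.array.release(&t->batch.array);
+      }
+      /* There is no separate release callback, so clean up now. This frees
+         the task itself; `self` must not be used afterwards. */
+      free(t);
+      return 0;
+    }
+
+    /* Wraps a batch in a heap-allocated task, taking ownership of *batch on
+       success. Returns NULL (leaving *batch untouched) if allocation fails. */
+    static struct ArrowAsyncTask* my_make_task(struct ArrowDeviceArray* batch) {
+      struct my_task* t = (struct my_task*)malloc(sizeof(*t));
+      if (t == NULL) return NULL;
+      t->batch = *batch;
+      batch->array.release = NULL;
+      t->task.get_data = my_task_get_data;
+      t->task.private_data = t;
+      return &t->task;
+    }
+
+A producer that wants decoding to happen on the consumer's thread would instead store the
+undecoded input in its task wrapper and decode it inside ``get_data``.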
+
+The ArrowAsyncProducer structure
+''''''''''''''''''''''''''''''''
+
+This producer-provided and managed object has the following fields:
+
+.. c:member:: void (*ArrowAsyncProducer.request)(struct ArrowAsyncProducer*, uint64_t)
+
+  *Mandatory.* This function must be called by a consumer to start receiving calls to
+  :c:member:`ArrowAsyncDeviceStreamHandler.on_next_task`. It *MUST* be valid to call
+  this synchronously from within :c:member:`ArrowAsyncDeviceStreamHandler.on_next_task`
+  or :c:member:`ArrowAsyncDeviceStreamHandler.on_schema`. As a result, this function
+  *MUST NOT* synchronously call ``on_next_task`` or ``on_error`` to avoid recursive
+  and reentrant callbacks.
+
+  After ``cancel`` is called, additional calls to this function are allowed but must be a NOP.
+
+  While not cancelled, calling this function registers the given number of additional
+  arrays/batches to be produced by the producer. A producer should call ``on_next_task``
+  at most as many times as the total number of arrays requested across all calls to this
+  method before propagating back-pressure / waiting.
+
+  Any error encountered by calling request must be propagated by calling the ``on_error``
+  callback of the ``ArrowAsyncDeviceStreamHandler``.
+
+  A producer *MUST* support an unbounded number of calls to this callback and *MUST* support
+  a total registered demand (sum requested - sum delivered) of up to ``UINT64_MAX``.
+
+.. c:member:: void (*ArrowAsyncProducer.cancel)(struct ArrowAsyncProducer*)
+
+  *Mandatory.* This function signals to the producer that it must *eventually* stop calling
+  ``on_next_task``. Calls to ``cancel`` must be idempotent and thread-safe. After calling
+  it once, subsequent calls *MUST* be a NOP.
+
+  It is not required that calling ``cancel`` affect the producer *immediately*, only that it
+  must eventually stop calling ``on_next_task`` and then subsequently call ``release``
+  on the async handler object. As such, a consumer *MUST* be prepared to receive one or more
+  calls to ``on_next_task`` or ``on_error`` even after calling ``cancel`` if there are still
+  requested arrays pending.
+
+  Successful cancelling *MUST NOT* result in a producer calling
+  :c:member:`ArrowAsyncDeviceStreamHandler.on_error`; instead, it should finish out any remaining
+  tasks (calling ``on_next_task`` accordingly) and eventually just call ``release``.
+
+  Any error encountered during handling a call to cancel must be reported via the ``on_error``
+  callback on the async stream handler.
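+
+  One way to meet the idempotence and thread-safety requirements, sketched here assuming a
+  C11 compiler with ``<stdatomic.h>``, is an atomic flag set with ``atomic_exchange``.
+  ``struct my_producer_state``, ``my_cancel``, and ``my_is_cancelled`` are hypothetical names;
+  a real ``cancel`` callback would fetch this state from ``private_data`` and forward to
+  ``my_cancel``.
+
+  .. code-block:: c
+
+      #include <stdatomic.h>
+      #include <stdbool.h>
+
+      /* Hypothetical producer state, e.g. pointed to by private_data. */
+      struct my_producer_state {
+        atomic_bool cancelled;
+      };
+
+      /* Body of a hypothetical cancel callback. atomic_exchange makes it
+         idempotent and thread-safe: only the first caller sees `false`, so
+         later calls from any thread do nothing. Returns true on the first call. */
+      static bool my_cancel(struct my_producer_state* st) {
+        if (atomic_exchange(&st->cancelled, true)) {
+          return false;  /* already cancelled: NOP */
+        }
+        /* First cancellation: a real producer might wake its worker here. */
+        return true;
+      }
+
+      /* Polled by the producing loop: once true, start no new work, finish
+         any in-flight tasks, then call release on the handler (not on_error). */
+      static bool my_is_cancelled(struct my_producer_state* st) {
+        return atomic_load(&st->cancelled);
+      }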
+
+.. c:member:: void* ArrowAsyncProducer.private_data
+
+  *Optional.* An opaque pointer to producer-provided specific data.
+
+  Consumers *MUST NOT* process this member. Its lifetime is owned by the producer
+  that constructed this object.
+
+Error Handling
+''''''''''''''
+
+Unlike the regular C Stream interface, the Async interface allows for errors to flow in
+both directions. As a result, error handling can be slightly more complex, so this spec
+designates the following rules:
+
+* If the producer encounters an error during processing, it should call the ``on_error``
+  callback, and then call ``release`` after it returns.
+
+* If ``on_schema`` or ``on_next_task`` returns a non-zero integer value, the producer *should not*
+  call the ``on_error`` callback, but instead should eventually call ``release`` at some point
+  before or after any logging or processing of the error code.
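+
+A producer might follow these two rules with small helpers like the ones below. This sketch
+assumes the definitions above are in scope; ``my_report_failure`` and ``my_send_task`` are
+hypothetical names, and passing ``NULL`` metadata is just one option.
+
+.. code-block:: c
+
+    #include <stdbool.h>
+    #include <stddef.h>
+
+    /* Producer-side failure (for example, an I/O error): report it, then
+       release the handler once on_error has returned. */
+    static void my_report_failure(struct ArrowAsyncDeviceStreamHandler* handler,
+                                  int code, const char* message) {
+      handler->on_error(handler, code, message, NULL);
+      handler->release(handler);  /* always the last call on the handler */
+    }
+
+    /* Delivers one task. Returns false when the stream is over. */
+    static bool my_send_task(struct ArrowAsyncDeviceStreamHandler* handler,
+                             struct ArrowAsyncTask* task) {
+      if (handler->on_next_task(handler, task, NULL) != 0) {
+        /* The consumer signalled the error itself: do not call on_error;
+           optionally log the code, then release the handler. */
+        handler->release(handler);
+        return false;
+      }
+      return true;
+    }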
+
+Result lifetimes
+''''''''''''''''
+
+The ``ArrowSchema`` passed to the ``on_schema`` callback must be released independently,
+with the object itself needing to be moved into a consumer-owned ``ArrowSchema`` object. The
+``ArrowSchema*`` passed as a parameter to the callback *MUST NOT* be stored and kept.
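+
+A minimal sketch of the move, assuming the definitions above and the C data interface
+definitions are in scope, follows the usual C data interface convention of copying the struct
+and then marking the source as released by setting its ``release`` callback to ``NULL``.
+``struct my_consumer``, ``my_on_schema``, and ``my_release_schema`` are hypothetical names.
+
+.. code-block:: c
+
+    #include <stddef.h>
+
+    /* Hypothetical consumer state kept in handler->private_data. */
+    struct my_consumer {
+      struct ArrowSchema schema;            /* consumer-owned copy */
+      struct ArrowAsyncProducer* producer;  /* borrowed; owned by the producer */
+    };
+
+    static int my_on_schema(struct ArrowAsyncDeviceStreamHandler* self,
+                            struct ArrowAsyncProducer* producer,
+                            struct ArrowSchema* stream_schema,
+                            const char* addl_metadata) {
+      struct my_consumer* state = (struct my_consumer*)self->private_data;
+      (void)addl_metadata;
+
+      /* Move: copy the struct, then mark the source as released so that
+         only the consumer-owned copy is ever released. */
+      state->schema = *stream_schema;
+      stream_schema->release = NULL;
+
+      /* Keep the producer for flow control and request the first batch;
+         calling request from within on_schema is allowed. */
+      state->producer = producer;
+      producer->request(producer, 1);
+      return 0;
+    }
+
+    /* Later, when the consumer is done with the schema (for example, from
+       the handler's release callback). */
+    static void my_release_schema(struct my_consumer* state) {
+      if (state->schema.release != NULL) {
+        state->schema.release(&state->schema);
+      }
+    }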
+
+The ``ArrowAsyncTask`` object provided to ``on_next_task`` is owned by the producer and
+will be cleaned up during the call to ``get_data`` on it. If the consumer
+doesn't care about the data, it should pass ``NULL`` instead of a valid ``ArrowDeviceArray*``.
+
+The ``const char*`` error ``message`` and ``metadata`` which are passed to ``on_error``
+are only valid within the scope of the ``on_error`` function itself. They must be copied
+if it is necessary for them to exist after it returns.
+
+Stream Handler Lifetime
+'''''''''''''''''''''''
+
+Lifetime of the async stream handler is managed using a release callback with similar
+usage as in :ref:`C data interface <c-data-interface-released>`.
+
+ArrowAsyncProducer Lifetime
+'''''''''''''''''''''''''''
+
+The lifetime of the ``ArrowAsyncProducer`` passed to ``on_schema`` is owned by the producer
+itself and should be managed by it. It *MUST* remain valid at least until just before
+calling ``release`` on the stream handler object.
+
+Thread safety
+'''''''''''''
+
+All handler functions should only be called in a serialized manner, but are not guaranteed
+to be called from the same thread every time. A producer should wait for handler callbacks to
+return before calling the next handler callback, and before calling the ``release`` callback.
+As a result, back-pressure is managed by how long it takes for the ``on_next_task`` handler to return.

Review Comment:
   Removed this sentence and added a sentence stating that back-pressure is managed by calls to `request`.


