paleolimbot commented on code in PR #44495:
URL: https://github.com/apache/arrow/pull/44495#discussion_r1811426098
##########
cpp/src/arrow/c/abi.h:
##########
@@ -228,6 +228,214 @@ struct ArrowDeviceArrayStream {
#endif // ARROW_C_DEVICE_STREAM_INTERFACE
+#ifndef ARROW_C_ASYNC_STREAM_INTERFACE
+# define ARROW_C_ASYNC_STREAM_INTERFACE
+
+// ArrowAsyncTask represents available data from a producer that was passed to
+// an invocation of `on_next_task` on the ArrowAsyncDeviceStreamHandler.
+//
+// The reason for this Task approach instead of the Async interface returning
+// the Array directly is to allow for more complex thread handling and reducing
+// context switching and data transfers between CPU cores (e.g. from one L1/L2
+// cache to another) if desired.
+//
+// For example, the `on_next_task` callback can be called when data is ready,
while
+// the producer puts potential "decoding" logic in the `ArrowAsyncTask`
object. This
+// allows for the producer to manage the I/O on one thread which calls
`on_next_task`
+// and the consumer can determine when the decoding (producer logic in the
`extract_data`
+// callback of the task) occurs and on which thread, to avoid a CPU core
transfer
+// (data staying in the L2 cache).
+struct ArrowAsyncTask {
+ // This callback should populate the ArrowDeviceArray associated with this
task.
+ // The order of ArrowAsyncTasks provided by the producer enables a consumer
to
+ // ensure the order of data to process.
+ //
+ // This function is expected to be synchronous, but should not perform any
blocking
+ // I/O. Ideally it should be as cheap as possible so as to not tie up the
consumer
+ // thread unnecessarily.
+ //
+ // Returns: 0 if successful, errno-compatible error otherwise.
+ //
+ // If a non-0 value is returned then it should be followed by a call to
`on_error`
+ // on the appropriate ArrowAsyncDeviceStreamHandler. This is because it's
highly
+ // likely that whatever is calling this function may be entirely
disconnected from
+ // the current control flow. Indicating an error here with a non-zero return
allows
+ // the current flow to be aware of the error occurring, while still allowing
any
+ // logging or error handling to still be centralized in the `on_error`
callback of
+ // the original Async handler.
+ //
+ // Rather than a release callback, any required cleanup should be performed
as part
+ // of the invocation of `extract_data`. Ownership of the Array is passed to
the consumer
+ // calling this, and so it must be released separately.
+ //
+ // It is only valid to call this method exactly once.
+ int (*extract_data)(struct ArrowAsyncTask* self, struct ArrowDeviceArray*
out);
Review Comment:
An issue for the format PR, but was it discussed to allow `const char*
metadata` here, too for batch-specific metadata? Specifically, an IPC scanner
could propagate
https://github.com/apache/arrow/blob/bcb4653c6387a2b22df52a3bbc91317607abdccc/format/Message.fbs#L154
##########
cpp/src/arrow/c/bridge.h:
##########
@@ -406,4 +407,81 @@ Result<std::shared_ptr<ChunkedArray>>
ImportDeviceChunkedArray(
/// @}
+/// \defgroup c-async-stream-interface Functions for working with the async C
data
+/// interface.
+///
+/// @{
+
+/// \brief AsyncErrorDetail is a StatusDetail that contains an error code and
message
+/// from an asynchronous operation.
+class AsyncErrorDetail : public StatusDetail {
+ public:
+ AsyncErrorDetail(int code, std::string message, std::string metadata)
+ : code_(code), message_(std::move(message)),
metadata_(std::move(metadata)) {}
+ const char* type_id() const override { return "AsyncErrorDetail"; }
+ // ToString just returns the error message that was returned with the error
+ std::string ToString() const override { return message_; }
+ // code is an errno-compatible error code
+ int code() const { return code_; }
+ // returns any metadata that was returned with the error, likely in a
+ // key-value format similar to ArrowSchema metadata
+ const std::string& ErrorMetadata() const { return metadata_; }
+
+ private:
+ int code_{0};
+ std::string message_;
+ std::string metadata_;
+};
+
+struct AsyncRecordBatchGenerator {
+ std::shared_ptr<Schema> schema;
+ DeviceAllocationType device_type;
+ AsyncGenerator<RecordBatchWithMetadata> generator;
+};
+
+namespace internal {
+class Executor;
+}
+
+/// \brief Create an AsyncRecordBatchReader and populate a corresponding
handler to pass
+/// to a producer
+///
+/// The ArrowAsyncDeviceStreamHandler struct is intended to have its callbacks
populated
+/// and then be passed to a producer to call the appropriate callbacks when
data is ready.
+/// This inverts the traditional flow of control, and so we construct a
corresponding
+/// AsyncRecordBatchGenerator to provide an interface for the consumer to
retrieve data as
+/// it is pushed to the handler.
+///
+/// \param[in,out] handler C struct to be populated
+/// \param[in] executor the executor to use for waiting and populating record
batches
+/// \param[in] queue_size initial number of record batches to request for
queueing
Review Comment:
Not important for proof-of-concept, but it might be a more meaningful
parameter as a number of bytes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]