westonpace commented on code in PR #43632:
URL: https://github.com/apache/arrow/pull/43632#discussion_r1712642429
##########
cpp/src/arrow/c/abi.h:
##########
@@ -228,6 +228,65 @@ struct ArrowDeviceArrayStream {
 #endif // ARROW_C_DEVICE_STREAM_INTERFACE
+#ifndef ARROW_C_ASYNC_STREAM_INTERFACE
+#define ARROW_C_ASYNC_STREAM_INTERFACE
+
+// Similar to ArrowDeviceArrayStream, except designed for an asynchronous
+// style of interaction. While ArrowDeviceArrayStream provides producer
+// defined callbacks, this is intended to be created by the consumer instead.
+// The consumer passes this handler to the producer, which in turn uses the
+// callbacks to inform the consumer of events in the stream.
+struct ArrowAsyncDeviceStreamHandler {
+  // Handler for receiving a schema. The passed in stream_schema should be
+  // released or moved by the handler (producer is giving ownership of it to
+  // the handler).
+  //
+  // The `extension_param` argument can be null or can be used by a producer
+  // to pass arbitrary extra information to the consumer (such as total number
+  // of rows, context info, or otherwise).
+  //
+  // Return value: 0 if successful, `errno`-compatible error otherwise
+  int (*on_schema)(struct ArrowAsyncDeviceStreamHandler* self,
+                   struct ArrowSchema* stream_schema, void* extension_param);
+
+  // Handler for receiving an array/record batch. Always called at least once
+  // unless an error is encountered (which would result in calling on_error).
+  // An empty/released array is passed to indicate the end of the stream if no
+  // errors have been encountered.
+  //
+  // The `extension_param` argument can be null or can be used by a producer
+  // to pass arbitrary extra information to the consumer.
+  //
+  // Return value: 0 if successful, `errno`-compatible error otherwise.
+  int (*on_next)(struct ArrowAsyncDeviceStreamHandler* self,
+                 struct ArrowDeviceArray* next, void* extension_param);
+
+  // Handler for encountering an error. The producer should call release after
+  // this returns to clean up any resources.
+  //
+  // If the message or metadata are non-null, they will only last as long as this
+  // function call. The consumer would need to perform a copy of the data if it is
+  // it is necessary for them live past the lifetime of this call.
+  //
+  // Error metadata should be encoded as with metadata in ArrowSchema, defined in
+  // the spec at
+  // https://arrow.apache.org/docs/format/CDataInterface.html#c.ArrowSchema.metadata
+  //
+  // After this call, producers should follow-up by calling the release callback.
+  void (*on_error)(struct ArrowAsyncDeviceStreamHandler* self, int code,
+                   const char* message, const char* metadata);
+
+  // Release callback to release any resources for the handler. Should always be
+  // called by a producer when it is done utilizing a handler. No callbacks should
+  // be called after this is called.
+  void (*release)(struct ArrowAsyncDeviceStreamHandler* self);

Review Comment:
   What should the consumer do if there are calls to `on_error` or `on_next` that are still in progress when this is called?

##########
cpp/src/arrow/c/abi.h:
##########
@@ -228,6 +228,65 @@ struct ArrowDeviceArrayStream {
 #endif // ARROW_C_DEVICE_STREAM_INTERFACE
+#ifndef ARROW_C_ASYNC_STREAM_INTERFACE
+#define ARROW_C_ASYNC_STREAM_INTERFACE
+
+// Similar to ArrowDeviceArrayStream, except designed for an asynchronous
+// style of interaction. While ArrowDeviceArrayStream provides producer
+// defined callbacks, this is intended to be created by the consumer instead.
+// The consumer passes this handler to the producer, which in turn uses the
+// callbacks to inform the consumer of events in the stream.
+struct ArrowAsyncDeviceStreamHandler {
+  // Handler for receiving a schema. The passed in stream_schema should be
+  // released or moved by the handler (producer is giving ownership of it to
+  // the handler).
+  //
+  // The `extension_param` argument can be null or can be used by a producer
+  // to pass arbitrary extra information to the consumer (such as total number
+  // of rows, context info, or otherwise).
+  //
+  // Return value: 0 if successful, `errno`-compatible error otherwise
+  int (*on_schema)(struct ArrowAsyncDeviceStreamHandler* self,
+                   struct ArrowSchema* stream_schema, void* extension_param);
+
+  // Handler for receiving an array/record batch. Always called at least once
+  // unless an error is encountered (which would result in calling on_error).
+  // An empty/released array is passed to indicate the end of the stream if no
+  // errors have been encountered.
+  //
+  // The `extension_param` argument can be null or can be used by a producer
+  // to pass arbitrary extra information to the consumer.
+  //
+  // Return value: 0 if successful, `errno`-compatible error otherwise.
+  int (*on_next)(struct ArrowAsyncDeviceStreamHandler* self,
+                 struct ArrowDeviceArray* next, void* extension_param);

Review Comment:
   You're explicit about ownership elsewhere, so it might be nice to just add a statement: "after this call the consumer is responsible for releasing the array provided by `next`".

##########
cpp/src/arrow/c/abi.h:
##########
@@ -228,6 +228,65 @@ struct ArrowDeviceArrayStream {
 #endif // ARROW_C_DEVICE_STREAM_INTERFACE
+#ifndef ARROW_C_ASYNC_STREAM_INTERFACE
+#define ARROW_C_ASYNC_STREAM_INTERFACE
+
+// Similar to ArrowDeviceArrayStream, except designed for an asynchronous
+// style of interaction. While ArrowDeviceArrayStream provides producer
+// defined callbacks, this is intended to be created by the consumer instead.
+// The consumer passes this handler to the producer, which in turn uses the
+// callbacks to inform the consumer of events in the stream.
+struct ArrowAsyncDeviceStreamHandler {
+  // Handler for receiving a schema. The passed in stream_schema should be
+  // released or moved by the handler (producer is giving ownership of it to
+  // the handler).
+  //
+  // The `extension_param` argument can be null or can be used by a producer
+  // to pass arbitrary extra information to the consumer (such as total number
+  // of rows, context info, or otherwise).
+  //
+  // Return value: 0 if successful, `errno`-compatible error otherwise

Review Comment:
   Callbacks returning an error like this is pretty common in Rust. Normally what the producer does is:

   * Abort the reader
   * Return an error "External error: the producer received error code <X> from a call to `on_schema`: <errno-description>"

   I think it's nice to be able to return an error code here. For example, maybe the consumer can't handle one of the data types reported by the schema.

   Either way though, if an error is returned or if the consumer explicitly cancels in some way, I assume the consumer needs to keep accepting calls to the callbacks, right? (e.g. returning an error here won't prevent `on_next` from being called once or twice while the producer is cleaning up)

##########
cpp/src/arrow/c/abi.h:
##########
@@ -228,6 +228,65 @@ struct ArrowDeviceArrayStream {
 #endif // ARROW_C_DEVICE_STREAM_INTERFACE
+#ifndef ARROW_C_ASYNC_STREAM_INTERFACE
+#define ARROW_C_ASYNC_STREAM_INTERFACE
+
+// Similar to ArrowDeviceArrayStream, except designed for an asynchronous
+// style of interaction. While ArrowDeviceArrayStream provides producer
+// defined callbacks, this is intended to be created by the consumer instead.
+// The consumer passes this handler to the producer, which in turn uses the
+// callbacks to inform the consumer of events in the stream.
+struct ArrowAsyncDeviceStreamHandler {
+  // Handler for receiving a schema. The passed in stream_schema should be
+  // released or moved by the handler (producer is giving ownership of it to
+  // the handler).
+  //
+  // The `extension_param` argument can be null or can be used by a producer

Review Comment:
   I read this as "the contents of `stream_schema` are only valid during the call. Copy off any information that you need. The `ArrowSchema` will be deleted by the producer at some point after the call completes."

   Or, to put it another way: "the consumer shall not call `release` on the `ArrowSchema`".
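
   To make the ownership statement suggested for `on_next` above concrete, here is a minimal consumer-side sketch. It is only an illustration under assumptions, not the PR's implementation: the structs are reduced stand-ins for the real definitions in `abi.h`, the handler's `private_data` member is assumed (it is not visible in the quoted hunk), and `ConsumerState`, `consumer_on_next`, `demo_release` and `demo_run` are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Reduced stand-ins for the Arrow C ABI structs: only the fields this
 * sketch touches. The real definitions live in arrow/c/abi.h. */
struct ArrowArray {
  int64_t length;
  void (*release)(struct ArrowArray*);
  void* private_data;
};

struct ArrowDeviceArray {
  struct ArrowArray array;
};

struct ArrowAsyncDeviceStreamHandler {
  int (*on_next)(struct ArrowAsyncDeviceStreamHandler* self,
                 struct ArrowDeviceArray* next, void* extension_param);
  void* private_data; /* assumed member, not shown in the quoted hunk */
};

/* Hypothetical consumer-side state. */
struct ConsumerState {
  int64_t rows_seen;
  int end_of_stream;
};

/* Consumer callback: a released array (release == NULL) marks the end of
 * the stream; otherwise the consumer owns the array and must release it. */
int consumer_on_next(struct ArrowAsyncDeviceStreamHandler* self,
                     struct ArrowDeviceArray* next, void* extension_param) {
  struct ConsumerState* state = (struct ConsumerState*)self->private_data;
  (void)extension_param; /* unused in this sketch */

  if (next->array.release == NULL) {
    state->end_of_stream = 1;
    return 0;
  }
  state->rows_seen += next->array.length;
  /* Ownership was transferred to the consumer, so release here. */
  next->array.release(&next->array);
  return 0;
}

/* Hypothetical producer-side release: counts calls and marks the array
 * as released by clearing the callback. */
void demo_release(struct ArrowArray* array) {
  int* calls = (int*)array->private_data;
  (*calls)++;
  array->release = NULL;
}

/* Drives the handler the way a producer might: one batch, then the
 * released array that signals end of stream. Returns 0 on success. */
int demo_run(void) {
  int release_calls = 0;
  struct ConsumerState state = {0, 0};
  struct ArrowAsyncDeviceStreamHandler handler = {consumer_on_next, &state};

  struct ArrowDeviceArray batch = {{3, demo_release, &release_calls}};
  assert(handler.on_next(&handler, &batch, NULL) == 0);
  assert(batch.array.release == NULL); /* consumer released it */

  struct ArrowDeviceArray end = {{0, NULL, NULL}};
  assert(handler.on_next(&handler, &end, NULL) == 0);

  assert(state.rows_seen == 3);
  assert(state.end_of_stream == 1);
  assert(release_calls == 1);
  return 0;
}
```

   In this sketch the consumer releases the batch before returning; a consumer that needs the data after the call would presumably move the struct into its own storage and release it later instead.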