This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new d10a859 feat(arrow/cdata): Add Implementation of Async C Data interface (#169)
d10a859 is described below
commit d10a8591351fd113fe7ae90c9b56e80ebdfd196d
Author: Matt Topol <[email protected]>
AuthorDate: Tue Nov 12 05:48:17 2024 +0100
feat(arrow/cdata): Add Implementation of Async C Data interface (#169)
This adds a basic implementation of helpers for managing an
ArrowAsyncDeviceStreamHandler for use with the Async Arrow C Device
interface. The corresponding C++ helper implementation can be found at
https://github.com/apache/arrow/pull/44495, and the discussion of the
actual C structures is located at
https://github.com/apache/arrow/pull/43632.
---------
Co-authored-by: Sutou Kouhei <[email protected]>
---
arrow/cdata/arrow/c/abi.h | 341 +++++++++++++++++++++++++++++++++++-
arrow/cdata/arrow/c/helpers.h | 91 ++++++++--
arrow/cdata/cdata.go | 5 +
arrow/cdata/cdata_exports.go | 80 +++++++++
arrow/cdata/cdata_test.go | 108 ++++++++++++
arrow/cdata/cdata_test_framework.go | 15 ++
arrow/cdata/exports.go | 262 +++++++++++++++++++++++++++
arrow/cdata/interface.go | 60 +++++++
arrow/cdata/trampoline.c | 6 +
9 files changed, 948 insertions(+), 20 deletions(-)
diff --git a/arrow/cdata/arrow/c/abi.h b/arrow/cdata/arrow/c/abi.h
index d58417e..7f4d565 100644
--- a/arrow/cdata/arrow/c/abi.h
+++ b/arrow/cdata/arrow/c/abi.h
@@ -15,20 +15,37 @@
// specific language governing permissions and limitations
// under the License.
+/// \file abi.h Arrow C Data Interface
+///
+/// The Arrow C Data interface defines a very small, stable set
+/// of C definitions which can be easily copied into any project's
+/// source code and vendored to be used for columnar data interchange
+/// in the Arrow format. For non-C/C++ languages and runtimes,
+/// it should be almost as easy to translate the C definitions into
+/// the corresponding C FFI declarations.
+///
+/// Applications and libraries can therefore work with Arrow memory
+/// without necessarily using the Arrow libraries or reinventing
+/// the wheel. Developers can choose between tight integration
+/// with the Arrow software project or minimal integration with
+/// the Arrow format only.
+
#pragma once
#include <stdint.h>
+// Spec and documentation:
https://arrow.apache.org/docs/format/CDataInterface.html
+
#ifdef __cplusplus
extern "C" {
#endif
#ifndef ARROW_C_DATA_INTERFACE
-#define ARROW_C_DATA_INTERFACE
+# define ARROW_C_DATA_INTERFACE
-#define ARROW_FLAG_DICTIONARY_ORDERED 1
-#define ARROW_FLAG_NULLABLE 2
-#define ARROW_FLAG_MAP_KEYS_SORTED 4
+# define ARROW_FLAG_DICTIONARY_ORDERED 1
+# define ARROW_FLAG_NULLABLE 2
+# define ARROW_FLAG_MAP_KEYS_SORTED 4
struct ArrowSchema {
// Array type description
@@ -65,8 +82,63 @@ struct ArrowArray {
#endif // ARROW_C_DATA_INTERFACE
+#ifndef ARROW_C_DEVICE_DATA_INTERFACE
+# define ARROW_C_DEVICE_DATA_INTERFACE
+
+// Spec and Documentation:
https://arrow.apache.org/docs/format/CDeviceDataInterface.html
+
+// DeviceType for the allocated memory
+typedef int32_t ArrowDeviceType;
+
+// CPU device, same as using ArrowArray directly
+# define ARROW_DEVICE_CPU 1
+// CUDA GPU Device
+# define ARROW_DEVICE_CUDA 2
+// Pinned CUDA CPU memory by cudaMallocHost
+# define ARROW_DEVICE_CUDA_HOST 3
+// OpenCL Device
+# define ARROW_DEVICE_OPENCL 4
+// Vulkan buffer for next-gen graphics
+# define ARROW_DEVICE_VULKAN 7
+// Metal for Apple GPU
+# define ARROW_DEVICE_METAL 8
+// Verilog simulator buffer
+# define ARROW_DEVICE_VPI 9
+// ROCm GPUs for AMD GPUs
+# define ARROW_DEVICE_ROCM 10
+// Pinned ROCm CPU memory allocated by hipMallocHost
+# define ARROW_DEVICE_ROCM_HOST 11
+// Reserved for extension
+# define ARROW_DEVICE_EXT_DEV 12
+// CUDA managed/unified memory allocated by cudaMallocManaged
+# define ARROW_DEVICE_CUDA_MANAGED 13
+// unified shared memory allocated on a oneAPI non-partitioned device.
+# define ARROW_DEVICE_ONEAPI 14
+// GPU support for next-gen WebGPU standard
+# define ARROW_DEVICE_WEBGPU 15
+// Qualcomm Hexagon DSP
+# define ARROW_DEVICE_HEXAGON 16
+
+struct ArrowDeviceArray {
+ // the Allocated Array
+ //
+ // the buffers in the array (along with the buffers of any
+ // children) are what is allocated on the device.
+ struct ArrowArray array;
+ // The device id to identify a specific device
+ int64_t device_id;
+ // The type of device which can access this memory.
+ ArrowDeviceType device_type;
+ // An event-like object to synchronize on if needed.
+ void* sync_event;
+ // Reserved bytes for future expansion.
+ int64_t reserved[3];
+};
+
+#endif // ARROW_C_DEVICE_DATA_INTERFACE
+
#ifndef ARROW_C_STREAM_INTERFACE
-#define ARROW_C_STREAM_INTERFACE
+# define ARROW_C_STREAM_INTERFACE
struct ArrowArrayStream {
// Callback to get the stream type
@@ -106,6 +178,265 @@ struct ArrowArrayStream {
#endif // ARROW_C_STREAM_INTERFACE
+#ifndef ARROW_C_DEVICE_STREAM_INTERFACE
+# define ARROW_C_DEVICE_STREAM_INTERFACE
+
+// Equivalent to ArrowArrayStream, but for ArrowDeviceArrays.
+//
+// This stream is intended to provide a stream of data on a single
+// device, if a producer wants data to be produced on multiple devices
+// then multiple streams should be provided. One per device.
+struct ArrowDeviceArrayStream {
+ // The device that this stream produces data on.
+ ArrowDeviceType device_type;
+
+ // Callback to get the stream schema
+ // (will be the same for all arrays in the stream).
+ //
+ // Return value 0 if successful, an `errno`-compatible error code otherwise.
+ //
+ // If successful, the ArrowSchema must be released independently from the
stream.
+ // The schema should be accessible via CPU memory.
+ int (*get_schema)(struct ArrowDeviceArrayStream* self, struct ArrowSchema*
out);
+
+ // Callback to get the next array
+ // (if no error and the array is released, the stream has ended)
+ //
+ // Return value: 0 if successful, an `errno`-compatible error code otherwise.
+ //
+ // If successful, the ArrowDeviceArray must be released independently from
the stream.
+ int (*get_next)(struct ArrowDeviceArrayStream* self, struct
ArrowDeviceArray* out);
+
+ // Callback to get optional detailed error information.
+ // This must only be called if the last stream operation failed
+ // with a non-0 return code.
+ //
+ // Return value: pointer to a null-terminated character array describing
+ // the last error, or NULL if no description is available.
+ //
+ // The returned pointer is only valid until the next operation on this stream
+ // (including release).
+ const char* (*get_last_error)(struct ArrowDeviceArrayStream* self);
+
+ // Release callback: release the stream's own resources.
+ // Note that arrays returned by `get_next` must be individually released.
+ void (*release)(struct ArrowDeviceArrayStream* self);
+
+ // Opaque producer-specific data
+ void* private_data;
+};
+
+#endif // ARROW_C_DEVICE_STREAM_INTERFACE
+
+#ifndef ARROW_C_ASYNC_STREAM_INTERFACE
+# define ARROW_C_ASYNC_STREAM_INTERFACE
+
+// EXPERIMENTAL: ArrowAsyncTask represents available data from a producer that
was passed
+// to an invocation of `on_next_task` on the ArrowAsyncDeviceStreamHandler.
+//
+// The reason for this Task approach instead of the Async interface returning
+// the Array directly is to allow for more complex thread handling and reducing
+// context switching and data transfers between CPU cores (e.g. from one L1/L2
+// cache to another) if desired.
+//
+// For example, the `on_next_task` callback can be called when data is ready,
while
+// the producer puts potential "decoding" logic in the `ArrowAsyncTask`
object. This
+// allows for the producer to manage the I/O on one thread which calls
`on_next_task`
+// and the consumer can determine when the decoding (producer logic in the
`extract_data`
+// callback of the task) occurs and on which thread, to avoid a CPU core
transfer
+// (data staying in the L2 cache).
+struct ArrowAsyncTask {
+ // This callback should populate the ArrowDeviceArray associated with this
task.
+ // The order of ArrowAsyncTasks provided by the producer enables a consumer
to
+ // ensure the order of data to process.
+ //
+ // This function is expected to be synchronous, but should not perform any
blocking
+ // I/O. Ideally it should be as cheap as possible so as to not tie up the
consumer
+ // thread unnecessarily.
+ //
+ // Returns: 0 if successful, errno-compatible error otherwise.
+ //
+ // If a non-0 value is returned then it should be followed by a call to
`on_error`
+ // on the appropriate ArrowAsyncDeviceStreamHandler. This is because it's
highly
+ // likely that whatever is calling this function may be entirely
disconnected from
+ // the current control flow. Indicating an error here with a non-zero return
allows
+ // the current flow to be aware of the error occurring, while still allowing
any
+ // logging or error handling to still be centralized in the `on_error`
callback of
+ // the original Async handler.
+ //
+ // Rather than a release callback, any required cleanup should be performed
as part
+ // of the invocation of `extract_data`. Ownership of the Array is passed to
the consumer
+ // calling this, and so it must be released separately.
+ //
+ // It is only valid to call this method exactly once.
+ int (*extract_data)(struct ArrowAsyncTask* self, struct ArrowDeviceArray*
out);
+
+ // opaque task-specific data
+ void* private_data;
+};
+
+// EXPERIMENTAL: ArrowAsyncProducer represents a 1-to-1 relationship between
an async
+// producer and consumer. This object allows the consumer to perform
backpressure and flow
+// control on the asynchronous stream processing. This object must be owned by
the
+// producer who creates it, and thus is responsible for cleaning it up.
+struct ArrowAsyncProducer {
+ // The device type that this stream produces data on.
+ ArrowDeviceType device_type;
+
+ // A consumer must call this function to start receiving on_next_task calls.
+ //
+ // It *must* be valid to call this synchronously from within `on_next_task`
or
+ // `on_schema`, but this function *must not* immediately call `on_next_task`
so as
+ // to avoid recursion and reentrant callbacks.
+ //
+ // After cancel has been called, additional calls to this function must be
NOPs,
+ // but allowed. While not cancelled, calling this function must register the
+ // given number of additional arrays/batches to be produced with the
producer.
+ // The producer should only call `on_next_task` at most the registered number
+ // of arrays before propagating backpressure.
+ //
+ // Any error encountered by calling request must be propagated by calling
the `on_error`
+ // callback of the ArrowAsyncDeviceStreamHandler.
+ //
+ // While not cancelled, any subsequent calls to `on_next_task`, `on_error` or
+ // `release` should be scheduled by the producer to be called later.
+ //
+ // It is invalid for a consumer to call this with a value of n <= 0,
producers should
+ // error if given such a value.
+ void (*request)(struct ArrowAsyncProducer* self, int64_t n);
+
+ // This cancel callback signals a producer that it must eventually stop
making calls
+ // to on_next_task. It must be idempotent and thread-safe. After calling
cancel once,
+ // subsequent calls must be NOPs. This must not call any consumer-side
handlers other
+ // than `on_error`.
+ //
+ // It is not required that calling cancel affect the producer immediately,
only that it
+ // must eventually stop calling on_next_task and subsequently call release
on the
+ // async handler. As such, a consumer must be prepared to receive one or
more calls to
+ // `on_next_task` even after calling cancel if there are still requested
arrays pending.
+ //
+ // Successful cancellation should *not* result in the producer calling
`on_error`, it
+ // should finish out any remaining tasks and eventually call `release`.
+ //
+ // Any error encountered during handling a call to cancel must be reported
via the
+ // on_error callback on the async stream handler.
+ void (*cancel)(struct ArrowAsyncProducer* self);
+
+ // Any additional metadata tied to a specific stream of data. This must
either be NULL
+ // or a valid pointer to metadata which is encoded in the same way schema
metadata
+ // would be. Non-null metadata must be valid for the lifetime of this
object. As an
+ // example a producer could use this to provide the total number of rows
and/or batches
+ // in the stream if known.
+ const char* additional_metadata;
+
+ // producer-specific opaque data.
+ void* private_data;
+};
+
+// EXPERIMENTAL: Similar to ArrowDeviceArrayStream, except designed for an
asynchronous
+// style of interaction. While ArrowDeviceArrayStream provides producer
+// defined callbacks, this is intended to be created by the consumer instead.
+// The consumer passes this handler to the producer, which in turn uses the
+// callbacks to inform the consumer of events in the stream.
+struct ArrowAsyncDeviceStreamHandler {
+ // Handler for receiving a schema. The passed in stream_schema must be
+ // released or moved by the handler (producer is giving ownership of the
schema to
+ // the handler, but not ownership of the top level object itself).
+ //
+ // With the exception of an error occurring (on_error), this must be the
first
+ // callback function which is called by a producer and must only be called
exactly
+ // once. As such, the producer should provide a valid ArrowAsyncProducer
instance
+ // so the consumer can control the flow. See the documentation on
ArrowAsyncProducer
+ // for how it works. The ArrowAsyncProducer is owned by the producer who
calls this
+ // function and thus the producer is responsible for cleaning it up when
calling
+ // the release callback of this handler.
+ //
+ // If there is any additional metadata tied to this stream, it will be
provided as
+ // a non-null value for the `additional_metadata` field of the
ArrowAsyncProducer
+ // which will be valid at least until the release callback is called.
+ //
+ // Return value: 0 if successful, `errno`-compatible error otherwise
+ //
+ // A producer that receives a non-zero return here should stop producing and
eventually
+ // call release instead.
+ int (*on_schema)(struct ArrowAsyncDeviceStreamHandler* self,
+ struct ArrowSchema* stream_schema);
+
+ // Handler for receiving data. This is called when data is available
providing an
+ // ArrowAsyncTask struct to signify it. The producer indicates the end of
the stream
+ // by passing NULL as the value for the task rather than a valid pointer to
a task.
+ // The task object is only valid for the lifetime of this function call, if
a consumer
+ // wants to utilize it after this function returns, it must copy or move the
contents
+ // of it to a new ArrowAsyncTask object.
+ //
+ // The `request` callback of a provided ArrowAsyncProducer must be called in
order
+ // to start receiving calls to this handler.
+ //
+ // The metadata argument can be null or can be used by a producer
+ // to pass arbitrary extra information to the consumer (such as total number
+ // of rows, context info, or otherwise). The data should be passed using the
same
+ // encoding as the metadata within the ArrowSchema struct itself (defined in
+ // the spec at
+ //
https://arrow.apache.org/docs/format/CDataInterface.html#c.ArrowSchema.metadata)
+ //
+ // If metadata is non-null then it only needs to exist for the lifetime of
this call,
+ // a consumer who wants it to live after that must copy it to ensure
lifetime.
+ //
+ // A producer *must not* call this concurrently from multiple different
threads.
+ //
+ // A consumer must be prepared to receive one or more calls to this callback
even
+ // after calling cancel on the corresponding ArrowAsyncProducer, as cancel
does not
+ // guarantee it happens immediately.
+ //
+ // Return value: 0 if successful, `errno`-compatible error otherwise.
+ //
+ // If the consumer returns a non-zero return from this method, that
indicates to the
+ // producer that it should stop propagating data as an error occurred. After
receiving
+ // such a return, the only interaction with this object is for the producer
to call
+ // the `release` callback.
+ int (*on_next_task)(struct ArrowAsyncDeviceStreamHandler* self,
+ struct ArrowAsyncTask* task, const char* metadata);
+
+ // Handler for encountering an error. The producer should call release after
+ // this returns to clean up any resources. The `code` passed in can be any
error
+ // code that a producer wants, but should be errno-compatible for
consistency.
+ //
+ // If the message or metadata are non-null, they will only last as long as
this
+ // function call. The consumer would need to perform a copy of the data if
it is
+ // necessary for them to live past the lifetime of this call.
+ //
+ // Error metadata should be encoded as with metadata in ArrowSchema, defined
in
+ // the spec at
+ //
https://arrow.apache.org/docs/format/CDataInterface.html#c.ArrowSchema.metadata
+ //
+ // It is valid for this to be called by a producer with or without a
preceding call
+ // to ArrowAsyncProducer.request.
+ //
+ // This callback must not call any methods of an ArrowAsyncProducer object.
+ void (*on_error)(struct ArrowAsyncDeviceStreamHandler* self, int code,
+ const char* message, const char* metadata);
+
+ // Release callback to release any resources for the handler. Should always
be
+ // called by a producer when it is done utilizing a handler. No callbacks
should
+ // be called after this is called.
+ //
+ // It is valid for the release callback to be called by a producer with or
without
+ // a preceding call to ArrowAsyncProducer.request.
+ //
+ // The release callback must not call any methods of an ArrowAsyncProducer
object.
+ void (*release)(struct ArrowAsyncDeviceStreamHandler* self);
+
+ // MUST be populated by the producer BEFORE calling any callbacks other than
release.
+ // This provides the connection between a handler and its producer, and must
exist until
+ // the release callback is called.
+ struct ArrowAsyncProducer* producer;
+
+ // Opaque handler-specific data
+ void* private_data;
+};
+
+#endif // ARROW_C_ASYNC_STREAM_INTERFACE
+
#ifdef __cplusplus
}
#endif
diff --git a/arrow/cdata/arrow/c/helpers.h b/arrow/cdata/arrow/c/helpers.h
index 6581403..6e4df17 100644
--- a/arrow/cdata/arrow/c/helpers.h
+++ b/arrow/cdata/arrow/c/helpers.h
@@ -18,21 +18,31 @@
#pragma once
#include <assert.h>
+#include <stdio.h>
+#include <stdlib.h>
#include <string.h>
#include "arrow/c/abi.h"
+#define ARROW_C_ASSERT(condition, msg) \
+ do { \
+ if (!(condition)) { \
+ fprintf(stderr, "%s:%d:: %s", __FILE__, __LINE__, (msg)); \
+ abort(); \
+ } \
+ } while (0)
+
#ifdef __cplusplus
extern "C" {
#endif
/// Query whether the C schema is released
-static inline int ArrowSchemaIsReleased(const struct ArrowSchema* schema) {
+inline int ArrowSchemaIsReleased(const struct ArrowSchema* schema) {
return schema->release == NULL;
}
/// Mark the C schema released (for use in release callbacks)
-static inline void ArrowSchemaMarkReleased(struct ArrowSchema* schema) {
+inline void ArrowSchemaMarkReleased(struct ArrowSchema* schema) {
schema->release = NULL;
}
@@ -40,7 +50,7 @@ static inline void ArrowSchemaMarkReleased(struct
ArrowSchema* schema) {
///
/// Note `dest` must *not* point to a valid schema already, otherwise there
/// will be a memory leak.
-static inline void ArrowSchemaMove(struct ArrowSchema* src, struct
ArrowSchema* dest) {
+inline void ArrowSchemaMove(struct ArrowSchema* src, struct ArrowSchema* dest)
{
assert(dest != src);
assert(!ArrowSchemaIsReleased(src));
memcpy(dest, src, sizeof(struct ArrowSchema));
@@ -48,47 +58,81 @@ static inline void ArrowSchemaMove(struct ArrowSchema* src,
struct ArrowSchema*
}
/// Release the C schema, if necessary, by calling its release callback
-static inline void ArrowSchemaRelease(struct ArrowSchema* schema) {
+inline void ArrowSchemaRelease(struct ArrowSchema* schema) {
if (!ArrowSchemaIsReleased(schema)) {
schema->release(schema);
- assert(ArrowSchemaIsReleased(schema));
+ ARROW_C_ASSERT(ArrowSchemaIsReleased(schema),
+ "ArrowSchemaRelease did not cleanup release callback");
}
}
/// Query whether the C array is released
-static inline int ArrowArrayIsReleased(const struct ArrowArray* array) {
+inline int ArrowArrayIsReleased(const struct ArrowArray* array) {
return array->release == NULL;
}
+inline int ArrowDeviceArrayIsReleased(const struct ArrowDeviceArray* array) {
+ return ArrowArrayIsReleased(&array->array);
+}
+
/// Mark the C array released (for use in release callbacks)
-static inline void ArrowArrayMarkReleased(struct ArrowArray* array) {
array->release = NULL; }
+inline void ArrowArrayMarkReleased(struct ArrowArray* array) { array->release
= NULL; }
+
+inline void ArrowDeviceArrayMarkReleased(struct ArrowDeviceArray* array) {
+ ArrowArrayMarkReleased(&array->array);
+}
/// Move the C array from `src` to `dest`
///
/// Note `dest` must *not* point to a valid array already, otherwise there
/// will be a memory leak.
-static inline void ArrowArrayMove(struct ArrowArray* src, struct ArrowArray*
dest) {
+inline void ArrowArrayMove(struct ArrowArray* src, struct ArrowArray* dest) {
assert(dest != src);
assert(!ArrowArrayIsReleased(src));
memcpy(dest, src, sizeof(struct ArrowArray));
ArrowArrayMarkReleased(src);
}
+inline void ArrowDeviceArrayMove(struct ArrowDeviceArray* src,
+ struct ArrowDeviceArray* dest) {
+ assert(dest != src);
+ assert(!ArrowDeviceArrayIsReleased(src));
+ memcpy(dest, src, sizeof(struct ArrowDeviceArray));
+ ArrowDeviceArrayMarkReleased(src);
+}
+
/// Release the C array, if necessary, by calling its release callback
-static inline void ArrowArrayRelease(struct ArrowArray* array) {
+inline void ArrowArrayRelease(struct ArrowArray* array) {
if (!ArrowArrayIsReleased(array)) {
array->release(array);
- assert(ArrowArrayIsReleased(array));
+ ARROW_C_ASSERT(ArrowArrayIsReleased(array),
+ "ArrowArrayRelease did not cleanup release callback");
+ }
+}
+
+inline void ArrowDeviceArrayRelease(struct ArrowDeviceArray* array) {
+ if (!ArrowDeviceArrayIsReleased(array)) {
+ array->array.release(&array->array);
+ ARROW_C_ASSERT(ArrowDeviceArrayIsReleased(array),
+ "ArrowDeviceArrayRelease did not cleanup release callback");
}
}
/// Query whether the C array stream is released
-static inline int ArrowArrayStreamIsReleased(const struct ArrowArrayStream*
stream) {
+inline int ArrowArrayStreamIsReleased(const struct ArrowArrayStream* stream) {
+ return stream->release == NULL;
+}
+
+inline int ArrowDeviceArrayStreamIsReleased(const struct
ArrowDeviceArrayStream* stream) {
return stream->release == NULL;
}
/// Mark the C array stream released (for use in release callbacks)
-static inline void ArrowArrayStreamMarkReleased(struct ArrowArrayStream*
stream) {
+inline void ArrowArrayStreamMarkReleased(struct ArrowArrayStream* stream) {
+ stream->release = NULL;
+}
+
+inline void ArrowDeviceArrayStreamMarkReleased(struct ArrowDeviceArrayStream*
stream) {
stream->release = NULL;
}
@@ -96,7 +140,7 @@ static inline void ArrowArrayStreamMarkReleased(struct
ArrowArrayStream* stream)
///
/// Note `dest` must *not* point to a valid stream already, otherwise there
/// will be a memory leak.
-static inline void ArrowArrayStreamMove(struct ArrowArrayStream* src,
+inline void ArrowArrayStreamMove(struct ArrowArrayStream* src,
struct ArrowArrayStream* dest) {
assert(dest != src);
assert(!ArrowArrayStreamIsReleased(src));
@@ -104,11 +148,28 @@ static inline void ArrowArrayStreamMove(struct
ArrowArrayStream* src,
ArrowArrayStreamMarkReleased(src);
}
+inline void ArrowDeviceArrayStreamMove(struct ArrowDeviceArrayStream* src,
+ struct ArrowDeviceArrayStream* dest) {
+ assert(dest != src);
+ assert(!ArrowDeviceArrayStreamIsReleased(src));
+ memcpy(dest, src, sizeof(struct ArrowDeviceArrayStream));
+ ArrowDeviceArrayStreamMarkReleased(src);
+}
+
/// Release the C array stream, if necessary, by calling its release callback
-static inline void ArrowArrayStreamRelease(struct ArrowArrayStream* stream) {
+inline void ArrowArrayStreamRelease(struct ArrowArrayStream* stream) {
if (!ArrowArrayStreamIsReleased(stream)) {
stream->release(stream);
- assert(ArrowArrayStreamIsReleased(stream));
+ ARROW_C_ASSERT(ArrowArrayStreamIsReleased(stream),
+ "ArrowArrayStreamRelease did not cleanup release callback");
+ }
+}
+
+inline void ArrowDeviceArrayStreamRelease(struct ArrowDeviceArrayStream*
stream) {
+ if (!ArrowDeviceArrayStreamIsReleased(stream)) {
+ stream->release(stream);
+ ARROW_C_ASSERT(ArrowDeviceArrayStreamIsReleased(stream),
+ "ArrowDeviceArrayStreamRelease did not cleanup release
callback");
}
}
diff --git a/arrow/cdata/cdata.go b/arrow/cdata/cdata.go
index d5748a3..4361e6f 100644
--- a/arrow/cdata/cdata.go
+++ b/arrow/cdata/cdata.go
@@ -64,6 +64,11 @@ type (
CArrowArray = C.struct_ArrowArray
// CArrowArrayStream is the C Stream Interface object for handling
streams of record batches.
CArrowArrayStream = C.struct_ArrowArrayStream
+
+ CArrowAsyncDeviceStreamHandler = C.struct_ArrowAsyncDeviceStreamHandler
+ CArrowAsyncProducer = C.struct_ArrowAsyncProducer
+ CArrowAsyncTask = C.struct_ArrowAsyncTask
+ CArrowDeviceArray = C.struct_ArrowDeviceArray
)
// Map from the defined strings to their corresponding arrow.DataType interface
diff --git a/arrow/cdata/cdata_exports.go b/arrow/cdata/cdata_exports.go
index 6d1e038..da7db6c 100644
--- a/arrow/cdata/cdata_exports.go
+++ b/arrow/cdata/cdata_exports.go
@@ -33,10 +33,23 @@ package cdata
// void goReleaseSchema(struct ArrowSchema* schema) {
// releaseExportedSchema(schema);
// }
+//
+// void goCallCancel(struct ArrowAsyncProducer* producer) {
+// producer->cancel(producer);
+// }
+//
+// int goExtractTaskData(struct ArrowAsyncTask* task, struct ArrowDeviceArray*
out) {
+// return task->extract_data(task, out);
+// }
+//
+// static void goCallRequest(struct ArrowAsyncProducer* producer, int64_t n) {
+// producer->request(producer, n);
+// }
import "C"
import (
"bytes"
+ "context"
"encoding/binary"
"fmt"
"runtime/cgo"
@@ -489,3 +502,70 @@ func (rr cRecordReader) release() {
}
rr.rdr.Release()
}
+
+type cAsyncStreamHandler struct {
+ producer *CArrowAsyncProducer
+ taskQueue chan taskState
+ ctx context.Context
+}
+
+func asyncTaskQueue(ctx context.Context, schema *arrow.Schema, recordStream
chan<- RecordMessage, taskQueue <-chan taskState, producer
*CArrowAsyncProducer) {
+ defer close(recordStream)
+ for {
+ select {
+ case <-ctx.Done():
+ C.goCallCancel(producer)
+ return
+ case task, ok := <-taskQueue:
+ // if the queue closes or we receive a nil task, we're
done
+ if !ok || (task.err == nil && task.task.extract_data ==
nil) {
+ return
+ }
+
+ if task.err != nil {
+ recordStream <- RecordMessage{Err: task.err}
+ continue
+ }
+
+ // request another batch now that we've processed this
one
+ C.goCallRequest(producer, C.int64_t(1))
+
+ var out CArrowDeviceArray
+ if C.goExtractTaskData(&task.task, &out) != C.int(0) {
+ continue
+ }
+
+ rec, err := ImportCRecordBatchWithSchema(&out.array,
schema)
+ if err != nil {
+ recordStream <- RecordMessage{Err: err}
+ } else {
+ recordStream <- RecordMessage{Record: rec,
AdditionalMetadata: task.meta}
+ }
+ }
+ }
+}
+
+func (h *cAsyncStreamHandler) onNextTask(task *CArrowAsyncTask, metadata
*C.char) C.int {
+ if task == nil {
+ h.taskQueue <- taskState{}
+ return 0
+ }
+
+ ts := taskState{task: *task}
+ if metadata != nil {
+ ts.meta = decodeCMetadata(metadata)
+ }
+ h.taskQueue <- ts
+ return 0
+}
+
+func (h *cAsyncStreamHandler) onError(code C.int, message, metadata *C.char) {
+ h.taskQueue <- taskState{err: AsyncStreamError{
+ Code: int(code), Msg: C.GoString(message), Metadata:
C.GoString(metadata)}}
+}
+
+func (h *cAsyncStreamHandler) release() {
+ close(h.taskQueue)
+ h.taskQueue, h.producer = nil, nil
+ h.producer = nil
+}
diff --git a/arrow/cdata/cdata_test.go b/arrow/cdata/cdata_test.go
index ebd4fcf..bd1942f 100644
--- a/arrow/cdata/cdata_test.go
+++ b/arrow/cdata/cdata_test.go
@@ -24,6 +24,7 @@
package cdata
import (
+ "context"
"encoding/json"
"errors"
"fmt"
@@ -42,6 +43,7 @@ import (
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/arrow/memory/mallocator"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
func TestSchemaExport(t *testing.T) {
@@ -1050,3 +1052,109 @@ func TestConfuseGoGc(t *testing.T) {
wg.Wait()
}
+
+func TestAsyncInterfacesSimple(t *testing.T) {
+ reclist := arrdata.Records["primitives"]
+
+ handler := testAsyncHandler()
+ defer freeAsyncHandler(handler)
+
+ ctx := context.Background()
+ ch := CreateAsyncDeviceStreamHandler(ctx, 1, handler)
+
+ stream := make(chan RecordMessage, len(reclist))
+ go func() {
+ defer close(stream)
+ for _, r := range reclist {
+ r.Retain()
+ stream <- RecordMessage{Record: r}
+ }
+ }()
+
+ wait := make(chan struct{})
+ go func() {
+ defer close(wait)
+ assert.NoError(t,
ExportAsyncRecordBatchStream(reclist[0].Schema(), stream, handler))
+ }()
+
+ asyncStream := <-ch
+ require.NoError(t, asyncStream.Err)
+
+ assert.True(t, reclist[0].Schema().Equal(asyncStream.Schema))
+ var idx int
+ for r := range asyncStream.Stream {
+ require.NoError(t, r.Err)
+ assert.True(t, array.RecordEqual(reclist[idx], r.Record))
+ idx++
+ r.Record.Release()
+ }
+
+ <-wait
+}
+
+func TestAsyncSchemaError(t *testing.T) {
+ handler := testAsyncHandler()
+ defer freeAsyncHandler(handler)
+
+ ctx := context.Background()
+ ch := CreateAsyncDeviceStreamHandler(ctx, 1, handler)
+
+ wait := make(chan struct{})
+ go func() {
+ defer close(wait)
+ err := ExportAsyncRecordBatchStream(nil, nil, handler)
+ assert.ErrorIs(t, err, arrow.ErrInvalid)
+ assert.ErrorContains(t, err, "must have non-nil schema")
+ }()
+
+ asyncStream := <-ch
+ require.Error(t, asyncStream.Err)
+
+ var asyncErr AsyncStreamError
+ assert.ErrorAs(t, asyncStream.Err, &asyncErr)
+ assert.Equal(t, 22, int(asyncErr.Code))
+
+ <-wait
+}
+
+func TestAsyncPropagateError(t *testing.T) {
+ reclist := arrdata.Records["primitives"]
+
+ handler := testAsyncHandler()
+ defer freeAsyncHandler(handler)
+
+ ctx := context.Background()
+ ch := CreateAsyncDeviceStreamHandler(ctx, 1, handler)
+
+ stream := make(chan RecordMessage, 2)
+ go func() {
+ defer close(stream)
+ reclist[0].Retain()
+ stream <- RecordMessage{Record: reclist[0]}
+ stream <- RecordMessage{Err: assert.AnError}
+ }()
+
+ wait := make(chan struct{})
+ go func() {
+ defer close(wait)
+ err := ExportAsyncRecordBatchStream(reclist[0].Schema(),
stream, handler)
+ assert.ErrorIs(t, err, assert.AnError)
+ }()
+
+ asyncStream := <-ch
+ require.NoError(t, asyncStream.Err)
+
+ assert.True(t, reclist[0].Schema().Equal(asyncStream.Schema))
+ rec1 := <-asyncStream.Stream
+ require.NoError(t, rec1.Err)
+ assert.True(t, array.RecordEqual(reclist[0], rec1.Record))
+ rec1.Record.Release()
+
+ rec2 := <-asyncStream.Stream
+ var err AsyncStreamError
+ assert.ErrorContains(t, rec2.Err, assert.AnError.Error())
+ assert.ErrorAs(t, rec2.Err, &err)
+ assert.Equal(t, 22, int(err.Code))
+
+ <-wait
+}
diff --git a/arrow/cdata/cdata_test_framework.go
b/arrow/cdata/cdata_test_framework.go
index 331e80b..ad6b911 100644
--- a/arrow/cdata/cdata_test_framework.go
+++ b/arrow/cdata/cdata_test_framework.go
@@ -37,6 +37,13 @@ package cdata
// return out;
// }
//
+// static struct ArrowAsyncDeviceStreamHandler* get_test_async_handler() {
+// struct ArrowAsyncDeviceStreamHandler* handler =
+// (struct ArrowAsyncDeviceStreamHandler*)malloc(sizeof(struct
ArrowAsyncDeviceStreamHandler));
+// memset(handler, 0, sizeof(*handler));
+// return handler;
+// }
+//
// void release_test_arr(struct ArrowArray* arr);
//
// static int32_t* get_data() {
@@ -132,6 +139,14 @@ func freeMallocedSchemas(schemas **CArrowSchema) {
C.free_malloced_schemas(schemas)
}
+func testAsyncHandler() *CArrowAsyncDeviceStreamHandler {
+ return C.get_test_async_handler()
+}
+
+func freeAsyncHandler(h *CArrowAsyncDeviceStreamHandler) {
+ C.free(unsafe.Pointer(h))
+}
+
func testNested(fmts, names []string, isnull []bool) **CArrowSchema {
if len(fmts) != len(names) {
panic("testing nested lists must have same size fmts and names")
diff --git a/arrow/cdata/exports.go b/arrow/cdata/exports.go
index bf64ea7..d70305c 100644
--- a/arrow/cdata/exports.go
+++ b/arrow/cdata/exports.go
@@ -17,6 +17,8 @@
package cdata
import (
+ "context"
+ "fmt"
"runtime/cgo"
"unsafe"
@@ -25,6 +27,8 @@ import (
)
// #include <stdlib.h>
+// #include <errno.h>
+// #include "arrow/c/abi.h"
// #include "arrow/c/helpers.h"
//
// typedef const char cchar_t;
@@ -32,9 +36,40 @@ import (
// extern int streamGetNext(struct ArrowArrayStream*, struct ArrowArray*);
// extern const char* streamGetError(struct ArrowArrayStream*);
// extern void streamRelease(struct ArrowArrayStream*);
+// extern int asyncStreamOnSchema(struct ArrowAsyncDeviceStreamHandler*,
struct ArrowSchema*);
+// extern int asyncStreamOnNextTask(struct ArrowAsyncDeviceStreamHandler*,
struct ArrowAsyncTask*, char*);
+// extern void asyncStreamOnError(struct ArrowAsyncDeviceStreamHandler*, int,
char*, char*);
+// extern void asyncStreamRelease(struct ArrowAsyncDeviceStreamHandler*);
+// extern void asyncProducerRequest(struct ArrowAsyncProducer*, int64_t);
+// extern void asyncProducerCancel(struct ArrowAsyncProducer*);
+// extern int asyncTaskExtract(struct ArrowAsyncTask*, struct
ArrowDeviceArray*);
// // XXX(https://github.com/apache/arrow-adbc/issues/729)
// int streamGetSchemaTrampoline(struct ArrowArrayStream* stream, struct
ArrowSchema* out);
// int streamGetNextTrampoline(struct ArrowArrayStream* stream, struct
ArrowArray* out);
+// int asyncTaskExtractTrampoline(struct ArrowAsyncTask* task, struct
ArrowDeviceArray* out);
+//
+// // The goCall* shims exist because Go cannot invoke C function pointers
+// // directly; each forwards to the callback stored on the struct passed in.
+// static void goCallRequest(struct ArrowAsyncProducer* producer, int64_t n) {
+//   producer->request(producer, n);
+// }
+// static int goCallOnSchema(struct ArrowAsyncDeviceStreamHandler* handler, struct ArrowSchema* schema) {
+//   return handler->on_schema(handler, schema);
+// }
+// static void goCallOnError(struct ArrowAsyncDeviceStreamHandler* handler, int code, char* message, char* metadata) {
+//   handler->on_error(handler, code, message, metadata);
+// }
+// static int goCallOnNextTask(struct ArrowAsyncDeviceStreamHandler* handler, struct ArrowAsyncTask* task, char* metadata) {
+//   return handler->on_next_task(handler, task, metadata);
+// }
+//
+// // Allocates a zero-initialized producer that the caller must free().
+// // calloc both allocates and zeroes, so no separate memset is required.
+// // Returns NULL if the allocation fails.
+// static struct ArrowAsyncProducer* get_producer(void) {
+//   return calloc(1, sizeof(struct ArrowAsyncProducer));
+// }
+//
+// // Invokes the handler's own release callback.
+// static void goReleaseAsyncHandler(struct ArrowAsyncDeviceStreamHandler* handler) {
+//   handler->release(handler);
+// }
//
import "C"
@@ -155,3 +190,230 @@ func exportStream(rdr array.RecordReader, out
*CArrowArrayStream) {
h := cgo.NewHandle(cRecordReader{rdr: rdr, err: nil})
out.private_data = createHandle(h)
}
+
+// cAsyncState is the consumer-side state kept (by value, behind a cgo handle)
+// in a handler's private_data until on_schema replaces it.
+type cAsyncState struct {
+	// ch delivers the AsyncRecordBatchStream, or an early error, to the
+	// caller of CreateAsyncDeviceStreamHandler.
+	ch chan AsyncRecordBatchStream
+	// queueSize is how many records are requested at a time; it also sizes
+	// the record and task channels created in on_schema.
+	queueSize uint64
+	// ctx is handed on to the stream handler and the task-queue goroutine.
+	ctx context.Context
+}
+
+// taskState is the element type of the task queue handed to asyncTaskQueue.
+// NOTE(review): it is not populated in this part of the file; presumably
+// cAsyncStreamHandler.onNextTask fills it with the received task, its
+// decoded metadata and any error — confirm.
+type taskState struct {
+	task CArrowAsyncTask
+	meta arrow.Metadata
+	err  error
+}
+
+// asyncStreamOnSchema is exported to C as the handler's on_schema callback.
+//
+// On entry private_data holds the cAsyncState stored by
+// CreateAsyncDeviceStreamHandler. The callback rejects non-CPU producers,
+// imports the schema and publishes an AsyncRecordBatchStream on the state's
+// channel, which is closed before returning. On success it replaces
+// private_data with a handle to a new *cAsyncStreamHandler, requests the
+// first queueSize records from the producer, starts the task-queue goroutine
+// and returns 0. On failure it publishes the error, leaves private_data intact
+// for asyncStreamRelease to clean up, and returns EINVAL.
+//
+//export asyncStreamOnSchema
+func asyncStreamOnSchema(self *CArrowAsyncDeviceStreamHandler, schema *CArrowSchema) C.int {
+	h := getHandle(self.private_data)
+	handler := h.Value().(cAsyncState)
+	defer close(handler.ch)
+
+	if self.producer.device_type != C.ARROW_DEVICE_CPU {
+		handler.ch <- AsyncRecordBatchStream{Err: fmt.Errorf("unsupported device type")}
+		return C.EINVAL
+	}
+
+	// NOTE(review): the incoming schema is imported but never released on
+	// this side; confirm whether on_schema is meant to take ownership of it.
+	sc, err := ImportCArrowSchema(schema)
+	if err != nil {
+		handler.ch <- AsyncRecordBatchStream{Err: err}
+		return C.EINVAL
+	}
+
+	var meta arrow.Metadata
+	if self.producer.additional_metadata != nil {
+		meta = decodeCMetadata(self.producer.additional_metadata)
+	}
+
+	recordStream := make(chan RecordMessage, handler.queueSize)
+	taskQueue := make(chan taskState, handler.queueSize)
+	handler.ch <- AsyncRecordBatchStream{Schema: sc,
+		AdditionalMetadata: meta, Stream: recordStream}
+
+	// Swap the cAsyncState handle for the stream handler. asyncStreamRelease
+	// only frees whatever private_data points to at release time, so the C
+	// slot that held the old handle must be freed here or it leaks.
+	stateSlot := self.private_data
+	self.private_data = createHandle(cgo.NewHandle(&cAsyncStreamHandler{
+		producer:  self.producer,
+		ctx:       handler.ctx,
+		taskQueue: taskQueue,
+	}))
+	defer func() {
+		h.Delete()
+		C.free(unsafe.Pointer(stateSlot))
+	}()
+
+	C.goCallRequest(self.producer, C.int64_t(handler.queueSize))
+	go asyncTaskQueue(handler.ctx, sc, recordStream, taskQueue, self.producer)
+	return 0
+}
+
+// asyncStreamOnNextTask is exported to C as the handler's on_next_task
+// callback. It forwards the task and its metadata to the *cAsyncStreamHandler
+// that asyncStreamOnSchema installed and returns that handler's status.
+//
+//export asyncStreamOnNextTask
+func asyncStreamOnNextTask(self *CArrowAsyncDeviceStreamHandler, task *CArrowAsyncTask, metadata *C.char) C.int {
+	streamHandler := getHandle(self.private_data).Value().(*cAsyncStreamHandler)
+	return streamHandler.onNextTask(task, metadata)
+}
+
+// asyncStreamOnError is exported to C as the handler's on_error callback.
+//
+// Before on_schema has succeeded, private_data holds the cAsyncState. In that
+// case the error is delivered as an AsyncStreamError on its channel, which is
+// then closed. Afterwards the error is forwarded to the *cAsyncStreamHandler.
+// Any other value in private_data is ignored.
+//
+//export asyncStreamOnError
+func asyncStreamOnError(self *CArrowAsyncDeviceStreamHandler, code C.int, message, metadata *C.char) {
+	switch state := getHandle(self.private_data).Value().(type) {
+	case cAsyncState:
+		streamErr := AsyncStreamError{
+			Code:     int(code),
+			Msg:      C.GoString(message),
+			Metadata: C.GoString(metadata),
+		}
+		state.ch <- AsyncRecordBatchStream{Err: streamErr}
+		close(state.ch)
+	case *cAsyncStreamHandler:
+		state.onError(code, message, metadata)
+	}
+}
+
+// asyncStreamRelease is exported to C as the handler's release callback. It
+// releases the *cAsyncStreamHandler if on_schema installed one, deletes the
+// cgo handle, frees the C slot that held it, and marks the handler released
+// by clearing private_data and release.
+//
+//export asyncStreamRelease
+func asyncStreamRelease(self *CArrowAsyncDeviceStreamHandler) {
+	h := getHandle(self.private_data)
+	if streamHandler, isStream := h.Value().(*cAsyncStreamHandler); isStream {
+		streamHandler.release()
+	}
+	h.Delete()
+
+	C.free(unsafe.Pointer(self.private_data))
+	self.private_data, self.release = nil, nil
+}
+
+// exportAsyncHandler stores state behind a cgo handle in out's private_data and
+// points out's callbacks at the Go-exported implementations.
+func exportAsyncHandler(state cAsyncState, out *CArrowAsyncDeviceStreamHandler) {
+	out.private_data = createHandle(cgo.NewHandle(state))
+
+	out.on_schema = (*[0]byte)(C.asyncStreamOnSchema)
+	out.on_next_task = (*[0]byte)(C.asyncStreamOnNextTask)
+	out.on_error = (*[0]byte)(C.asyncStreamOnError)
+	out.release = (*[0]byte)(C.asyncStreamRelease)
+}
+
+// asyncProducerRequest is exported to C as the producer's request callback. It
+// forwards the requested count n to the export loop through reqChan. The
+// count is dropped when reqChan is nil, which happens after cancel or once the
+// export loop has finished.
+//
+//export asyncProducerRequest
+func asyncProducerRequest(producer *CArrowAsyncProducer, n C.int64_t) {
+	prod := getHandle(producer.private_data).Value().(*cAsyncProducer)
+	if prod.reqChan == nil {
+		return
+	}
+	prod.reqChan <- int64(n)
+}
+
+// asyncProducerCancel is exported to C as the producer's cancel callback. The
+// first call closes done, which makes the export loop return, and clears both
+// channels so later request/cancel calls become no-ops.
+// NOTE(review): these fields are also mutated by exportAsyncProducer's
+// deferred cleanup without synchronization; confirm callers serialize them.
+//
+//export asyncProducerCancel
+func asyncProducerCancel(producer *CArrowAsyncProducer) {
+	prod := getHandle(producer.private_data).Value().(*cAsyncProducer)
+	if prod.done == nil {
+		return
+	}
+	close(prod.done)
+	prod.done, prod.reqChan = nil, nil
+}
+
+// asyncTaskExtract is exported to C as the extract_data callback of tasks
+// created by exportAsyncProducer. private_data holds a cgo handle to the
+// task's arrow.Record. The record is exported into out as a CPU device array,
+// and the task's Go-side references are dropped.
+//
+// Nothing else deletes the handle on the success path. Without the cleanup
+// below, every extracted task would pin its record in the cgo handle table
+// and leak the C slot allocated by createHandle. This assumes extract_data is
+// invoked at most once per task — TODO confirm against the ArrowAsyncTask
+// contract in abi.h.
+//
+//export asyncTaskExtract
+func asyncTaskExtract(task *CArrowAsyncTask, out *CArrowDeviceArray) C.int {
+	h := getHandle(task.private_data)
+	rec := h.Value().(arrow.Record)
+	defer rec.Release()
+
+	// The record has been retrieved, so the handle and its slot are no longer
+	// needed. Clearing private_data means a stray second call dereferences nil
+	// instead of reading freed memory.
+	h.Delete()
+	C.free(unsafe.Pointer(task.private_data))
+	task.private_data = nil
+
+	out.device_id, out.device_type = C.int64_t(-1), C.ARROW_DEVICE_CPU
+	ExportArrowRecordBatch(rec, &out.array, nil)
+	return C.int(0)
+}
+
+// cAsyncProducer is the Go state behind an exported ArrowAsyncProducer's
+// private_data.
+type cAsyncProducer struct {
+	// reqChan carries counts from the request callback to the export loop;
+	// it is nil once cancelled or shut down.
+	reqChan chan int64
+	// done is closed by the cancel callback or by the export loop's cleanup;
+	// it is nil once closed.
+	done chan error
+}
+
+// exportAsyncProducer drives the producer side of the async C Device stream
+// protocol for ExportAsyncRecordBatchStream.
+//
+// It installs a CPU ArrowAsyncProducer on handler and delivers the exported
+// schema through on_schema. It then forwards one record per outstanding
+// request through on_next_task. It returns nil when either:
+//   - the stream is closed, after end-of-stream is signalled with a nil task, or
+//   - the producer is cancelled.
+//
+// It returns an error if the schema is nil, if a RecordMessage carries an
+// error (also reported via on_error with EINVAL), or if a handler callback
+// returns a non-zero status. The handler's release callback is always invoked
+// before returning.
+func exportAsyncProducer(schema *arrow.Schema, stream <-chan RecordMessage, handler *CArrowAsyncDeviceStreamHandler) error {
+	defer C.goReleaseAsyncHandler(handler)
+
+	if schema == nil {
+		err := fmt.Errorf("%w: must have non-nil schema", arrow.ErrInvalid)
+		errmsg := C.CString(err.Error())
+		C.goCallOnError(handler, C.EINVAL, errmsg, nil)
+		C.free(unsafe.Pointer(errmsg))
+		return err
+	}
+
+	reqChan, done := make(chan int64, 5), make(chan error, 1)
+	prodHandle := cgo.NewHandle(&cAsyncProducer{reqChan: reqChan, done: done})
+	cproducer := prodHandle.Value().(*cAsyncProducer)
+	defer func() {
+		close(reqChan)
+		cproducer.reqChan = nil
+		if cproducer.done != nil {
+			close(cproducer.done)
+			cproducer.done = nil
+		}
+
+		prodHandle.Delete()
+	}()
+
+	producer := C.get_producer()
+	defer C.free(unsafe.Pointer(producer))
+
+	producer.device_type = C.ARROW_DEVICE_CPU
+	producer.request = (*[0]byte)(C.asyncProducerRequest)
+	producer.cancel = (*[0]byte)(C.asyncProducerCancel)
+	producer.private_data = createHandle(prodHandle)
+	producer.additional_metadata = nil
+	handler.producer = producer
+
+	var s CArrowSchema
+	ExportArrowSchema(schema, &s)
+	if status := C.goCallOnSchema(handler, &s); status != C.int(0) {
+		releaseExportedSchema(&s)
+		return fmt.Errorf("on_schema failed with status %d", status)
+	}
+
+	var pending int64
+	for {
+		// Only listen on the record stream while the consumer has outstanding
+		// demand. Receiving from a nil channel blocks forever, which disables
+		// that select case. The goroutine therefore sleeps until a request, a
+		// cancellation or (when pending > 0) a record arrives, instead of
+		// spinning on non-blocking selects.
+		var records <-chan RecordMessage
+		if pending > 0 {
+			records = stream
+		}
+
+		select {
+		case err, ok := <-done:
+			if !ok {
+				return nil
+			}
+			return err
+		case req := <-reqChan:
+			pending += req
+		case msg, ok := <-records:
+			if !ok {
+				// End of stream is signalled with a nil task.
+				if status := C.goCallOnNextTask(handler, nil, nil); status != C.int(0) {
+					return fmt.Errorf("on_next_task with nil task failed with status %d", status)
+				}
+				return nil
+			}
+
+			pending--
+			if msg.Err != nil {
+				errmsg := C.CString(msg.Err.Error())
+				C.goCallOnError(handler, C.EINVAL, errmsg, nil)
+				C.free(unsafe.Pointer(errmsg))
+				return msg.Err
+			}
+
+			var task CArrowAsyncTask
+			task.extract_data = (*[0]byte)(C.asyncTaskExtractTrampoline)
+			task.private_data = createHandle(cgo.NewHandle(msg.Record))
+
+			var encoded []byte
+			if msg.AdditionalMetadata.Len() != 0 {
+				encoded = encodeCMetadata(msg.AdditionalMetadata.Keys(),
+					msg.AdditionalMetadata.Values())
+			}
+
+			status := C.goCallOnNextTask(handler, &task,
+				(*C.char)(unsafe.Pointer(unsafe.SliceData(encoded))))
+			if status != C.int(0) {
+				// Assumes a rejected task was not extracted, so the record,
+				// its handle and the handle's C slot are still ours to drop.
+				msg.Record.Release()
+				getHandle(task.private_data).Delete()
+				C.free(unsafe.Pointer(task.private_data))
+				return fmt.Errorf("on_next_task failed with status %d", status)
+			}
+		}
+	}
+}
diff --git a/arrow/cdata/interface.go b/arrow/cdata/interface.go
index 93aedc2..fd6ecbf 100644
--- a/arrow/cdata/interface.go
+++ b/arrow/cdata/interface.go
@@ -20,6 +20,7 @@
package cdata
import (
+ "context"
"unsafe"
"github.com/apache/arrow-go/v18/arrow"
@@ -282,3 +283,62 @@ func ReleaseCArrowArray(arr *CArrowArray) {
releaseArr(arr) }
// ReleaseCArrowSchema calls ArrowSchemaRelease on the passed in cdata schema
func ReleaseCArrowSchema(schema *CArrowSchema) { releaseSchema(schema) }
+
+// RecordMessage is one item on a record channel used with the Async C Data
+// Interface. ExportAsyncRecordBatchStream reads these from its input channel,
+// and AsyncRecordBatchStream.Stream delivers them to consumers.
+type RecordMessage struct {
+	// Record is the record batch carried by this message.
+	Record             arrow.Record
+	// AdditionalMetadata is optional metadata accompanying Record.
+	AdditionalMetadata arrow.Metadata
+	// Err, when non-nil, reports a failure instead of a record.
+	Err                error
+}
+
+// AsyncRecordBatchStream is what a consumer receives for a stream of record
+// batches arriving through an ArrowAsyncDeviceStreamHandler's callbacks.
+// If an error occurs before the schema has been accepted, only Err is set.
+// Otherwise Schema (plus any AdditionalMetadata) describes the stream, and
+// Stream yields the RecordMessages propagated via on_next_task and
+// extract_data.
+type AsyncRecordBatchStream struct {
+	Schema             *arrow.Schema
+	AdditionalMetadata arrow.Metadata
+	Err                error
+	Stream             <-chan RecordMessage
+}
+
+// AsyncStreamError reports an error delivered through the on_error callback of
+// an ArrowAsyncDeviceStreamHandler. Code is expected to be errno-compatible.
+// Msg and Metadata carry the callback's message and metadata arguments as Go
+// strings.
+type AsyncStreamError struct {
+	Code     int
+	Msg      string
+	Metadata string
+}
+
+// Error implements the error interface by returning only the message text.
+func (e AsyncStreamError) Error() string {
+	return e.Msg
+}
+
+// CreateAsyncDeviceStreamHandler populates a given
ArrowAsyncDeviceStreamHandler's callbacks
+// and waits for the on_schema callback to be called before passing the
AsyncRecordBatchStream
+// object across the returned channel.
+//
+// The provided queueSize is the number of records that will be requested at a
time to be passed
+// along the Stream in the returned AsyncRecordBatchStream. See the
documentation on
+// https://arrow.apache.org/docs/format/CDeviceDataInterface.html for more
information as to the
+// expected semantics of that size.
+//
+// The populated ArrowAsyncDeviceStreamHandler can then be given to any
compatible provider for
+// async record batch streams via the C Device interface.
+func CreateAsyncDeviceStreamHandler(ctx context.Context, queueSize uint64, out
*CArrowAsyncDeviceStreamHandler) <-chan AsyncRecordBatchStream {
+ ch := make(chan AsyncRecordBatchStream)
+ exportAsyncHandler(cAsyncState{ctx: ctx, ch: ch, queueSize: queueSize},
out)
+ return ch
+}
+
+// ExportAsyncRecordBatchStream exports the records received on stream to the
+// given ArrowAsyncDeviceStreamHandler. It invokes the handler's callbacks as
+// records arrive:
+//   - on_schema once for schema,
+//   - on_next_task for each record the consumer has requested, and
+//   - a final on_next_task with a nil task when stream is closed.
+//
+// This function blocks until the stream is closed, a message containing an
+// error arrives, the consumer cancels, or a callback fails. The handler's
+// release callback is invoked before it returns.
+//
+// The returned error is nil on success, including after a cancellation.
+// Otherwise it is one of:
+//   - an error wrapping arrow.ErrInvalid if schema is nil,
+//   - the Err of the RecordMessage that carried an error (also reported to the
+//     handler through on_error with EINVAL), or
+//   - an error describing the non-zero status returned by on_schema or
+//     on_next_task.
+func ExportAsyncRecordBatchStream(schema *arrow.Schema, stream <-chan RecordMessage, handler *CArrowAsyncDeviceStreamHandler) error {
+	return exportAsyncProducer(schema, stream, handler)
+}
diff --git a/arrow/cdata/trampoline.c b/arrow/cdata/trampoline.c
index 01db13f..364010b 100644
--- a/arrow/cdata/trampoline.c
+++ b/arrow/cdata/trampoline.c
@@ -20,6 +20,7 @@
int streamGetSchema(struct ArrowArrayStream*, struct ArrowSchema*);
int streamGetNext(struct ArrowArrayStream*, struct ArrowArray*);
+int asyncTaskExtract(struct ArrowAsyncTask*, struct ArrowDeviceArray*);
int streamGetSchemaTrampoline(struct ArrowArrayStream* stream, struct
ArrowSchema* out) {
// XXX(https://github.com/apache/arrow-adbc/issues/729)
@@ -32,3 +33,8 @@ int streamGetNextTrampoline(struct ArrowArrayStream* stream,
struct ArrowArray*
memset(out, 0, sizeof(*out));
return streamGetNext(stream, out);
}
+
+// Zeroes the output device array, then hands it to the Go-exported
+// asyncTaskExtract (mirroring streamGetNextTrampoline above) and passes its
+// status through unchanged.
+int asyncTaskExtractTrampoline(struct ArrowAsyncTask* task, struct ArrowDeviceArray* out) {
+  memset(out, 0, sizeof *out);
+  int status = asyncTaskExtract(task, out);
+  return status;
+}