This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git


The following commit(s) were added to refs/heads/main by this push:
     new d10a859  feat(arrow/cdata): Add Implementation of Async C Data 
interface (#169)
d10a859 is described below

commit d10a8591351fd113fe7ae90c9b56e80ebdfd196d
Author: Matt Topol <[email protected]>
AuthorDate: Tue Nov 12 05:48:17 2024 +0100

    feat(arrow/cdata): Add Implementation of Async C Data interface (#169)
    
    This adds a basic implementation of helpers for managing an
    ArrowAsyncDeviceStreamHandler for using the Async Arrow C Device
    interface. The corresponding C++ helper implementation can be found at
    https://github.com/apache/arrow/pull/44495 with the discussion on the
    actual C structures located at
    https://github.com/apache/arrow/pull/43632.
    
    ---------
    
    Co-authored-by: Sutou Kouhei <[email protected]>
---
 arrow/cdata/arrow/c/abi.h           | 341 +++++++++++++++++++++++++++++++++++-
 arrow/cdata/arrow/c/helpers.h       |  91 ++++++++--
 arrow/cdata/cdata.go                |   5 +
 arrow/cdata/cdata_exports.go        |  80 +++++++++
 arrow/cdata/cdata_test.go           | 108 ++++++++++++
 arrow/cdata/cdata_test_framework.go |  15 ++
 arrow/cdata/exports.go              | 262 +++++++++++++++++++++++++++
 arrow/cdata/interface.go            |  60 +++++++
 arrow/cdata/trampoline.c            |   6 +
 9 files changed, 948 insertions(+), 20 deletions(-)

diff --git a/arrow/cdata/arrow/c/abi.h b/arrow/cdata/arrow/c/abi.h
index d58417e..7f4d565 100644
--- a/arrow/cdata/arrow/c/abi.h
+++ b/arrow/cdata/arrow/c/abi.h
@@ -15,20 +15,37 @@
 // specific language governing permissions and limitations
 // under the License.
 
+/// \file abi.h Arrow C Data Interface
+///
+/// The Arrow C Data interface defines a very small, stable set
+/// of C definitions which can be easily copied into any project's
+/// source code and vendored to be used for columnar data interchange
+/// in the Arrow format. For non-C/C++ languages and runtimes,
+/// it should be almost as easy to translate the C definitions into
+/// the corresponding C FFI declarations.
+///
+/// Applications and libraries can therefore work with Arrow memory
+/// without necessarily using the Arrow libraries or reinventing
+/// the wheel. Developers can choose between tight integration
+/// with the Arrow software project or minimal integration with
+/// the Arrow format only.
+
 #pragma once
 
 #include <stdint.h>
 
+// Spec and documentation: 
https://arrow.apache.org/docs/format/CDataInterface.html
+
 #ifdef __cplusplus
 extern "C" {
 #endif
 
 #ifndef ARROW_C_DATA_INTERFACE
-#define ARROW_C_DATA_INTERFACE
+#  define ARROW_C_DATA_INTERFACE
 
-#define ARROW_FLAG_DICTIONARY_ORDERED 1
-#define ARROW_FLAG_NULLABLE 2
-#define ARROW_FLAG_MAP_KEYS_SORTED 4
+#  define ARROW_FLAG_DICTIONARY_ORDERED 1
+#  define ARROW_FLAG_NULLABLE 2
+#  define ARROW_FLAG_MAP_KEYS_SORTED 4
 
 struct ArrowSchema {
   // Array type description
@@ -65,8 +82,63 @@ struct ArrowArray {
 
 #endif  // ARROW_C_DATA_INTERFACE
 
+#ifndef ARROW_C_DEVICE_DATA_INTERFACE
+#  define ARROW_C_DEVICE_DATA_INTERFACE
+
+// Spec and Documentation: 
https://arrow.apache.org/docs/format/CDeviceDataInterface.html
+
+// DeviceType for the allocated memory
+typedef int32_t ArrowDeviceType;
+
+// CPU device, same as using ArrowArray directly
+#  define ARROW_DEVICE_CPU 1
+// CUDA GPU Device
+#  define ARROW_DEVICE_CUDA 2
+// Pinned CUDA CPU memory by cudaMallocHost
+#  define ARROW_DEVICE_CUDA_HOST 3
+// OpenCL Device
+#  define ARROW_DEVICE_OPENCL 4
+// Vulkan buffer for next-gen graphics
+#  define ARROW_DEVICE_VULKAN 7
+// Metal for Apple GPU
+#  define ARROW_DEVICE_METAL 8
+// Verilog simulator buffer
+#  define ARROW_DEVICE_VPI 9
+// ROCm GPUs for AMD GPUs
+#  define ARROW_DEVICE_ROCM 10
+// Pinned ROCm CPU memory allocated by hipMallocHost
+#  define ARROW_DEVICE_ROCM_HOST 11
+// Reserved for extension
+#  define ARROW_DEVICE_EXT_DEV 12
+// CUDA managed/unified memory allocated by cudaMallocManaged
+#  define ARROW_DEVICE_CUDA_MANAGED 13
+// unified shared memory allocated on a oneAPI non-partitioned device.
+#  define ARROW_DEVICE_ONEAPI 14
+// GPU support for next-gen WebGPU standard
+#  define ARROW_DEVICE_WEBGPU 15
+// Qualcomm Hexagon DSP
+#  define ARROW_DEVICE_HEXAGON 16
+
+struct ArrowDeviceArray {
+  // the Allocated Array
+  //
+  // the buffers in the array (along with the buffers of any
+  // children) are what is allocated on the device.
+  struct ArrowArray array;
+  // The device id to identify a specific device
+  int64_t device_id;
+  // The type of device which can access this memory.
+  ArrowDeviceType device_type;
+  // An event-like object to synchronize on if needed.
+  void* sync_event;
+  // Reserved bytes for future expansion.
+  int64_t reserved[3];
+};
+
+#endif  // ARROW_C_DEVICE_DATA_INTERFACE
+
 #ifndef ARROW_C_STREAM_INTERFACE
-#define ARROW_C_STREAM_INTERFACE
+#  define ARROW_C_STREAM_INTERFACE
 
 struct ArrowArrayStream {
   // Callback to get the stream type
@@ -106,6 +178,265 @@ struct ArrowArrayStream {
 
 #endif  // ARROW_C_STREAM_INTERFACE
 
+#ifndef ARROW_C_DEVICE_STREAM_INTERFACE
+#  define ARROW_C_DEVICE_STREAM_INTERFACE
+
+// Equivalent to ArrowArrayStream, but for ArrowDeviceArrays.
+//
+// This stream is intended to provide a stream of data on a single
+// device. If a producer wants data to be produced on multiple devices,
+// then multiple streams should be provided, one per device.
+struct ArrowDeviceArrayStream {
+  // The device that this stream produces data on.
+  ArrowDeviceType device_type;
+
+  // Callback to get the stream schema
+  // (will be the same for all arrays in the stream).
+  //
+  // Return value 0 if successful, an `errno`-compatible error code otherwise.
+  //
+  // If successful, the ArrowSchema must be released independently from the 
stream.
+  // The schema should be accessible via CPU memory.
+  int (*get_schema)(struct ArrowDeviceArrayStream* self, struct ArrowSchema* 
out);
+
+  // Callback to get the next array
+  // (if no error and the array is released, the stream has ended)
+  //
+  // Return value: 0 if successful, an `errno`-compatible error code otherwise.
+  //
+  // If successful, the ArrowDeviceArray must be released independently from 
the stream.
+  int (*get_next)(struct ArrowDeviceArrayStream* self, struct 
ArrowDeviceArray* out);
+
+  // Callback to get optional detailed error information.
+  // This must only be called if the last stream operation failed
+  // with a non-0 return code.
+  //
+  // Return value: pointer to a null-terminated character array describing
+  // the last error, or NULL if no description is available.
+  //
+  // The returned pointer is only valid until the next operation on this stream
+  // (including release).
+  const char* (*get_last_error)(struct ArrowDeviceArrayStream* self);
+
+  // Release callback: release the stream's own resources.
+  // Note that arrays returned by `get_next` must be individually released.
+  void (*release)(struct ArrowDeviceArrayStream* self);
+
+  // Opaque producer-specific data
+  void* private_data;
+};
+
+#endif  // ARROW_C_DEVICE_STREAM_INTERFACE
+
+#ifndef ARROW_C_ASYNC_STREAM_INTERFACE
+#  define ARROW_C_ASYNC_STREAM_INTERFACE
+
+// EXPERIMENTAL: ArrowAsyncTask represents available data from a producer that 
was passed
+// to an invocation of `on_next_task` on the ArrowAsyncDeviceStreamHandler.
+//
+// The reason for this Task approach instead of the Async interface returning
+// the Array directly is to allow for more complex thread handling and reducing
+// context switching and data transfers between CPU cores (e.g. from one L1/L2
+// cache to another) if desired.
+//
+// For example, the `on_next_task` callback can be called when data is ready, 
while
+// the producer puts potential "decoding" logic in the `ArrowAsyncTask` 
object. This
+// allows for the producer to manage the I/O on one thread which calls 
`on_next_task`
+// and the consumer can determine when the decoding (producer logic in the 
`extract_data`
+// callback of the task) occurs and on which thread, to avoid a CPU core 
transfer
+// (data staying in the L2 cache).
+struct ArrowAsyncTask {
+  // This callback should populate the ArrowDeviceArray associated with this 
task.
+  // The order of ArrowAsyncTasks provided by the producer enables a consumer 
to
+  // ensure the order of data to process.
+  //
+  // This function is expected to be synchronous, but should not perform any 
blocking
+  // I/O. Ideally it should be as cheap as possible so as to not tie up the 
consumer
+  // thread unnecessarily.
+  //
+  // Returns: 0 if successful, errno-compatible error otherwise.
+  //
+  // If a non-0 value is returned then it should be followed by a call to 
`on_error`
+  // on the appropriate ArrowAsyncDeviceStreamHandler. This is because it's 
highly
+  // likely that whatever is calling this function may be entirely 
disconnected from
+  // the current control flow. Indicating an error here with a non-zero return 
allows
+  // the current flow to be aware of the error occurring, while still allowing 
any
+  // logging or error handling to still be centralized in the `on_error` 
callback of
+  // the original Async handler.
+  //
+  // Rather than a release callback, any required cleanup should be performed 
as part
+  // of the invocation of `extract_data`. Ownership of the Array is passed to 
the consumer
+  // calling this, and so it must be released separately.
+  //
+  // It is only valid to call this method exactly once.
+  int (*extract_data)(struct ArrowAsyncTask* self, struct ArrowDeviceArray* 
out);
+
+  // opaque task-specific data
+  void* private_data;
+};
+
+// EXPERIMENTAL: ArrowAsyncProducer represents a 1-to-1 relationship between 
an async
+// producer and consumer. This object allows the consumer to perform 
backpressure and flow
+// control on the asynchronous stream processing. This object must be owned by 
the
+// producer who creates it, and thus the producer is responsible for cleaning it up.
+struct ArrowAsyncProducer {
+  // The device type that this stream produces data on.
+  ArrowDeviceType device_type;
+
+  // A consumer must call this function to start receiving on_next_task calls.
+  //
+  // It *must* be valid to call this synchronously from within `on_next_task` 
or
+  // `on_schema`, but this function *must not* immediately call `on_next_task` 
so as
+  // to avoid recursion and reentrant callbacks.
+  //
+  // After cancel has been called, additional calls to this function must be 
NOPs,
+  // but allowed. While not cancelled, calling this function must register the
+  // given number of additional arrays/batches to be produced with the 
producer.
+  // The producer should only call `on_next_task` at most the registered number
+  // of arrays before propagating backpressure.
+  //
+  // Any error encountered by calling request must be propagated by calling 
the `on_error`
+  // callback of the ArrowAsyncDeviceStreamHandler.
+  //
+  // While not cancelled, any subsequent calls to `on_next_task`, `on_error` or
+  // `release` should be scheduled by the producer to be called later.
+  //
+  // It is invalid for a consumer to call this with a value of n <= 0, 
producers should
+  // error if given such a value.
+  void (*request)(struct ArrowAsyncProducer* self, int64_t n);
+
+  // This cancel callback signals a producer that it must eventually stop 
making calls
+  // to on_next_task. It must be idempotent and thread-safe. After calling 
cancel once,
+  // subsequent calls must be NOPs. This must not call any consumer-side 
handlers other
+  // than `on_error`.
+  //
+  // It is not required that calling cancel affect the producer immediately, 
only that it
+  // must eventually stop calling on_next_task and subsequently call release 
on the
+  // async handler. As such, a consumer must be prepared to receive one or 
more calls to
+  // `on_next_task` even after calling cancel if there are still requested 
arrays pending.
+  //
+  // Successful cancellation should *not* result in the producer calling 
`on_error`, it
+  // should finish out any remaining tasks and eventually call `release`.
+  //
+  // Any error encountered during handling a call to cancel must be reported 
via the
+  // on_error callback on the async stream handler.
+  void (*cancel)(struct ArrowAsyncProducer* self);
+
+  // Any additional metadata tied to a specific stream of data. This must 
either be NULL
+  // or a valid pointer to metadata which is encoded in the same way schema 
metadata
+  // would be. Non-null metadata must be valid for the lifetime of this 
object. As an
+  // example a producer could use this to provide the total number of rows 
and/or batches
+  // in the stream if known.
+  const char* additional_metadata;
+
+  // producer-specific opaque data.
+  void* private_data;
+};
+
+// EXPERIMENTAL: Similar to ArrowDeviceArrayStream, except designed for an 
asynchronous
+// style of interaction. While ArrowDeviceArrayStream provides producer
+// defined callbacks, this is intended to be created by the consumer instead.
+// The consumer passes this handler to the producer, which in turn uses the
+// callbacks to inform the consumer of events in the stream.
+struct ArrowAsyncDeviceStreamHandler {
+  // Handler for receiving a schema. The passed in stream_schema must be
+  // released or moved by the handler (producer is giving ownership of the 
schema to
+  // the handler, but not ownership of the top level object itself).
+  //
+  // With the exception of an error occurring (on_error), this must be the 
first
+  // callback function which is called by a producer and must only be called 
exactly
+  // once. As such, the producer should provide a valid ArrowAsyncProducer 
instance
+  // so the consumer can control the flow. See the documentation on 
ArrowAsyncProducer
+  // for how it works. The ArrowAsyncProducer is owned by the producer who 
calls this
+  // function and thus the producer is responsible for cleaning it up when 
calling
+  // the release callback of this handler.
+  //
+  // If there is any additional metadata tied to this stream, it will be 
provided as
+  // a non-null value for the `additional_metadata` field of the 
ArrowAsyncProducer
+  // which will be valid at least until the release callback is called.
+  //
+  // Return value: 0 if successful, `errno`-compatible error otherwise
+  //
+  // A producer that receives a non-zero return here should stop producing and 
eventually
+  // call release instead.
+  int (*on_schema)(struct ArrowAsyncDeviceStreamHandler* self,
+                   struct ArrowSchema* stream_schema);
+
+  // Handler for receiving data. This is called when data is available 
providing an
+  // ArrowAsyncTask struct to signify it. The producer indicates the end of 
the stream
+  // by passing NULL as the value for the task rather than a valid pointer to 
a task.
+  // The task object is only valid for the lifetime of this function call, if 
a consumer
+  // wants to utilize it after this function returns, it must copy or move the 
contents
+  // of it to a new ArrowAsyncTask object.
+  //
+  // The `request` callback of a provided ArrowAsyncProducer must be called in 
order
+  // to start receiving calls to this handler.
+  //
+  // The metadata argument can be null or can be used by a producer
+  // to pass arbitrary extra information to the consumer (such as total number
+  // of rows, context info, or otherwise). The data should be passed using the 
same
+  // encoding as the metadata within the ArrowSchema struct itself (defined in
+  // the spec at
+  // 
https://arrow.apache.org/docs/format/CDataInterface.html#c.ArrowSchema.metadata)
+  //
+  // If metadata is non-null then it only needs to exist for the lifetime of 
this call,
+  // a consumer who wants it to live after that must copy it to ensure 
lifetime.
+  //
+  // A producer *must not* call this concurrently from multiple different 
threads.
+  //
+  // A consumer must be prepared to receive one or more calls to this callback 
even
+  // after calling cancel on the corresponding ArrowAsyncProducer, as cancel 
does not
+  // guarantee it happens immediately.
+  //
+  // Return value: 0 if successful, `errno`-compatible error otherwise.
+  //
+  // If the consumer returns a non-zero return from this method, that 
indicates to the
+  // producer that it should stop propagating data as an error occurred. After 
receiving
+  // such a return, the only interaction with this object is for the producer 
to call
+  // the `release` callback.
+  int (*on_next_task)(struct ArrowAsyncDeviceStreamHandler* self,
+                      struct ArrowAsyncTask* task, const char* metadata);
+
+  // Handler for encountering an error. The producer should call release after
+  // this returns to clean up any resources. The `code` passed in can be any 
error
+  // code that a producer wants, but should be errno-compatible for 
consistency.
+  //
+  // If the message or metadata are non-null, they will only last as long as 
this
+  // function call. The consumer would need to perform a copy of the data if 
it is
+  // necessary for them to live past the lifetime of this call.
+  //
+  // Error metadata should be encoded as with metadata in ArrowSchema, defined 
in
+  // the spec at
+  // 
https://arrow.apache.org/docs/format/CDataInterface.html#c.ArrowSchema.metadata
+  //
+  // It is valid for this to be called by a producer with or without a 
preceding call
+  // to ArrowAsyncProducer.request.
+  //
+  // This callback must not call any methods of an ArrowAsyncProducer object.
+  void (*on_error)(struct ArrowAsyncDeviceStreamHandler* self, int code,
+                   const char* message, const char* metadata);
+
+  // Release callback to release any resources for the handler. Should always 
be
+  // called by a producer when it is done utilizing a handler. No callbacks 
should
+  // be called after this is called.
+  //
+  // It is valid for the release callback to be called by a producer with or 
without
+  // a preceding call to ArrowAsyncProducer.request.
+  //
+  // The release callback must not call any methods of an ArrowAsyncProducer 
object.
+  void (*release)(struct ArrowAsyncDeviceStreamHandler* self);
+
+  // MUST be populated by the producer BEFORE calling any callbacks other than 
release.
+  // This provides the connection between a handler and its producer, and must 
exist until
+  // the release callback is called.
+  struct ArrowAsyncProducer* producer;
+
+  // Opaque handler-specific data
+  void* private_data;
+};
+
+#endif  // ARROW_C_ASYNC_STREAM_INTERFACE
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/arrow/cdata/arrow/c/helpers.h b/arrow/cdata/arrow/c/helpers.h
index 6581403..6e4df17 100644
--- a/arrow/cdata/arrow/c/helpers.h
+++ b/arrow/cdata/arrow/c/helpers.h
@@ -18,21 +18,31 @@
 #pragma once
 
 #include <assert.h>
+#include <stdio.h>
+#include <stdlib.h>
 #include <string.h>
 
 #include "arrow/c/abi.h"
 
+#define ARROW_C_ASSERT(condition, msg)                          \
+  do {                                                          \
+    if (!(condition)) {                                         \
+      fprintf(stderr, "%s:%d:: %s", __FILE__, __LINE__, (msg)); \
+      abort();                                                  \
+    }                                                           \
+  } while (0)
+
 #ifdef __cplusplus
 extern "C" {
 #endif
 
 /// Query whether the C schema is released
-static inline int ArrowSchemaIsReleased(const struct ArrowSchema* schema) {
+inline int ArrowSchemaIsReleased(const struct ArrowSchema* schema) {
   return schema->release == NULL;
 }
 
 /// Mark the C schema released (for use in release callbacks)
-static inline void ArrowSchemaMarkReleased(struct ArrowSchema* schema) {
+inline void ArrowSchemaMarkReleased(struct ArrowSchema* schema) {
   schema->release = NULL;
 }
 
@@ -40,7 +50,7 @@ static inline void ArrowSchemaMarkReleased(struct 
ArrowSchema* schema) {
 ///
 /// Note `dest` must *not* point to a valid schema already, otherwise there
 /// will be a memory leak.
-static inline void ArrowSchemaMove(struct ArrowSchema* src, struct 
ArrowSchema* dest) {
+inline void ArrowSchemaMove(struct ArrowSchema* src, struct ArrowSchema* dest) 
{
   assert(dest != src);
   assert(!ArrowSchemaIsReleased(src));
   memcpy(dest, src, sizeof(struct ArrowSchema));
@@ -48,47 +58,81 @@ static inline void ArrowSchemaMove(struct ArrowSchema* src, 
struct ArrowSchema*
 }
 
 /// Release the C schema, if necessary, by calling its release callback
-static inline void ArrowSchemaRelease(struct ArrowSchema* schema) {
+inline void ArrowSchemaRelease(struct ArrowSchema* schema) {
   if (!ArrowSchemaIsReleased(schema)) {
     schema->release(schema);
-    assert(ArrowSchemaIsReleased(schema));
+    ARROW_C_ASSERT(ArrowSchemaIsReleased(schema),
+                   "ArrowSchemaRelease did not cleanup release callback");
   }
 }
 
 /// Query whether the C array is released
-static inline int ArrowArrayIsReleased(const struct ArrowArray* array) {
+inline int ArrowArrayIsReleased(const struct ArrowArray* array) {
   return array->release == NULL;
 }
 
+inline int ArrowDeviceArrayIsReleased(const struct ArrowDeviceArray* array) {
+  return ArrowArrayIsReleased(&array->array);
+}
+
 /// Mark the C array released (for use in release callbacks)
-static inline void ArrowArrayMarkReleased(struct ArrowArray* array) { 
array->release = NULL; }
+inline void ArrowArrayMarkReleased(struct ArrowArray* array) { array->release 
= NULL; }
+
+inline void ArrowDeviceArrayMarkReleased(struct ArrowDeviceArray* array) {
+  ArrowArrayMarkReleased(&array->array);
+}
 
 /// Move the C array from `src` to `dest`
 ///
 /// Note `dest` must *not* point to a valid array already, otherwise there
 /// will be a memory leak.
-static inline void ArrowArrayMove(struct ArrowArray* src, struct ArrowArray* 
dest) {
+inline void ArrowArrayMove(struct ArrowArray* src, struct ArrowArray* dest) {
   assert(dest != src);
   assert(!ArrowArrayIsReleased(src));
   memcpy(dest, src, sizeof(struct ArrowArray));
   ArrowArrayMarkReleased(src);
 }
 
+inline void ArrowDeviceArrayMove(struct ArrowDeviceArray* src,
+                                 struct ArrowDeviceArray* dest) {
+  assert(dest != src);
+  assert(!ArrowDeviceArrayIsReleased(src));
+  memcpy(dest, src, sizeof(struct ArrowDeviceArray));
+  ArrowDeviceArrayMarkReleased(src);
+}
+
 /// Release the C array, if necessary, by calling its release callback
-static inline void ArrowArrayRelease(struct ArrowArray* array) {
+inline void ArrowArrayRelease(struct ArrowArray* array) {
   if (!ArrowArrayIsReleased(array)) {
     array->release(array);
-    assert(ArrowArrayIsReleased(array));
+    ARROW_C_ASSERT(ArrowArrayIsReleased(array),
+                   "ArrowArrayRelease did not cleanup release callback");
+  }
+}
+
+inline void ArrowDeviceArrayRelease(struct ArrowDeviceArray* array) {
+  if (!ArrowDeviceArrayIsReleased(array)) {
+    array->array.release(&array->array);
+    ARROW_C_ASSERT(ArrowDeviceArrayIsReleased(array),
+                   "ArrowDeviceArrayRelease did not cleanup release callback");
   }
 }
 
 /// Query whether the C array stream is released
-static inline int ArrowArrayStreamIsReleased(const struct ArrowArrayStream* 
stream) {
+inline int ArrowArrayStreamIsReleased(const struct ArrowArrayStream* stream) {
+  return stream->release == NULL;
+}
+
+inline int ArrowDeviceArrayStreamIsReleased(const struct 
ArrowDeviceArrayStream* stream) {
   return stream->release == NULL;
 }
 
 /// Mark the C array stream released (for use in release callbacks)
-static inline void ArrowArrayStreamMarkReleased(struct ArrowArrayStream* 
stream) {
+inline void ArrowArrayStreamMarkReleased(struct ArrowArrayStream* stream) {
+  stream->release = NULL;
+}
+
+inline void ArrowDeviceArrayStreamMarkReleased(struct ArrowDeviceArrayStream* 
stream) {
   stream->release = NULL;
 }
 
@@ -96,7 +140,7 @@ static inline void ArrowArrayStreamMarkReleased(struct 
ArrowArrayStream* stream)
 ///
 /// Note `dest` must *not* point to a valid stream already, otherwise there
 /// will be a memory leak.
-static inline void ArrowArrayStreamMove(struct ArrowArrayStream* src,
+inline void ArrowArrayStreamMove(struct ArrowArrayStream* src,
                                  struct ArrowArrayStream* dest) {
   assert(dest != src);
   assert(!ArrowArrayStreamIsReleased(src));
@@ -104,11 +148,28 @@ static inline void ArrowArrayStreamMove(struct 
ArrowArrayStream* src,
   ArrowArrayStreamMarkReleased(src);
 }
 
+inline void ArrowDeviceArrayStreamMove(struct ArrowDeviceArrayStream* src,
+                                       struct ArrowDeviceArrayStream* dest) {
+  assert(dest != src);
+  assert(!ArrowDeviceArrayStreamIsReleased(src));
+  memcpy(dest, src, sizeof(struct ArrowDeviceArrayStream));
+  ArrowDeviceArrayStreamMarkReleased(src);
+}
+
 /// Release the C array stream, if necessary, by calling its release callback
-static inline void ArrowArrayStreamRelease(struct ArrowArrayStream* stream) {
+inline void ArrowArrayStreamRelease(struct ArrowArrayStream* stream) {
   if (!ArrowArrayStreamIsReleased(stream)) {
     stream->release(stream);
-    assert(ArrowArrayStreamIsReleased(stream));
+    ARROW_C_ASSERT(ArrowArrayStreamIsReleased(stream),
+                   "ArrowArrayStreamRelease did not cleanup release callback");
+  }
+}
+
+inline void ArrowDeviceArrayStreamRelease(struct ArrowDeviceArrayStream* 
stream) {
+  if (!ArrowDeviceArrayStreamIsReleased(stream)) {
+    stream->release(stream);
+    ARROW_C_ASSERT(ArrowDeviceArrayStreamIsReleased(stream),
+                   "ArrowDeviceArrayStreamRelease did not cleanup release 
callback");
   }
 }
 
diff --git a/arrow/cdata/cdata.go b/arrow/cdata/cdata.go
index d5748a3..4361e6f 100644
--- a/arrow/cdata/cdata.go
+++ b/arrow/cdata/cdata.go
@@ -64,6 +64,11 @@ type (
        CArrowArray = C.struct_ArrowArray
        // CArrowArrayStream is the C Stream Interface object for handling 
streams of record batches.
        CArrowArrayStream = C.struct_ArrowArrayStream
+
+       CArrowAsyncDeviceStreamHandler = C.struct_ArrowAsyncDeviceStreamHandler
+       CArrowAsyncProducer            = C.struct_ArrowAsyncProducer
+       CArrowAsyncTask                = C.struct_ArrowAsyncTask
+       CArrowDeviceArray              = C.struct_ArrowDeviceArray
 )
 
 // Map from the defined strings to their corresponding arrow.DataType interface
diff --git a/arrow/cdata/cdata_exports.go b/arrow/cdata/cdata_exports.go
index 6d1e038..da7db6c 100644
--- a/arrow/cdata/cdata_exports.go
+++ b/arrow/cdata/cdata_exports.go
@@ -33,10 +33,23 @@ package cdata
 // void goReleaseSchema(struct ArrowSchema* schema) {
 //      releaseExportedSchema(schema);
 // }
+//
+// void goCallCancel(struct ArrowAsyncProducer* producer) {
+//     producer->cancel(producer);
+// }
+//
+// int goExtractTaskData(struct ArrowAsyncTask* task, struct ArrowDeviceArray* 
out) {
+//     return task->extract_data(task, out);
+// }
+//
+// static void goCallRequest(struct ArrowAsyncProducer* producer, int64_t n) {
+//     producer->request(producer, n);
+// }
 import "C"
 
 import (
        "bytes"
+       "context"
        "encoding/binary"
        "fmt"
        "runtime/cgo"
@@ -489,3 +502,70 @@ func (rr cRecordReader) release() {
        }
        rr.rdr.Release()
 }
+
+type cAsyncStreamHandler struct {
+       producer  *CArrowAsyncProducer
+       taskQueue chan taskState
+       ctx       context.Context
+}
+
+func asyncTaskQueue(ctx context.Context, schema *arrow.Schema, recordStream 
chan<- RecordMessage, taskQueue <-chan taskState, producer 
*CArrowAsyncProducer) {
+       defer close(recordStream)
+       for {
+               select {
+               case <-ctx.Done():
+                       C.goCallCancel(producer)
+                       return
+               case task, ok := <-taskQueue:
+                       // if the queue closes or we receive a nil task, we're 
done
+                       if !ok || (task.err == nil && task.task.extract_data == 
nil) {
+                               return
+                       }
+
+                       if task.err != nil {
+                               recordStream <- RecordMessage{Err: task.err}
+                               continue
+                       }
+
+                       // request another batch now that we've dequeued this 
one
+                       C.goCallRequest(producer, C.int64_t(1))
+
+                       var out CArrowDeviceArray
+                       if C.goExtractTaskData(&task.task, &out) != C.int(0) {
+                               continue
+                       }
+
+                       rec, err := ImportCRecordBatchWithSchema(&out.array, 
schema)
+                       if err != nil {
+                               recordStream <- RecordMessage{Err: err}
+                       } else {
+                               recordStream <- RecordMessage{Record: rec, 
AdditionalMetadata: task.meta}
+                       }
+               }
+       }
+}
+
+func (h *cAsyncStreamHandler) onNextTask(task *CArrowAsyncTask, metadata 
*C.char) C.int {
+       if task == nil {
+               h.taskQueue <- taskState{}
+               return 0
+       }
+
+       ts := taskState{task: *task}
+       if metadata != nil {
+               ts.meta = decodeCMetadata(metadata)
+       }
+       h.taskQueue <- ts
+       return 0
+}
+
+func (h *cAsyncStreamHandler) onError(code C.int, message, metadata *C.char) {
+       h.taskQueue <- taskState{err: AsyncStreamError{
+               Code: int(code), Msg: C.GoString(message), Metadata: 
C.GoString(metadata)}}
+}
+
+func (h *cAsyncStreamHandler) release() {
+       close(h.taskQueue)
+       h.taskQueue, h.producer = nil, nil
+       h.producer = nil
+}
diff --git a/arrow/cdata/cdata_test.go b/arrow/cdata/cdata_test.go
index ebd4fcf..bd1942f 100644
--- a/arrow/cdata/cdata_test.go
+++ b/arrow/cdata/cdata_test.go
@@ -24,6 +24,7 @@
 package cdata
 
 import (
+       "context"
        "encoding/json"
        "errors"
        "fmt"
@@ -42,6 +43,7 @@ import (
        "github.com/apache/arrow-go/v18/arrow/memory"
        "github.com/apache/arrow-go/v18/arrow/memory/mallocator"
        "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
 )
 
 func TestSchemaExport(t *testing.T) {
@@ -1050,3 +1052,109 @@ func TestConfuseGoGc(t *testing.T) {
 
        wg.Wait()
 }
+
+// TestAsyncInterfacesSimple round-trips the "primitives" records from arrdata
+// through ExportAsyncRecordBatchStream into a handler populated by
+// CreateAsyncDeviceStreamHandler, then checks that the schema and every
+// record arrive equal to the originals and in the same order.
+func TestAsyncInterfacesSimple(t *testing.T) {
+       reclist := arrdata.Records["primitives"]
+
+       handler := testAsyncHandler()
+       defer freeAsyncHandler(handler)
+
+       ctx := context.Background()
+       ch := CreateAsyncDeviceStreamHandler(ctx, 1, handler)
+
+       // feed every record into the channel consumed by the exporter
+       stream := make(chan RecordMessage, len(reclist))
+       go func() {
+               defer close(stream)
+               for _, r := range reclist {
+                       // take an extra reference: asyncTaskExtract releases
+                       // the record it exports
+                       r.Retain()
+                       stream <- RecordMessage{Record: r}
+               }
+       }()
+
+       // the export call blocks, so it runs in its own goroutine
+       wait := make(chan struct{})
+       go func() {
+               defer close(wait)
+               assert.NoError(t, 
ExportAsyncRecordBatchStream(reclist[0].Schema(), stream, handler))
+       }()
+
+       asyncStream := <-ch
+       require.NoError(t, asyncStream.Err)
+
+       assert.True(t, reclist[0].Schema().Equal(asyncStream.Schema))
+       var idx int
+       for r := range asyncStream.Stream {
+               require.NoError(t, r.Err)
+               assert.True(t, array.RecordEqual(reclist[idx], r.Record))
+               idx++
+               r.Record.Release()
+       }
+
+       <-wait
+}
+
+// TestAsyncSchemaError checks that exporting with a nil schema returns an
+// error wrapping arrow.ErrInvalid to the exporter, and that the consumer side
+// receives an AsyncStreamError instead of a stream.
+func TestAsyncSchemaError(t *testing.T) {
+       handler := testAsyncHandler()
+       defer freeAsyncHandler(handler)
+
+       ctx := context.Background()
+       ch := CreateAsyncDeviceStreamHandler(ctx, 1, handler)
+
+       wait := make(chan struct{})
+       go func() {
+               defer close(wait)
+               err := ExportAsyncRecordBatchStream(nil, nil, handler)
+               assert.ErrorIs(t, err, arrow.ErrInvalid)
+               assert.ErrorContains(t, err, "must have non-nil schema")
+       }()
+
+       asyncStream := <-ch
+       require.Error(t, asyncStream.Err)
+
+       var asyncErr AsyncStreamError
+       assert.ErrorAs(t, asyncStream.Err, &asyncErr)
+       // the exporter reports C.EINVAL; 22 is presumably its value on the
+       // test platforms - TODO confirm for non-POSIX targets
+       assert.Equal(t, 22, int(asyncErr.Code))
+
+       <-wait
+}
+
+// TestAsyncPropagateError sends one valid record followed by an error message
+// and checks that the record is delivered intact, that the error then reaches
+// the consumer as an AsyncStreamError, and that the exporter returns the
+// original error.
+func TestAsyncPropagateError(t *testing.T) {
+       reclist := arrdata.Records["primitives"]
+
+       handler := testAsyncHandler()
+       defer freeAsyncHandler(handler)
+
+       ctx := context.Background()
+       ch := CreateAsyncDeviceStreamHandler(ctx, 1, handler)
+
+       stream := make(chan RecordMessage, 2)
+       go func() {
+               defer close(stream)
+               reclist[0].Retain()
+               stream <- RecordMessage{Record: reclist[0]}
+               stream <- RecordMessage{Err: assert.AnError}
+       }()
+
+       wait := make(chan struct{})
+       go func() {
+               defer close(wait)
+               err := ExportAsyncRecordBatchStream(reclist[0].Schema(), 
stream, handler)
+               assert.ErrorIs(t, err, assert.AnError)
+       }()
+
+       asyncStream := <-ch
+       require.NoError(t, asyncStream.Err)
+
+       assert.True(t, reclist[0].Schema().Equal(asyncStream.Schema))
+       rec1 := <-asyncStream.Stream
+       require.NoError(t, rec1.Err)
+       assert.True(t, array.RecordEqual(reclist[0], rec1.Record))
+       rec1.Record.Release()
+
+       // the second message carries the propagated error rather than a record
+       rec2 := <-asyncStream.Stream
+       var err AsyncStreamError
+       assert.ErrorContains(t, rec2.Err, assert.AnError.Error())
+       assert.ErrorAs(t, rec2.Err, &err)
+       // the exporter reports C.EINVAL; 22 is presumably its value on the
+       // test platforms - TODO confirm for non-POSIX targets
+       assert.Equal(t, 22, int(err.Code))
+
+       <-wait
+}
diff --git a/arrow/cdata/cdata_test_framework.go 
b/arrow/cdata/cdata_test_framework.go
index 331e80b..ad6b911 100644
--- a/arrow/cdata/cdata_test_framework.go
+++ b/arrow/cdata/cdata_test_framework.go
@@ -37,6 +37,13 @@ package cdata
 //     return out;
 // }
 //
+// // Returns a heap-allocated, zero-initialized handler, or NULL if the
+// // allocation fails. calloc zeroes the memory itself, so memset is never
+// // handed a NULL pointer. The caller owns the result and must free() it.
+// static struct ArrowAsyncDeviceStreamHandler* get_test_async_handler() {
+//     return (struct ArrowAsyncDeviceStreamHandler*)calloc(1, sizeof(struct ArrowAsyncDeviceStreamHandler));
+// }
+//
 // void release_test_arr(struct ArrowArray* arr);
 //
 // static int32_t* get_data() {
@@ -132,6 +139,14 @@ func freeMallocedSchemas(schemas **CArrowSchema) {
        C.free_malloced_schemas(schemas)
 }
 
+// testAsyncHandler returns the handler allocated by the C helper
+// get_test_async_handler; release it with freeAsyncHandler.
+func testAsyncHandler() *CArrowAsyncDeviceStreamHandler {
+       return C.get_test_async_handler()
+}
+
+// freeAsyncHandler frees the C memory of the handler struct with C.free. It
+// does not invoke the handler's release callback.
+func freeAsyncHandler(h *CArrowAsyncDeviceStreamHandler) {
+       C.free(unsafe.Pointer(h))
+}
+
 func testNested(fmts, names []string, isnull []bool) **CArrowSchema {
        if len(fmts) != len(names) {
                panic("testing nested lists must have same size fmts and names")
diff --git a/arrow/cdata/exports.go b/arrow/cdata/exports.go
index bf64ea7..d70305c 100644
--- a/arrow/cdata/exports.go
+++ b/arrow/cdata/exports.go
@@ -17,6 +17,8 @@
 package cdata
 
 import (
+       "context"
+       "fmt"
        "runtime/cgo"
        "unsafe"
 
@@ -25,6 +27,8 @@ import (
 )
 
 // #include <stdlib.h>
+// #include <errno.h>
+// #include "arrow/c/abi.h"
 // #include "arrow/c/helpers.h"
 //
 // typedef const char cchar_t;
@@ -32,9 +36,40 @@ import (
 // extern int streamGetNext(struct ArrowArrayStream*, struct ArrowArray*);
 // extern const char* streamGetError(struct ArrowArrayStream*);
 // extern void streamRelease(struct ArrowArrayStream*);
+// extern int asyncStreamOnSchema(struct ArrowAsyncDeviceStreamHandler*, 
struct ArrowSchema*);
+// extern int asyncStreamOnNextTask(struct ArrowAsyncDeviceStreamHandler*, 
struct ArrowAsyncTask*, char*);
+// extern void asyncStreamOnError(struct ArrowAsyncDeviceStreamHandler*, int, 
char*, char*);
+// extern void asyncStreamRelease(struct ArrowAsyncDeviceStreamHandler*);
+// extern void asyncProducerRequest(struct ArrowAsyncProducer*, int64_t);
+// extern void asyncProducerCancel(struct ArrowAsyncProducer*);
+// extern int asyncTaskExtract(struct ArrowAsyncTask*, struct 
ArrowDeviceArray*);
 // // XXX(https://github.com/apache/arrow-adbc/issues/729)
 // int streamGetSchemaTrampoline(struct ArrowArrayStream* stream, struct 
ArrowSchema* out);
 // int streamGetNextTrampoline(struct ArrowArrayStream* stream, struct 
ArrowArray* out);
+// int asyncTaskExtractTrampoline(struct ArrowAsyncTask* task, struct 
ArrowDeviceArray* out);
+//
+// // Thin C wrapper so Go code can invoke the producer's request function
+// // pointer, asking it for n more items.
+// static void goCallRequest(struct ArrowAsyncProducer* producer, int64_t n) {
+//     producer->request(producer, n);
+// }
+// // Thin C wrapper so Go code can invoke the handler's on_schema function
+// // pointer; returns whatever status the callback returns.
+// static int goCallOnSchema(struct ArrowAsyncDeviceStreamHandler* handler, 
struct ArrowSchema* schema) {
+//     return handler->on_schema(handler, schema);
+// }
+// // Thin C wrapper so Go code can invoke the handler's on_error function
+// // pointer with the given code, message and metadata.
+// static void goCallOnError(struct ArrowAsyncDeviceStreamHandler* handler, 
int code, char* message, char* metadata) {
+//     handler->on_error(handler, code, message, metadata);
+// }
+// // Thin C wrapper so Go code can invoke the handler's on_next_task function
+// // pointer; returns whatever status the callback returns.
+// static int goCallOnNextTask(struct ArrowAsyncDeviceStreamHandler* handler, 
struct ArrowAsyncTask* task, char* metadata) {
+//     return handler->on_next_task(handler, task, metadata);
+// }
+//
+// // Returns a heap-allocated, zero-initialized ArrowAsyncProducer, or NULL if
+// // the allocation fails. calloc zeroes the memory itself, so memset is never
+// // handed a NULL pointer. The caller owns the result and must free() it.
+// static struct ArrowAsyncProducer* get_producer() {
+//     return (struct ArrowAsyncProducer*)calloc(1, sizeof(struct ArrowAsyncProducer));
+// }
+//
+// // Thin C wrapper so Go code can invoke the handler's release function
+// // pointer. The pointer is called unconditionally, so it must be non-NULL.
+// static void goReleaseAsyncHandler(struct ArrowAsyncDeviceStreamHandler* 
handler) {
+//     handler->release(handler);
+// }
 //
 import "C"
 
@@ -155,3 +190,230 @@ func exportStream(rdr array.RecordReader, out 
*CArrowArrayStream) {
        h := cgo.NewHandle(cRecordReader{rdr: rdr, err: nil})
        out.private_data = createHandle(h)
 }
+
+// cAsyncState is the value stored in a handler's private_data by
+// exportAsyncHandler, i.e. the state held until on_schema (or an early
+// on_error) is called.
+type cAsyncState struct {
+       // ch receives the single AsyncRecordBatchStream result and is then closed
+       ch        chan AsyncRecordBatchStream
+       // queueSize is used as the channel capacity and the initial request size
+       queueSize uint64
+       // ctx is passed on to the asyncTaskQueue goroutine
+       ctx       context.Context
+}
+
+// taskState is one entry of the consumer's task queue: either a task copied
+// from on_next_task together with its decoded metadata, or an error reported
+// through on_error. The zero value is what onNextTask queues for a nil task.
+type taskState struct {
+       task CArrowAsyncTask
+       meta arrow.Metadata
+       err  error
+}
+
+// asyncStreamOnSchema is the on_schema callback installed by
+// exportAsyncHandler. It expects self.private_data to still hold the
+// cAsyncState stored there; the unchecked type assertion panics otherwise.
+//
+// Exactly one AsyncRecordBatchStream is sent on the state's channel, which is
+// closed when this function returns. Producers whose device type is not
+// ARROW_DEVICE_CPU and schemas that fail to import are reported through that
+// value's Err, and C.EINVAL is returned. On success the private data is
+// replaced by a *cAsyncStreamHandler, queueSize items are requested from the
+// producer, the asyncTaskQueue goroutine is started and 0 is returned.
+//
+// NOTE(review): self.private_data is overwritten without a C.free of the old
+// pointer, whereas asyncStreamRelease frees private_data; confirm whether
+// createHandle allocates C memory that is leaked here.
+//
+//export asyncStreamOnSchema
+func asyncStreamOnSchema(self *CArrowAsyncDeviceStreamHandler, schema 
*CArrowSchema) C.int {
+       h := getHandle(self.private_data)
+       handler := h.Value().(cAsyncState)
+       defer close(handler.ch)
+
+       if self.producer.device_type != C.ARROW_DEVICE_CPU {
+               handler.ch <- AsyncRecordBatchStream{Err: 
fmt.Errorf("unsupported device type")}
+               return C.EINVAL
+       }
+
+       sc, err := ImportCArrowSchema(schema)
+       if err != nil {
+               handler.ch <- AsyncRecordBatchStream{Err: err}
+               return C.EINVAL
+       }
+
+       var meta arrow.Metadata
+       if self.producer.additional_metadata != nil {
+               meta = decodeCMetadata(self.producer.additional_metadata)
+       }
+
+       recordStream := make(chan RecordMessage, handler.queueSize)
+       taskQueue := make(chan taskState, handler.queueSize)
+       handler.ch <- AsyncRecordBatchStream{Schema: sc,
+               AdditionalMetadata: meta, Stream: recordStream}
+
+       // swap the pre-schema state for the streaming handler; the old cgo
+       // handle is deleted once this function returns
+       self.private_data = createHandle(cgo.NewHandle(&cAsyncStreamHandler{
+               producer:  self.producer,
+               ctx:       handler.ctx,
+               taskQueue: taskQueue,
+       }))
+       defer h.Delete()
+
+       C.goCallRequest(self.producer, C.int64_t(handler.queueSize))
+       go asyncTaskQueue(handler.ctx, sc, recordStream, taskQueue, 
self.producer)
+       return 0
+}
+
+// asyncStreamOnNextTask is the on_next_task callback installed by
+// exportAsyncHandler. It forwards to cAsyncStreamHandler.onNextTask and
+// returns its result. The unchecked type assertion panics if private_data
+// does not hold a *cAsyncStreamHandler, which is what asyncStreamOnSchema
+// installs on success.
+//
+//export asyncStreamOnNextTask
+func asyncStreamOnNextTask(self *CArrowAsyncDeviceStreamHandler, task 
*CArrowAsyncTask, metadata *C.char) C.int {
+       h := getHandle(self.private_data)
+       handler := h.Value().(*cAsyncStreamHandler)
+       return handler.onNextTask(task, metadata)
+}
+
+// asyncStreamOnError is the on_error callback installed by
+// exportAsyncHandler. What it does depends on the value held in private_data:
+//
+//   - *cAsyncStreamHandler: the error is queued via the handler's onError.
+//   - cAsyncState (still the pre-schema state): the error is delivered as the
+//     AsyncRecordBatchStream result and the result channel is closed.
+//
+// Any other value is silently ignored.
+//
+//export asyncStreamOnError
+func asyncStreamOnError(self *CArrowAsyncDeviceStreamHandler, code C.int, 
message, metadata *C.char) {
+       h := getHandle(self.private_data)
+       switch handler := h.Value().(type) {
+       case *cAsyncStreamHandler:
+               handler.onError(code, message, metadata)
+       case cAsyncState:
+               handler.ch <- AsyncRecordBatchStream{Err: AsyncStreamError{
+                       Code:     int(code),
+                       Msg:      C.GoString(message),
+                       Metadata: C.GoString(metadata),
+               }}
+               close(handler.ch)
+       }
+}
+
+// asyncStreamRelease is the release callback installed by exportAsyncHandler.
+// If private_data holds a *cAsyncStreamHandler, that handler is released
+// first. The cgo handle is then deleted, the private_data pointer is freed
+// with C.free, and both release and private_data are set to nil.
+//
+//export asyncStreamRelease
+func asyncStreamRelease(self *CArrowAsyncDeviceStreamHandler) {
+       h := getHandle(self.private_data)
+       if handler, ok := h.Value().(*cAsyncStreamHandler); ok {
+               handler.release()
+       }
+
+       h.Delete()
+       C.free(unsafe.Pointer(self.private_data))
+       self.release = nil
+       self.private_data = nil
+}
+
+// exportAsyncHandler points out's four callbacks (on_schema, on_next_task,
+// on_error, release) at the Go functions exported from this file and stores
+// state in out.private_data behind a new cgo handle.
+func exportAsyncHandler(state cAsyncState, out 
*CArrowAsyncDeviceStreamHandler) {
+       out.on_schema = (*[0]byte)(C.asyncStreamOnSchema)
+       out.on_next_task = (*[0]byte)(C.asyncStreamOnNextTask)
+       out.on_error = (*[0]byte)(C.asyncStreamOnError)
+       out.release = (*[0]byte)(C.asyncStreamRelease)
+       out.private_data = createHandle(cgo.NewHandle(state))
+}
+
+// asyncProducerRequest is the request callback of the producer created in
+// exportAsyncProducer. It forwards n to the producer's request channel, or
+// does nothing once that channel has been set to nil (after cancel or after
+// the export has finished).
+//
+// NOTE(review): reqChan is read here without visible synchronization against
+// the code that sets it to nil and closes it; confirm the callbacks cannot
+// race with that cleanup.
+//
+//export asyncProducerRequest
+func asyncProducerRequest(producer *CArrowAsyncProducer, n C.int64_t) {
+       h := getHandle(producer.private_data)
+       handler := h.Value().(*cAsyncProducer)
+       if handler.reqChan != nil {
+               handler.reqChan <- int64(n)
+       }
+}
+
+// asyncProducerCancel is the cancel callback of the producer created in
+// exportAsyncProducer. The first call closes the done channel, which makes
+// the export loop return nil, and clears both channel fields so that later
+// cancel and request calls are no-ops.
+//
+//export asyncProducerCancel
+func asyncProducerCancel(producer *CArrowAsyncProducer) {
+       h := getHandle(producer.private_data)
+       handler := h.Value().(*cAsyncProducer)
+       if handler.done != nil {
+               close(handler.done)
+               handler.done, handler.reqChan = nil, nil
+       }
+}
+
+// asyncTaskExtract is the Go side of a task's extract_data callback (reached
+// through asyncTaskExtractTrampoline). It exports the arrow.Record stored in
+// task.private_data into out as a CPU device array with device_id -1, releases
+// the record when done and always returns 0.
+//
+// NOTE(review): the cgo handle behind task.private_data is not deleted here;
+// the only visible Delete is on the on_next_task failure path in
+// exportAsyncProducer. Confirm the handle is released elsewhere. Because the
+// record is released on every call, calling this twice for the same task
+// would presumably release it twice.
+//
+//export asyncTaskExtract
+func asyncTaskExtract(task *CArrowAsyncTask, out *CArrowDeviceArray) C.int {
+       h := getHandle(task.private_data)
+       rec := h.Value().(arrow.Record)
+       defer rec.Release()
+
+       out.device_id, out.device_type = C.int64_t(-1), C.ARROW_DEVICE_CPU
+       ExportArrowRecordBatch(rec, &out.array, nil)
+       return C.int(0)
+}
+
+// cAsyncProducer is the Go state behind an exported ArrowAsyncProducer.
+type cAsyncProducer struct {
+       // reqChan carries the counts passed to the request callback
+       reqChan chan int64
+       // done is closed by the cancel callback to stop the export loop
+       done    chan error
+}
+
+// exportAsyncProducer drives handler as the producer side of an async record
+// batch stream and blocks until the stream has finished.
+//
+// A nil schema is reported through on_error with EINVAL. Otherwise an
+// ArrowAsyncProducer is installed on the handler and on_schema is called.
+// After that, messages received from stream are forwarded through
+// on_next_task, but only while the consumer has outstanding requests (counted
+// from the producer's request callback). A closed stream is signalled with a
+// nil task, and a message carrying an error is reported through on_error with
+// EINVAL.
+//
+// The handler's release callback is always invoked before returning. The
+// result is nil on normal completion or after cancellation; otherwise it is
+// the nil-schema error, the error carried by a stream message, an error
+// received on the done channel, or an error describing a non-zero callback
+// status.
+func exportAsyncProducer(schema *arrow.Schema, stream <-chan RecordMessage, handler *CArrowAsyncDeviceStreamHandler) error {
+       defer C.goReleaseAsyncHandler(handler)
+
+       if schema == nil {
+               err := fmt.Errorf("%w: must have non-nil schema", arrow.ErrInvalid)
+               errmsg := C.CString(err.Error())
+               C.goCallOnError(handler, C.EINVAL, errmsg, nil)
+               C.free(unsafe.Pointer(errmsg))
+               return err
+       }
+
+       reqChan, done := make(chan int64, 5), make(chan error, 1)
+       prodHandle := cgo.NewHandle(&cAsyncProducer{reqChan: reqChan, done: done})
+       cproducer := prodHandle.Value().(*cAsyncProducer)
+       defer func() {
+               close(reqChan)
+               cproducer.reqChan = nil
+               // done is already nil if the cancel callback closed it
+               if cproducer.done != nil {
+                       close(cproducer.done)
+                       cproducer.done = nil
+               }
+
+               prodHandle.Delete()
+       }()
+
+       producer := C.get_producer()
+       defer C.free(unsafe.Pointer(producer))
+
+       producer.device_type = C.ARROW_DEVICE_CPU
+       producer.request = (*[0]byte)(C.asyncProducerRequest)
+       producer.cancel = (*[0]byte)(C.asyncProducerCancel)
+       producer.private_data = createHandle(prodHandle)
+       producer.additional_metadata = nil
+       handler.producer = producer
+
+       var s CArrowSchema
+       ExportArrowSchema(schema, &s)
+       if status := C.goCallOnSchema(handler, &s); status != C.int(0) {
+               releaseExportedSchema(&s)
+               return fmt.Errorf("on_schema failed with status %d", status)
+       }
+
+       var pending int64
+       for {
+               // Only listen on the record stream while the consumer has
+               // outstanding requests: receiving from a nil channel blocks
+               // forever, which disables that select case. Blocking in a
+               // single select (rather than polling with default cases)
+               // keeps this loop from spinning a CPU core while it waits.
+               var records <-chan RecordMessage
+               if pending > 0 {
+                       records = stream
+               }
+
+               select {
+               case err, ok := <-done:
+                       if !ok {
+                               // closed by the cancel callback
+                               return nil
+                       }
+
+                       return err
+               case req := <-reqChan:
+                       pending += req
+               case msg, ok := <-records:
+                       if !ok {
+                               // end of stream is signalled with a nil task
+                               if status := C.goCallOnNextTask(handler, nil, nil); status != C.int(0) {
+                                       return fmt.Errorf("on_next_task with nil task failed with status %d", status)
+                               }
+                               return nil
+                       }
+
+                       pending--
+                       if msg.Err != nil {
+                               errmsg := C.CString(msg.Err.Error())
+                               C.goCallOnError(handler, C.EINVAL, errmsg, nil)
+                               C.free(unsafe.Pointer(errmsg))
+                               return msg.Err
+                       }
+
+                       // the record travels behind a cgo handle until the
+                       // consumer calls extract_data on the task
+                       var task CArrowAsyncTask
+                       task.extract_data = (*[0]byte)(C.asyncTaskExtractTrampoline)
+                       task.private_data = createHandle(cgo.NewHandle(msg.Record))
+
+                       var encoded []byte
+                       if msg.AdditionalMetadata.Len() != 0 {
+                               encoded = encodeCMetadata(msg.AdditionalMetadata.Keys(),
+                                       msg.AdditionalMetadata.Values())
+                       }
+
+                       status := C.goCallOnNextTask(handler, &task,
+                               (*C.char)(unsafe.Pointer(unsafe.SliceData(encoded))))
+                       if status != C.int(0) {
+                               // the consumer rejected the task, so its record
+                               // and handle are cleaned up here
+                               msg.Record.Release()
+                               getHandle(task.private_data).Delete()
+                               return fmt.Errorf("on_next_task failed with status %d", status)
+                       }
+               }
+       }
+}
diff --git a/arrow/cdata/interface.go b/arrow/cdata/interface.go
index 93aedc2..fd6ecbf 100644
--- a/arrow/cdata/interface.go
+++ b/arrow/cdata/interface.go
@@ -20,6 +20,7 @@
 package cdata
 
 import (
+       "context"
        "unsafe"
 
        "github.com/apache/arrow-go/v18/arrow"
@@ -282,3 +283,62 @@ func ReleaseCArrowArray(arr *CArrowArray) { 
releaseArr(arr) }
 
 // ReleaseCArrowSchema calls ArrowSchemaRelease on the passed in cdata schema
 func ReleaseCArrowSchema(schema *CArrowSchema) { releaseSchema(schema) }
+
+// RecordMessage is one element of a record batch stream used with the Async
+// C Data Interface: either a Record, optionally with AdditionalMetadata, or
+// an Err. It is the element type of the channel passed to
+// ExportAsyncRecordBatchStream and of AsyncRecordBatchStream.Stream.
+type RecordMessage struct {
+       Record             arrow.Record
+       AdditionalMetadata arrow.Metadata
+       Err                error
+}
+
+// AsyncRecordBatchStream represents a stream of record batches being read in
+// from an ArrowAsyncDeviceStreamHandler's callbacks. If an error was
+// encountered before or during the call to on_schema, then this will contain
+// the error as Err. Otherwise the Schema will be valid and the Stream is a
+// channel of RecordMessages being propagated via on_next_task and
+// extract_data.
+type AsyncRecordBatchStream struct {
+       Schema             *arrow.Schema
+       AdditionalMetadata arrow.Metadata
+       Err                error
+       Stream             <-chan RecordMessage
+}
+
+// AsyncStreamError represents an error encountered via a call to the on_error
+// callback of an ArrowAsyncDeviceStreamHandler. The Code is the error code
+// that should be errno compatible. Msg and Metadata hold the message and
+// metadata strings passed to the callback.
+type AsyncStreamError struct {
+       Code     int
+       Msg      string
+       Metadata string
+}
+
+// Error implements the error interface. It returns only Msg; Code and
+// Metadata are not part of the string.
+func (e AsyncStreamError) Error() string { return e.Msg }
+
+// CreateAsyncDeviceStreamHandler populates a given
+// ArrowAsyncDeviceStreamHandler's callbacks and returns immediately with a
+// channel. A single AsyncRecordBatchStream is delivered on that channel once
+// the handler's on_schema callback has been called, or once on_error is
+// called before on_schema; the channel is then closed. The channel is
+// unbuffered, so the callback blocks until the value is received.
+//
+// The provided queueSize is the number of records that will be requested up
+// front from the producer, and the capacity of the Stream channel in the
+// returned AsyncRecordBatchStream. See the documentation on
+// https://arrow.apache.org/docs/format/CDeviceDataInterface.html for more
+// information as to the expected semantics of that size.
+//
+// The populated ArrowAsyncDeviceStreamHandler can then be given to any
+// compatible provider for async record batch streams via the C Device
+// interface.
+func CreateAsyncDeviceStreamHandler(ctx context.Context, queueSize uint64, out 
*CArrowAsyncDeviceStreamHandler) <-chan AsyncRecordBatchStream {
+       ch := make(chan AsyncRecordBatchStream)
+       exportAsyncHandler(cAsyncState{ctx: ctx, ch: ch, queueSize: queueSize}, 
out)
+       return ch
+}
+
+// ExportAsyncRecordBatchStream takes in a schema and a channel of
+// RecordMessages along with an ArrowAsyncDeviceStreamHandler to export the
+// records as they come across the channel and call the appropriate callbacks
+// on the handler. This function will block until the stream is closed, the
+// consumer cancels, or a message containing an error comes across the
+// channel. The handler's release callback is invoked before it returns.
+//
+// The returned error will be nil if everything is successful. Otherwise it is
+// an error wrapping arrow.ErrInvalid when schema is nil, the error which is
+// encountered on the stream, or an error describing the non-zero status
+// returned by one of the handler callbacks.
+func ExportAsyncRecordBatchStream(schema *arrow.Schema, stream <-chan 
RecordMessage, handler *CArrowAsyncDeviceStreamHandler) error {
+       return exportAsyncProducer(schema, stream, handler)
+}
diff --git a/arrow/cdata/trampoline.c b/arrow/cdata/trampoline.c
index 01db13f..364010b 100644
--- a/arrow/cdata/trampoline.c
+++ b/arrow/cdata/trampoline.c
@@ -20,6 +20,7 @@
 
 int streamGetSchema(struct ArrowArrayStream*, struct ArrowSchema*);
 int streamGetNext(struct ArrowArrayStream*, struct ArrowArray*);
+int asyncTaskExtract(struct ArrowAsyncTask*, struct ArrowDeviceArray*);
 
 int streamGetSchemaTrampoline(struct ArrowArrayStream* stream, struct 
ArrowSchema* out) {
   // XXX(https://github.com/apache/arrow-adbc/issues/729)
@@ -32,3 +33,8 @@ int streamGetNextTrampoline(struct ArrowArrayStream* stream, 
struct ArrowArray*
   memset(out, 0, sizeof(*out));
   return streamGetNext(stream, out);
 }
+
+// Zeroes *out and then delegates to asyncTaskExtract (declared at the top of
+// this file), returning its result. This mirrors the memset-then-call shape
+// of streamGetNextTrampoline.
+int asyncTaskExtractTrampoline(struct ArrowAsyncTask* task, struct 
ArrowDeviceArray* out) {
+  memset(out, 0, sizeof(*out));
+  return asyncTaskExtract(task, out);
+}


Reply via email to