This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 008a3bb4 feat(extensions/nanoarrow_device): Implement asynchronous buffer copying (#509)
008a3bb4 is described below

commit 008a3bb4fb404ef470472912e0a38ddd47af7199
Author: Dewey Dunnington <[email protected]>
AuthorDate: Thu Jun 27 16:54:18 2024 +0000

    feat(extensions/nanoarrow_device): Implement asynchronous buffer copying (#509)
    
    This PR implements asynchronous buffer copying when copying CUDA buffers.
    Before this, we had basically been issuing `cuMemcpyDtoH/HtoD()` many
    times in a row, with a synchronize up front and a synchronize at the
    end. This was probably not great for performance. Additionally, when
    copying String/Binary/Large String/Large Binary arrays from CUDA to the
    CPU, we were issuing very tiny copies of the offsets buffer and
    synchronizing with the CPU to get the number of bytes to copy for the
    data buffer.
    
    After this PR, when copying from CPU to CUDA, we can return before the
    copy has necessarily completed by setting the output `sync_event`.
    
    When copying from CUDA to CPU, the copy is done in one pass if there are
    no string/binary arrays, or in two passes if there are. When copying
    string/binary arrays, the implementation walks the entire tree of arrays
    and issues an asynchronous copy of the last value of each offsets buffer.
    Then, the stream is synchronized with the CPU, and a second set of
    asynchronous copies is issued for the buffers whose sizes are now known.
    
    I don't have enough experience with CUDA async programming to know
    whether this approach could be simplified (e.g., I do this in two streams,
    but one stream might be sufficient, since perhaps all of the
    device -> host copies are queued against each other regardless of which
    stream they're on).
    
    This will be easier to test (e.g., with bigger, non-trivial data) once it
    is wired up to Python.
    
    TODO:
    
    - Implement `sync_event` integration (both for source and destination)
    - Test more than just a few string arrays
    
    Closes #245.
---
 src/nanoarrow/device/cuda.c         | 212 +++++++++++++++------
 src/nanoarrow/device/cuda_test.cc   | 189 +++++++++++++++++--
 src/nanoarrow/device/device.c       | 357 ++++++++++++++++++++++++------------
 src/nanoarrow/device/device_test.cc |   8 +-
 src/nanoarrow/device/metal.cc       |  48 +++--
 src/nanoarrow/nanoarrow_device.h    | 249 ++++++++++++++++++-------
 6 files changed, 780 insertions(+), 283 deletions(-)

diff --git a/src/nanoarrow/device/cuda.c b/src/nanoarrow/device/cuda.c
index 56ff4dfd..d2db25a8 100644
--- a/src/nanoarrow/device/cuda.c
+++ b/src/nanoarrow/device/cuda.c
@@ -100,9 +100,10 @@ static void ArrowDeviceCudaDeallocator(struct 
ArrowBufferAllocator* allocator,
   ArrowFree(allocator_private);
 }
 
-static ArrowErrorCode ArrowDeviceCudaAllocateBuffer(struct ArrowDevice* device,
-                                                    struct ArrowBuffer* buffer,
-                                                    int64_t size_bytes) {
+static ArrowErrorCode ArrowDeviceCudaAllocateBufferAsync(struct ArrowDevice* 
device,
+                                                         struct ArrowBuffer* 
buffer,
+                                                         int64_t size_bytes,
+                                                         CUstream hstream) {
   struct ArrowDeviceCudaPrivate* private_data =
       (struct ArrowDeviceCudaPrivate*)device->private_data;
 
@@ -124,11 +125,14 @@ static ArrowErrorCode 
ArrowDeviceCudaAllocateBuffer(struct ArrowDevice* device,
   switch (device->device_type) {
     case ARROW_DEVICE_CUDA: {
       CUdeviceptr dptr = 0;
-      if (size_bytes > 0) {  // cuMemalloc requires non-zero size_bytes
-        err = cuMemAlloc(&dptr, (size_t)size_bytes);
+
+      // cuMemalloc requires non-zero size_bytes
+      if (size_bytes > 0) {
+        err = cuMemAllocAsync(&dptr, (size_t)size_bytes, hstream);
       } else {
         err = CUDA_SUCCESS;
       }
+
       ptr = (void*)dptr;
       op = "cuMemAlloc";
       break;
@@ -181,51 +185,105 @@ static void ArrowDeviceCudaArrayRelease(struct 
ArrowArray* array) {
   array->release = NULL;
 }
 
-static ArrowErrorCode ArrowDeviceCudaArrayInit(struct ArrowDevice* device,
-                                               struct ArrowDeviceArray* 
device_array,
-                                               struct ArrowArray* array,
-                                               void* sync_event) {
+static ArrowErrorCode ArrowDeviceCudaArrayInitInternal(
+    struct ArrowDevice* device, struct ArrowDeviceArray* device_array,
+    struct ArrowArray* array, CUevent cu_event) {
   struct ArrowDeviceCudaPrivate* device_private =
       (struct ArrowDeviceCudaPrivate*)device->private_data;
-  // One can create an event with cuEventCreate(&cu_event, CU_EVENT_DEFAULT);
-  // Requires cuCtxPushCurrent() + cuEventCreate() + cuCtxPopCurrent()
 
-  struct ArrowDeviceCudaArrayPrivate* private_data =
+  struct ArrowDeviceCudaArrayPrivate* array_private =
       (struct ArrowDeviceCudaArrayPrivate*)ArrowMalloc(
           sizeof(struct ArrowDeviceCudaArrayPrivate));
-  if (private_data == NULL) {
+  if (array_private == NULL) {
     return ENOMEM;
   }
 
   memset(device_array, 0, sizeof(struct ArrowDeviceArray));
   device_array->array = *array;
-  device_array->array.private_data = private_data;
+  device_array->array.private_data = array_private;
   device_array->array.release = &ArrowDeviceCudaArrayRelease;
-  ArrowArrayMove(array, &private_data->parent);
+  ArrowArrayMove(array, &array_private->parent);
 
   device_array->device_id = device->device_id;
   device_array->device_type = device->device_type;
 
-  if (sync_event != NULL) {
-    private_data->cu_event = *((CUevent*)sync_event);
-    device_array->sync_event = sync_event;
+  if (cu_event != NULL) {
+    array_private->cu_event = cu_event;
+    device_array->sync_event = &array_private->cu_event;
   } else {
-    private_data->cu_event = NULL;
+    array_private->cu_event = NULL;
     device_array->sync_event = NULL;
   }
 
   return NANOARROW_OK;
 }
 
-// TODO: All these buffer copiers would benefit from cudaMemcpyAsync but there 
is
-// no good way to incorporate that just yet
+static ArrowErrorCode ArrowDeviceCudaArrayInitAsync(struct ArrowDevice* device,
+                                                    struct ArrowDeviceArray* 
device_array,
+                                                    struct ArrowArray* array,
+                                                    void* sync_event, void* 
stream) {
+  struct ArrowDeviceCudaPrivate* private_data =
+      (struct ArrowDeviceCudaPrivate*)device->private_data;
+
+  NANOARROW_CUDA_RETURN_NOT_OK(cuCtxPushCurrent(private_data->cu_context),
+                               "cuCtxPushCurrent", NULL);
+  CUcontext unused;  // needed for cuCtxPopCurrent()
+
+  CUevent cu_event;
+  if (sync_event == NULL) {
+    cu_event = NULL;
+  } else {
+    cu_event = *((CUevent*)sync_event);
+  }
+
+  // If the stream was passed, it means that we are required to ensure that
+  // the event that is exported by the output array captures the work that
+  // has been done on stream. If we were not given an event to take ownership 
of,
+  // this means we need to create one.
+  CUevent cu_event_tmp = NULL;
+  CUresult err;
+
+  if (stream != NULL && cu_event == NULL) {
+    // Event is faster with timing disabled (a user can provide their
+    // own event if they want timing enabled)
+    err = cuEventCreate(&cu_event_tmp, CU_EVENT_DISABLE_TIMING);
+    if (err != CUDA_SUCCESS) {
+      NANOARROW_CUDA_ASSERT_OK(cuCtxPopCurrent(&unused));
+      NANOARROW_CUDA_RETURN_NOT_OK(err, "cuEventCreate", NULL);
+    }
+
+    cu_event = cu_event_tmp;
+  }
+
+  if (stream != NULL) {
+    err = cuEventRecord(cu_event, *((CUstream*)stream));
+    if (err != CUDA_SUCCESS) {
+      NANOARROW_CUDA_ASSERT_OK(cuCtxPopCurrent(&unused));
+      if (cu_event_tmp != NULL) {
+        NANOARROW_CUDA_ASSERT_OK(cuEventDestroy(cu_event_tmp));
+      }
+
+      NANOARROW_CUDA_RETURN_NOT_OK(err, "cuEventCreate", NULL);
+    }
+  }
 
-static ArrowErrorCode ArrowDeviceCudaBufferCopyInternal(struct ArrowDevice* 
device_src,
-                                                        struct ArrowBufferView 
src,
-                                                        struct ArrowDevice* 
device_dst,
-                                                        struct ArrowBufferView 
dst,
-                                                        int* n_pop_context,
-                                                        struct ArrowError* 
error) {
+  int result = ArrowDeviceCudaArrayInitInternal(device, device_array, array, 
cu_event);
+  NANOARROW_CUDA_ASSERT_OK(cuCtxPopCurrent(&unused));
+  if (result != NANOARROW_OK) {
+    if (cu_event_tmp != NULL) {
+      NANOARROW_CUDA_ASSERT_OK(cuEventDestroy(cu_event_tmp));
+    }
+
+    return result;
+  }
+
+  return NANOARROW_OK;
+}
+
+static ArrowErrorCode ArrowDeviceCudaBufferCopyAsyncInternal(
+    struct ArrowDevice* device_src, struct ArrowBufferView src,
+    struct ArrowDevice* device_dst, struct ArrowBufferView dst, int* 
n_pop_context,
+    struct ArrowError* error, CUstream hstream) {
   // Note: the device_src/sync event must be synchronized before calling these 
methods,
   // even though the cuMemcpyXXX() functions may do this automatically in some 
cases.
 
@@ -238,7 +296,8 @@ static ArrowErrorCode 
ArrowDeviceCudaBufferCopyInternal(struct ArrowDevice* devi
     (*n_pop_context)++;
 
     NANOARROW_CUDA_RETURN_NOT_OK(
-        cuMemcpyHtoD((CUdeviceptr)dst.data.data, src.data.data, 
(size_t)src.size_bytes),
+        cuMemcpyHtoDAsync((CUdeviceptr)dst.data.data, src.data.data,
+                          (size_t)src.size_bytes, hstream),
         "cuMemcpyHtoD", error);
 
   } else if (device_src->device_type == ARROW_DEVICE_CUDA &&
@@ -252,9 +311,9 @@ static ArrowErrorCode 
ArrowDeviceCudaBufferCopyInternal(struct ArrowDevice* devi
     (*n_pop_context)++;
 
     NANOARROW_CUDA_RETURN_NOT_OK(
-        cuMemcpyDtoD((CUdeviceptr)dst.data.data, (CUdeviceptr)src.data.data,
-                     (size_t)src.size_bytes),
-        "cuMemcpytoD", error);
+        cuMemcpyDtoDAsync((CUdeviceptr)dst.data.data, 
(CUdeviceptr)src.data.data,
+                          (size_t)src.size_bytes, hstream),
+        "cuMemcpyDtoDAsync", error);
 
   } else if (device_src->device_type == ARROW_DEVICE_CUDA &&
              device_dst->device_type == ARROW_DEVICE_CUDA) {
@@ -264,10 +323,10 @@ static ArrowErrorCode 
ArrowDeviceCudaBufferCopyInternal(struct ArrowDevice* devi
         (struct ArrowDeviceCudaPrivate*)device_dst->private_data;
 
     NANOARROW_CUDA_RETURN_NOT_OK(
-        cuMemcpyPeer((CUdeviceptr)dst.data.data, dst_private->cu_context,
-                     (CUdeviceptr)src.data.data, src_private->cu_context,
-                     (size_t)src.size_bytes),
-        "cuMemcpyPeer", error);
+        cuMemcpyPeerAsync((CUdeviceptr)dst.data.data, dst_private->cu_context,
+                          (CUdeviceptr)src.data.data, src_private->cu_context,
+                          (size_t)src.size_bytes, hstream),
+        "cuMemcpyPeerAsync", error);
 
   } else if (device_src->device_type == ARROW_DEVICE_CUDA &&
              device_dst->device_type == ARROW_DEVICE_CPU) {
@@ -278,9 +337,9 @@ static ArrowErrorCode 
ArrowDeviceCudaBufferCopyInternal(struct ArrowDevice* devi
                                  "cuCtxPushCurrent", error);
     (*n_pop_context)++;
     NANOARROW_CUDA_RETURN_NOT_OK(
-        cuMemcpyDtoH((void*)dst.data.data, (CUdeviceptr)src.data.data,
-                     (size_t)src.size_bytes),
-        "cuMemcpyDtoH", error);
+        cuMemcpyDtoHAsync((void*)dst.data.data, (CUdeviceptr)src.data.data,
+                          (size_t)src.size_bytes, hstream),
+        "cuMemcpyDtoHAsync", error);
 
   } else if (device_src->device_type == ARROW_DEVICE_CPU &&
              device_dst->device_type == ARROW_DEVICE_CUDA_HOST) {
@@ -301,15 +360,22 @@ static ArrowErrorCode 
ArrowDeviceCudaBufferCopyInternal(struct ArrowDevice* devi
   return NANOARROW_OK;
 }
 
-static ArrowErrorCode ArrowDeviceCudaBufferCopy(struct ArrowDevice* device_src,
-                                                struct ArrowBufferView src,
-                                                struct ArrowDevice* device_dst,
-                                                struct ArrowBufferView dst) {
+static ArrowErrorCode ArrowDeviceCudaBufferCopyAsync(struct ArrowDevice* 
device_src,
+                                                     struct ArrowBufferView 
src,
+                                                     struct ArrowDevice* 
device_dst,
+                                                     struct ArrowBufferView 
dst,
+                                                     void* stream) {
+  if (stream == NULL) {
+    return EINVAL;
+  }
+
+  CUstream hstream = *((CUstream*)stream);
+
   int n_pop_context = 0;
   struct ArrowError error;
 
-  int result = ArrowDeviceCudaBufferCopyInternal(device_src, src, device_dst, 
dst,
-                                                 &n_pop_context, &error);
+  int result = ArrowDeviceCudaBufferCopyAsyncInternal(device_src, src, 
device_dst, dst,
+                                                      &n_pop_context, &error, 
hstream);
   for (int i = 0; i < n_pop_context; i++) {
     CUcontext unused;
     NANOARROW_CUDA_ASSERT_OK(cuCtxPopCurrent(&unused));
@@ -318,17 +384,24 @@ static ArrowErrorCode ArrowDeviceCudaBufferCopy(struct 
ArrowDevice* device_src,
   return result;
 }
 
-static ArrowErrorCode ArrowDeviceCudaBufferInit(struct ArrowDevice* device_src,
-                                                struct ArrowBufferView src,
-                                                struct ArrowDevice* device_dst,
-                                                struct ArrowBuffer* dst) {
+static ArrowErrorCode ArrowDeviceCudaBufferInitAsync(struct ArrowDevice* 
device_src,
+                                                     struct ArrowBufferView 
src,
+                                                     struct ArrowDevice* 
device_dst,
+                                                     struct ArrowBuffer* dst,
+                                                     void* stream) {
+  if (stream == NULL) {
+    return EINVAL;
+  }
+
+  CUstream hstream = *((CUstream*)stream);
+
   struct ArrowBuffer tmp;
 
   switch (device_dst->device_type) {
     case ARROW_DEVICE_CUDA:
     case ARROW_DEVICE_CUDA_HOST:
       NANOARROW_RETURN_NOT_OK(
-          ArrowDeviceCudaAllocateBuffer(device_dst, &tmp, src.size_bytes));
+          ArrowDeviceCudaAllocateBufferAsync(device_dst, &tmp, src.size_bytes, 
hstream));
       break;
     case ARROW_DEVICE_CPU:
       ArrowBufferInit(&tmp);
@@ -341,7 +414,8 @@ static ArrowErrorCode ArrowDeviceCudaBufferInit(struct 
ArrowDevice* device_src,
   struct ArrowBufferView tmp_view;
   tmp_view.data.data = tmp.data;
   tmp_view.size_bytes = tmp.size_bytes;
-  int result = ArrowDeviceCudaBufferCopy(device_src, src, device_dst, 
tmp_view);
+  int result =
+      ArrowDeviceCudaBufferCopyAsync(device_src, src, device_dst, tmp_view, 
&hstream);
   if (result != NANOARROW_OK) {
     ArrowBufferReset(&tmp);
     return result;
@@ -352,7 +426,7 @@ static ArrowErrorCode ArrowDeviceCudaBufferInit(struct 
ArrowDevice* device_src,
 }
 
 static ArrowErrorCode ArrowDeviceCudaSynchronize(struct ArrowDevice* device,
-                                                 void* sync_event,
+                                                 void* sync_event, void* 
stream,
                                                  struct ArrowError* error) {
   if (sync_event == NULL) {
     return NANOARROW_OK;
@@ -363,11 +437,33 @@ static ArrowErrorCode ArrowDeviceCudaSynchronize(struct 
ArrowDevice* device,
     return ENOTSUP;
   }
 
+  // Sync functions require a context to be set
+  struct ArrowDeviceCudaPrivate* private_data =
+      (struct ArrowDeviceCudaPrivate*)device->private_data;
+
+  NANOARROW_CUDA_RETURN_NOT_OK(cuCtxPushCurrent(private_data->cu_context),
+                               "cuCtxPushCurrent", NULL);
+  CUcontext unused;  // needed for cuCtxPopCurrent()
+
   // Memory for cuda_event is owned by the ArrowArray member of the 
ArrowDeviceArray
-  CUevent* cuda_event = (CUevent*)sync_event;
-  NANOARROW_CUDA_RETURN_NOT_OK(cuEventSynchronize(*cuda_event), 
"cuEventSynchronize",
-                               error);
+  CUevent* cu_event = (CUevent*)sync_event;
+  CUstream* cu_stream = (CUstream*)stream;
+  CUresult err;
+  const char* op = "";
+
+  if (cu_stream == NULL && cu_event != NULL) {
+    err = cuEventSynchronize(*cu_event);
+    op = "cuEventSynchronize";
+  } else if (cu_stream != NULL && cu_event == NULL) {
+    err = cuStreamSynchronize(*cu_stream);
+    op = "cuStreamSynchronize";
+  } else if (cu_stream != NULL && cu_event != NULL) {
+    err = cuStreamWaitEvent(*cu_stream, *cu_event, CU_EVENT_WAIT_DEFAULT);
+    op = "cuStreamWaitEvent";
+  }
 
+  NANOARROW_ASSERT_OK(cuCtxPopCurrent(&unused));
+  NANOARROW_CUDA_RETURN_NOT_OK(err, op, error);
   return NANOARROW_OK;
 }
 
@@ -384,7 +480,7 @@ static ArrowErrorCode ArrowDeviceCudaArrayMove(struct 
ArrowDevice* device_src,
     // We do have to wait on the sync event, though, because this has to be 
NULL
     // for a CPU device array.
     NANOARROW_RETURN_NOT_OK(
-        ArrowDeviceCudaSynchronize(device_src, src->sync_event, NULL));
+        ArrowDeviceCudaSynchronize(device_src, src->sync_event, NULL, NULL));
     ArrowDeviceArrayMove(src, dst);
     dst->device_type = device_dst->device_type;
     dst->device_id = device_dst->device_id;
@@ -436,11 +532,11 @@ static ArrowErrorCode ArrowDeviceCudaInitDevice(struct 
ArrowDevice* device,
 
   device->device_type = device_type;
   device->device_id = device_id;
-  device->array_init = &ArrowDeviceCudaArrayInit;
+  device->array_init = &ArrowDeviceCudaArrayInitAsync;
   device->array_move = &ArrowDeviceCudaArrayMove;
-  device->buffer_init = &ArrowDeviceCudaBufferInit;
+  device->buffer_init = &ArrowDeviceCudaBufferInitAsync;
   device->buffer_move = NULL;
-  device->buffer_copy = &ArrowDeviceCudaBufferCopy;
+  device->buffer_copy = &ArrowDeviceCudaBufferCopyAsync;
   device->synchronize_event = &ArrowDeviceCudaSynchronize;
   device->release = &ArrowDeviceCudaRelease;
 
diff --git a/src/nanoarrow/device/cuda_test.cc 
b/src/nanoarrow/device/cuda_test.cc
index ee89fa05..4b66d43b 100644
--- a/src/nanoarrow/device/cuda_test.cc
+++ b/src/nanoarrow/device/cuda_test.cc
@@ -55,6 +55,66 @@ class CudaTemporaryContext {
   CUcontext context_;
 };
 
+class CudaStream {
+ public:
+  CudaStream(int64_t device_id) : device_id_(device_id), hstream_(0) {}
+
+  ArrowErrorCode Init() {
+    CudaTemporaryContext ctx(device_id_);
+    if (!ctx.valid()) {
+      return EINVAL;
+    }
+
+    if (cuStreamCreate(&hstream_, CU_STREAM_DEFAULT) != CUDA_SUCCESS) {
+      return EINVAL;
+    }
+
+    return NANOARROW_OK;
+  }
+
+  CUstream* get() { return &hstream_; }
+
+  ~CudaStream() {
+    if (hstream_ != 0) {
+      cuStreamDestroy(hstream_);
+    }
+  }
+
+  int64_t device_id_;
+  CUstream hstream_;
+};
+
+class CudaEvent {
+ public:
+  CudaEvent(int64_t device_id) : device_id_(device_id), hevent_(nullptr) {}
+
+  ArrowErrorCode Init() {
+    CudaTemporaryContext ctx(device_id_);
+    if (!ctx.valid()) {
+      return EINVAL;
+    }
+
+    if (cuEventCreate(&hevent_, CU_EVENT_DEFAULT) != CUDA_SUCCESS) {
+      return EINVAL;
+    }
+
+    return NANOARROW_OK;
+  }
+
+  CUevent* get() { return &hevent_; }
+
+  void release() { hevent_ = nullptr; }
+
+  ~CudaEvent() {
+    if (hevent_ != nullptr) {
+      cuEventDestroy(hevent_);
+    }
+  }
+
+  int64_t device_id_;
+  CUevent hevent_;
+};
+
 TEST(NanoarrowDeviceCuda, GetDevice) {
   struct ArrowDevice* cuda = ArrowDeviceCuda(ARROW_DEVICE_CUDA, 0);
   ASSERT_NE(cuda, nullptr);
@@ -79,24 +139,35 @@ TEST(NanoarrowDeviceCuda, DeviceCudaBufferInit) {
   uint8_t data[] = {0x01, 0x02, 0x03, 0x04, 0x05};
   struct ArrowBufferView cpu_view = {data, sizeof(data)};
 
+  CudaStream stream(gpu->device_id);
+  ASSERT_EQ(stream.Init(), NANOARROW_OK);
+
+  // Failing to provide a stream should error
+  ASSERT_EQ(ArrowDeviceBufferInitAsync(cpu, cpu_view, gpu, nullptr, nullptr), 
EINVAL);
+
   // CPU -> GPU
-  ASSERT_EQ(ArrowDeviceBufferInit(cpu, cpu_view, gpu, &buffer_gpu), 
NANOARROW_OK);
+  ASSERT_EQ(ArrowDeviceBufferInitAsync(cpu, cpu_view, gpu, &buffer_gpu, 
stream.get()),
+            NANOARROW_OK);
   EXPECT_EQ(buffer_gpu.size_bytes, sizeof(data));
   // (Content is tested on the roundtrip)
   struct ArrowBufferView gpu_view = {buffer_gpu.data, buffer_gpu.size_bytes};
 
   // GPU -> GPU
-  ASSERT_EQ(ArrowDeviceBufferInit(gpu, gpu_view, gpu, &buffer), NANOARROW_OK);
+  ASSERT_EQ(ArrowDeviceBufferInitAsync(gpu, gpu_view, gpu, &buffer, 
stream.get()),
+            NANOARROW_OK);
   EXPECT_EQ(buffer.size_bytes, sizeof(data));
   // (Content is tested on the roundtrip)
   ArrowBufferReset(&buffer);
 
   // GPU -> CPU
-  ASSERT_EQ(ArrowDeviceBufferInit(gpu, gpu_view, cpu, &buffer), NANOARROW_OK);
+  ASSERT_EQ(ArrowDeviceBufferInitAsync(gpu, gpu_view, cpu, &buffer, 
stream.get()),
+            NANOARROW_OK);
   EXPECT_EQ(buffer.size_bytes, sizeof(data));
+
+  ASSERT_EQ(cuStreamSynchronize(*stream.get()), CUDA_SUCCESS);
   EXPECT_EQ(memcmp(buffer.data, data, sizeof(data)), 0);
-  ArrowBufferReset(&buffer);
 
+  ArrowBufferReset(&buffer);
   ArrowBufferReset(&buffer_gpu);
 }
 
@@ -110,25 +181,33 @@ TEST(NanoarrowDeviceCuda, DeviceCudaHostBufferInit) {
   uint8_t data[] = {0x01, 0x02, 0x03, 0x04, 0x05};
   struct ArrowBufferView cpu_view = {data, sizeof(data)};
 
+  CudaStream stream(gpu->device_id);
+  ASSERT_EQ(stream.Init(), NANOARROW_OK);
+
   // CPU -> GPU
-  ASSERT_EQ(ArrowDeviceBufferInit(cpu, cpu_view, gpu, &buffer_gpu), 
NANOARROW_OK);
+  ASSERT_EQ(ArrowDeviceBufferInitAsync(cpu, cpu_view, gpu, &buffer_gpu, 
stream.get()),
+            NANOARROW_OK);
   EXPECT_EQ(buffer_gpu.size_bytes, sizeof(data));
   EXPECT_EQ(memcmp(buffer_gpu.data, data, sizeof(data)), 0);
   // Here, "GPU" is memory in the CPU space allocated by cudaMallocHost
   struct ArrowBufferView gpu_view = {buffer_gpu.data, buffer_gpu.size_bytes};
 
   // GPU -> GPU
-  ASSERT_EQ(ArrowDeviceBufferInit(gpu, gpu_view, gpu, &buffer), NANOARROW_OK);
+  ASSERT_EQ(ArrowDeviceBufferInitAsync(gpu, gpu_view, gpu, &buffer, 
stream.get()),
+            NANOARROW_OK);
   EXPECT_EQ(buffer.size_bytes, sizeof(data));
   EXPECT_EQ(memcmp(buffer.data, data, sizeof(data)), 0);
   ArrowBufferReset(&buffer);
 
   // GPU -> CPU
-  ASSERT_EQ(ArrowDeviceBufferInit(gpu, gpu_view, cpu, &buffer), NANOARROW_OK);
+  ASSERT_EQ(ArrowDeviceBufferInitAsync(gpu, gpu_view, cpu, &buffer, 
stream.get()),
+            NANOARROW_OK);
   EXPECT_EQ(buffer.size_bytes, sizeof(data));
+
+  ASSERT_EQ(cuStreamSynchronize(*stream.get()), CUDA_SUCCESS);
   EXPECT_EQ(memcmp(buffer.data, data, sizeof(data)), 0);
-  ArrowBufferReset(&buffer);
 
+  ArrowBufferReset(&buffer);
   ArrowBufferReset(&buffer_gpu);
 }
 
@@ -157,18 +236,28 @@ TEST(NanoarrowDeviceCuda, DeviceCudaBufferCopy) {
     GTEST_FAIL() << "cuMemAlloc() failed";
   }
 
+  CudaStream stream(gpu->device_id);
+  ASSERT_EQ(stream.Init(), NANOARROW_OK);
+
+  // Failing to provide a stream should error
+  ASSERT_EQ(ArrowDeviceBufferCopyAsync(cpu, cpu_view, gpu, gpu_view, nullptr), 
EINVAL);
+
   // CPU -> GPU
-  ASSERT_EQ(ArrowDeviceBufferCopy(cpu, cpu_view, gpu, gpu_view), NANOARROW_OK);
+  ASSERT_EQ(ArrowDeviceBufferCopyAsync(cpu, cpu_view, gpu, gpu_view, 
stream.get()),
+            NANOARROW_OK);
 
   // GPU -> GPU
-  ASSERT_EQ(ArrowDeviceBufferCopy(gpu, gpu_view, gpu, gpu_view2), 
NANOARROW_OK);
+  ASSERT_EQ(ArrowDeviceBufferCopyAsync(gpu, gpu_view, gpu, gpu_view2, 
stream.get()),
+            NANOARROW_OK);
 
   // GPU -> CPU
   uint8_t cpu_dest[5];
   struct ArrowBufferView cpu_dest_view = {cpu_dest, sizeof(data)};
-  ASSERT_EQ(ArrowDeviceBufferCopy(gpu, gpu_view, cpu, cpu_dest_view), 
NANOARROW_OK);
+  ASSERT_EQ(ArrowDeviceBufferCopyAsync(gpu, gpu_view, cpu, cpu_dest_view, 
stream.get()),
+            NANOARROW_OK);
 
   // Check roundtrip
+  ASSERT_EQ(cuStreamSynchronize(*stream.get()), CUDA_SUCCESS);
   EXPECT_EQ(memcmp(cpu_dest, data, sizeof(data)), 0);
 
   // Clean up
@@ -183,6 +272,56 @@ TEST(NanoarrowDeviceCuda, DeviceCudaBufferCopy) {
   }
 }
 
+TEST(NanoarrowDeviceCuda, DeviceCudaArrayInit) {
+  struct ArrowDevice* gpu = ArrowDeviceCuda(ARROW_DEVICE_CUDA, 0);
+
+  CudaStream stream(gpu->device_id);
+  ASSERT_EQ(stream.Init(), NANOARROW_OK);
+
+  CudaEvent event(gpu->device_id);
+  ASSERT_EQ(event.Init(), NANOARROW_OK);
+
+  struct ArrowDeviceArray device_array;
+  struct ArrowArray array;
+  array.release = nullptr;
+
+  // No provided sync event should result in a null sync event in the final 
array
+  ASSERT_EQ(ArrowArrayInitFromType(&array, NANOARROW_TYPE_INT32), 
NANOARROW_OK);
+  ASSERT_EQ(ArrowDeviceArrayInit(gpu, &device_array, &array, nullptr), 
NANOARROW_OK);
+  ASSERT_EQ(device_array.sync_event, nullptr);
+  ArrowArrayRelease(&device_array.array);
+
+  // Provided sync event should result in ownership of the event being taken 
by the
+  // device array.
+  device_array.sync_event = nullptr;
+  ASSERT_EQ(ArrowArrayInitFromType(&array, NANOARROW_TYPE_INT32), 
NANOARROW_OK);
+  ASSERT_EQ(ArrowDeviceArrayInit(gpu, &device_array, &array, event.get()), 
NANOARROW_OK);
+  ASSERT_EQ(*((CUevent*)device_array.sync_event), *event.get());
+  event.release();
+  ArrowArrayRelease(&device_array.array);
+
+  // Provided stream without provided event should result in an event created 
by and owned
+  // by the device array
+  device_array.sync_event = nullptr;
+  ASSERT_EQ(ArrowArrayInitFromType(&array, NANOARROW_TYPE_INT32), 
NANOARROW_OK);
+  ASSERT_EQ(ArrowDeviceArrayInitAsync(gpu, &device_array, &array, nullptr, 
stream.get()),
+            NANOARROW_OK);
+  ASSERT_NE(*(CUevent*)device_array.sync_event, nullptr);
+  ArrowArrayRelease(&device_array.array);
+
+  // Provided stream and sync event should result in the device array taking 
ownership
+  // and recording the event
+  ASSERT_EQ(event.Init(), NANOARROW_OK);
+  device_array.sync_event = nullptr;
+  ASSERT_EQ(ArrowArrayInitFromType(&array, NANOARROW_TYPE_INT32), 
NANOARROW_OK);
+  ASSERT_EQ(
+      ArrowDeviceArrayInitAsync(gpu, &device_array, &array, event.get(), 
stream.get()),
+      NANOARROW_OK);
+  ASSERT_EQ(*((CUevent*)device_array.sync_event), *event.get());
+  event.release();
+  ArrowArrayRelease(&device_array.array);
+}
+
 class StringTypeParameterizedTestFixture
     : public ::testing::TestWithParam<std::tuple<ArrowDeviceType, enum 
ArrowType, bool>> {
  protected:
@@ -207,6 +346,10 @@ TEST_P(StringTypeParameterizedTestFixture, 
ArrowDeviceCudaArrayViewString) {
   bool include_null = std::get<2>(GetParam());
   int64_t expected_data_size;  // expected
 
+  CudaStream stream(gpu->device_id);
+  ASSERT_EQ(stream.Init(), NANOARROW_OK);
+
+  // Create some test data
   ASSERT_EQ(ArrowArrayInitFromType(&array, string_type), NANOARROW_OK);
   ASSERT_EQ(ArrowArrayStartAppending(&array), NANOARROW_OK);
   ASSERT_EQ(ArrowArrayAppendString(&array, "abc"_asv), NANOARROW_OK);
@@ -230,19 +373,25 @@ TEST_P(StringTypeParameterizedTestFixture, 
ArrowDeviceCudaArrayViewString) {
   EXPECT_EQ(device_array_view.array_view.buffer_views[2].size_bytes, 
expected_data_size);
   EXPECT_EQ(device_array.array.length, 3);
 
+  // Failing to provide a stream should error
+  ASSERT_EQ(ArrowDeviceArrayViewCopyAsync(&device_array_view, gpu, nullptr, 
nullptr),
+            EINVAL);
+
   // Copy required to Cuda
   struct ArrowDeviceArray device_array2;
   device_array2.array.release = nullptr;
   ASSERT_EQ(ArrowDeviceArrayMoveToDevice(&device_array, gpu, &device_array2), 
ENOTSUP);
-  ASSERT_EQ(ArrowDeviceArrayViewCopy(&device_array_view, gpu, &device_array2),
+  ASSERT_EQ(ArrowDeviceArrayViewCopyAsync(&device_array_view, gpu, 
&device_array2,
+                                          stream.get()),
             NANOARROW_OK);
   ArrowArrayRelease(&device_array.array);
 
   ASSERT_NE(device_array2.array.release, nullptr);
   ASSERT_EQ(device_array2.device_id, gpu->device_id);
-  ASSERT_EQ(ArrowDeviceArrayViewSetArray(&device_array_view, &device_array2, 
nullptr),
-            NANOARROW_OK);
-  EXPECT_EQ(device_array_view.array_view.buffer_views[2].size_bytes, 
expected_data_size);
+  ASSERT_EQ(
+      ArrowDeviceArrayViewSetArrayMinimal(&device_array_view, &device_array2, 
nullptr),
+      NANOARROW_OK);
+  EXPECT_EQ(device_array_view.array_view.buffer_views[2].size_bytes, -1);
   EXPECT_EQ(device_array_view.array_view.length, 3);
   EXPECT_EQ(device_array2.array.length, 3);
 
@@ -251,7 +400,8 @@ TEST_P(StringTypeParameterizedTestFixture, 
ArrowDeviceCudaArrayViewString) {
     ASSERT_EQ(ArrowDeviceArrayMoveToDevice(&device_array2, cpu, &device_array),
               NANOARROW_OK);
   } else {
-    ASSERT_EQ(ArrowDeviceArrayViewCopy(&device_array_view, cpu, &device_array),
+    ASSERT_EQ(ArrowDeviceArrayViewCopyAsync(&device_array_view, cpu, 
&device_array,
+                                            stream.get()),
               NANOARROW_OK);
     ArrowArrayRelease(&device_array2.array);
   }
@@ -261,8 +411,9 @@ TEST_P(StringTypeParameterizedTestFixture, 
ArrowDeviceCudaArrayViewString) {
   ASSERT_EQ(ArrowDeviceArrayViewSetArray(&device_array_view, &device_array, 
nullptr),
             NANOARROW_OK);
 
-  EXPECT_EQ(device_array_view.array_view.buffer_views[2].size_bytes, 
expected_data_size);
+  ASSERT_EQ(device_array_view.array_view.buffer_views[2].size_bytes, 
expected_data_size);
 
+  ASSERT_EQ(cuStreamSynchronize(*stream.get()), CUDA_SUCCESS);
   if (include_null) {
     EXPECT_EQ(
         memcmp(device_array_view.array_view.buffer_views[2].data.data, 
"abcdefg", 7), 0);
@@ -294,6 +445,4 @@ INSTANTIATE_TEST_SUITE_P(
         TestParams(ARROW_DEVICE_CUDA_HOST, NANOARROW_TYPE_BINARY, true),
         TestParams(ARROW_DEVICE_CUDA_HOST, NANOARROW_TYPE_BINARY, false),
         TestParams(ARROW_DEVICE_CUDA_HOST, NANOARROW_TYPE_LARGE_BINARY, true),
-        TestParams(ARROW_DEVICE_CUDA_HOST, NANOARROW_TYPE_LARGE_BINARY, false)
-
-            ));
+        TestParams(ARROW_DEVICE_CUDA_HOST, NANOARROW_TYPE_LARGE_BINARY, 
false)));
diff --git a/src/nanoarrow/device/device.c b/src/nanoarrow/device/device.c
index 14b57b5d..352db253 100644
--- a/src/nanoarrow/device/device.c
+++ b/src/nanoarrow/device/device.c
@@ -43,15 +43,20 @@ static void ArrowDeviceArrayInitDefault(struct ArrowDevice* 
device,
   ArrowArrayMove(array, &device_array->array);
 }
 
-static ArrowErrorCode ArrowDeviceCpuBufferInit(struct ArrowDevice* device_src,
-                                               struct ArrowBufferView src,
-                                               struct ArrowDevice* device_dst,
-                                               struct ArrowBuffer* dst) {
+static ArrowErrorCode ArrowDeviceCpuBufferInitAsync(struct ArrowDevice* 
device_src,
+                                                    struct ArrowBufferView src,
+                                                    struct ArrowDevice* 
device_dst,
+                                                    struct ArrowBuffer* dst,
+                                                    void* stream) {
   if (device_dst->device_type != ARROW_DEVICE_CPU ||
       device_src->device_type != ARROW_DEVICE_CPU) {
     return ENOTSUP;
   }
 
+  if (stream != NULL) {
+    return EINVAL;
+  }
+
   ArrowBufferInit(dst);
   dst->allocator = ArrowBufferAllocatorDefault();
   NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(dst, src.data.as_uint8, 
src.size_bytes));
@@ -74,30 +79,35 @@ static ArrowErrorCode ArrowDeviceCpuBufferMove(struct 
ArrowDevice* device_src,
+// Copies dst.size_bytes bytes from src into the preallocated dst when both
+// devices are ARROW_DEVICE_CPU. Returns ENOTSUP for any other device pair so
+// that ArrowDeviceBufferCopyAsync() can fall back to the other device.
 static ArrowErrorCode ArrowDeviceCpuBufferCopy(struct ArrowDevice* device_src,
                                                struct ArrowBufferView src,
                                                struct ArrowDevice* device_dst,
-                                               struct ArrowBufferView dst) {
+                                               struct ArrowBufferView dst, 
void* stream) {
   if (device_dst->device_type != ARROW_DEVICE_CPU ||
       device_src->device_type != ARROW_DEVICE_CPU) {
     return ENOTSUP;
   }
 
+  // The copy below is a plain memcpy that completes before this function
+  // returns, so there is no stream to enqueue it on; reject a non-NULL one.
+  if (stream != NULL) {
+    return EINVAL;
+  }
+
   memcpy((uint8_t*)dst.data.as_uint8, src.data.as_uint8, dst.size_bytes);
   return NANOARROW_OK;
 }
 
 static ArrowErrorCode ArrowDeviceCpuSynchronize(struct ArrowDevice* device,
-                                                void* sync_event,
+                                                void* sync_event, void* stream,
                                                 struct ArrowError* error) {
   switch (device->device_type) {
     case ARROW_DEVICE_CPU:
-      if (sync_event != NULL) {
-        ArrowErrorSet(error, "Expected NULL sync_event for ARROW_DEVICE_CPU 
but got %p",
-                      sync_event);
+      if (sync_event != NULL || stream != NULL) {
+        ArrowErrorSet(error, "sync_event and stream must be NULL for 
ARROW_DEVICE_CPU");
         return EINVAL;
       } else {
         return NANOARROW_OK;
       }
     default:
-      return device->synchronize_event(device, sync_event, error);
+      ArrowErrorSet(error, "Expected CPU device but got device type %d",
+                    (int)device->device_id);
+      return ENOTSUP;
   }
 }
 
@@ -118,7 +128,7 @@ void ArrowDeviceInitCpu(struct ArrowDevice* device) {
   device->device_id = -1;
   device->array_init = NULL;
   device->array_move = NULL;
-  device->buffer_init = &ArrowDeviceCpuBufferInit;
+  device->buffer_init = &ArrowDeviceCpuBufferInitAsync;
   device->buffer_move = &ArrowDeviceCpuBufferMove;
   device->buffer_copy = &ArrowDeviceCpuBufferCopy;
   device->synchronize_event = &ArrowDeviceCpuSynchronize;
@@ -145,15 +155,16 @@ struct ArrowDevice* ArrowDeviceResolve(ArrowDeviceType 
device_type, int64_t devi
   return NULL;
 }
 
-ArrowErrorCode ArrowDeviceArrayInit(struct ArrowDevice* device,
-                                    struct ArrowDeviceArray* device_array,
-                                    struct ArrowArray* array, void* 
sync_event) {
+ArrowErrorCode ArrowDeviceArrayInitAsync(struct ArrowDevice* device,
+                                         struct ArrowDeviceArray* device_array,
+                                         struct ArrowArray* array, void* 
sync_event,
+                                         void* stream) {
   if (device->array_init != NULL) {
-    return device->array_init(device, device_array, array, sync_event);
+    return device->array_init(device, device_array, array, sync_event, stream);
   }
 
-  // Handling a sync event is not supported in the default constructor
-  if (sync_event != NULL) {
+  // Sync event and stream aren't handled by the fallback implementation
+  if (sync_event != NULL || stream != NULL) {
     return EINVAL;
   }
 
@@ -161,13 +172,13 @@ ArrowErrorCode ArrowDeviceArrayInit(struct ArrowDevice* 
device,
   return NANOARROW_OK;
 }
 
-ArrowErrorCode ArrowDeviceBufferInit(struct ArrowDevice* device_src,
-                                     struct ArrowBufferView src,
-                                     struct ArrowDevice* device_dst,
-                                     struct ArrowBuffer* dst) {
-  int result = device_dst->buffer_init(device_src, src, device_dst, dst);
+ArrowErrorCode ArrowDeviceBufferInitAsync(struct ArrowDevice* device_src,
+                                          struct ArrowBufferView src,
+                                          struct ArrowDevice* device_dst,
+                                          struct ArrowBuffer* dst, void* 
stream) {
+  int result = device_dst->buffer_init(device_src, src, device_dst, dst, 
stream);
   if (result == ENOTSUP) {
-    result = device_src->buffer_init(device_src, src, device_dst, dst);
+    result = device_src->buffer_init(device_src, src, device_dst, dst, stream);
   }
 
   return result;
@@ -185,13 +196,13 @@ ArrowErrorCode ArrowDeviceBufferMove(struct ArrowDevice* 
device_src,
   return result;
 }
 
-ArrowErrorCode ArrowDeviceBufferCopy(struct ArrowDevice* device_src,
-                                     struct ArrowBufferView src,
-                                     struct ArrowDevice* device_dst,
-                                     struct ArrowBufferView dst) {
-  int result = device_dst->buffer_copy(device_src, src, device_dst, dst);
+ArrowErrorCode ArrowDeviceBufferCopyAsync(struct ArrowDevice* device_src,
+                                          struct ArrowBufferView src,
+                                          struct ArrowDevice* device_dst,
+                                          struct ArrowBufferView dst, void* 
stream) {
+  int result = device_dst->buffer_copy(device_src, src, device_dst, dst, 
stream);
   if (result == ENOTSUP) {
-    result = device_src->buffer_copy(device_src, src, device_dst, dst);
+    result = device_src->buffer_copy(device_src, src, device_dst, dst, stream);
   }
 
   return result;
@@ -273,64 +284,106 @@ void ArrowDeviceArrayViewReset(struct 
ArrowDeviceArrayView* device_array_view) {
   device_array_view->device = NULL;
 }
 
-static ArrowErrorCode ArrowDeviceBufferGetInt32(struct ArrowDevice* device,
-                                                struct ArrowBufferView 
buffer_view,
-                                                int64_t i, int32_t* out) {
-  struct ArrowBufferView out_view;
-  out_view.data.as_int32 = out;
-  out_view.size_bytes = sizeof(int32_t);
+// Populates device_array_view using only information accessible from the
+// CPU: resolves the device from device_array's device_type and device_id
+// (EINVAL if it can't be resolved), calls ArrowArrayViewSetArrayMinimal() on
+// the wrapped array, and copies device_array's sync_event. Sizes that would
+// require reading device memory (e.g., string/binary data buffers) may be left
+// unknown (-1); ArrowDeviceArrayViewSetArrayAsync() resolves those.
+ArrowErrorCode ArrowDeviceArrayViewSetArrayMinimal(
+    struct ArrowDeviceArrayView* device_array_view, struct ArrowDeviceArray* 
device_array,
+    struct ArrowError* error) {
+  // Resolve device
+  struct ArrowDevice* device =
+      ArrowDeviceResolve(device_array->device_type, device_array->device_id);
+  if (device == NULL) {
+    ArrowErrorSet(error,
+                  "Can't resolve device with type %" PRId32 " and identifier 
%" PRId64,
+                  device_array->device_type, device_array->device_id);
+    return EINVAL;
+  }
+
+  // Set the device array device
+  device_array_view->device = device;
+
+  // Populate the array_view
+  
NANOARROW_RETURN_NOT_OK(ArrowArrayViewSetArrayMinimal(&device_array_view->array_view,
+                                                        &device_array->array, 
error));
+
+  // Populate the sync_event
+  device_array_view->sync_event = device_array->sync_event;
 
-  struct ArrowBufferView device_buffer_view;
-  device_buffer_view.data.as_int32 = buffer_view.data.as_int32 + i;
-  device_buffer_view.size_bytes = sizeof(int32_t);
-  NANOARROW_RETURN_NOT_OK(
-      ArrowDeviceBufferCopy(device, device_buffer_view, ArrowDeviceCpu(), 
out_view));
   return NANOARROW_OK;
 }
 
-static ArrowErrorCode ArrowDeviceBufferGetInt64(struct ArrowDevice* device,
-                                                struct ArrowBufferView 
buffer_view,
-                                                int64_t i, int64_t* out) {
-  struct ArrowBufferView out_view;
-  out_view.data.as_int64 = out;
-  out_view.size_bytes = sizeof(int64_t);
+// Walks the tree of arrays (including children and dictionaries) and adds to
+// *offset_buffer_size the number of bytes needed to hold the last offset value
+// of every string/binary array whose data buffer size is unknown (-1): one
+// offset element (element_size_bits[1] / 8 bytes) per such array. Data buffers
+// of empty string/binary arrays (length 0 or empty offsets buffer) have their
+// size set to 0 here, so no device read is needed for them.
+static ArrowErrorCode ArrowDeviceArrayViewWalkUnknownBufferSizes(
+    struct ArrowArrayView* array_view, int64_t* offset_buffer_size) {
+  switch (array_view->storage_type) {
+    case NANOARROW_TYPE_STRING:
+    case NANOARROW_TYPE_BINARY:
+    case NANOARROW_TYPE_LARGE_STRING:
+    case NANOARROW_TYPE_LARGE_BINARY:
+      if (array_view->length == 0 || array_view->buffer_views[1].size_bytes == 
0) {
+        array_view->buffer_views[2].size_bytes = 0;
+      } else if (array_view->buffer_views[2].size_bytes == -1) {
+        // One offset value (4 or 8 bytes) will have to be copied from the device
+        *offset_buffer_size += array_view->layout.element_size_bits[1] / 8;
+      }
+      break;
+    default:
+      break;
+  }
+
+  // Recurse for children
+  for (int64_t i = 0; i < array_view->n_children; i++) {
+    NANOARROW_RETURN_NOT_OK(ArrowDeviceArrayViewWalkUnknownBufferSizes(
+        array_view->children[i], offset_buffer_size));
+  }
+
+  // ...and for dictionary
+  if (array_view->dictionary != NULL) {
+    NANOARROW_RETURN_NOT_OK(ArrowDeviceArrayViewWalkUnknownBufferSizes(
+        array_view->dictionary, offset_buffer_size));
+  }
 
-  struct ArrowBufferView device_buffer_view;
-  device_buffer_view.data.as_int64 = buffer_view.data.as_int64 + i;
-  device_buffer_view.size_bytes = sizeof(int64_t);
-  NANOARROW_RETURN_NOT_OK(
-      ArrowDeviceBufferCopy(device, device_buffer_view, ArrowDeviceCpu(), 
out_view));
   return NANOARROW_OK;
 }
 
-static ArrowErrorCode ArrowDeviceArrayViewResolveBufferSizes(
-    struct ArrowDevice* device, struct ArrowArrayView* array_view) {
-  // Calculate buffer sizes that require accessing the offset buffer
-  // (at this point all other sizes have been resolved).
+// Walks the tree of arrays and launches an async copy of the relevant
+// item in the array's offset buffer to the temporary buffer we've just
+// allocated to collect these values.
+static ArrowErrorCode ArrowDeviceArrayViewResolveUnknownBufferSizesAsync(
+    struct ArrowDevice* device, struct ArrowArrayView* array_view,
+    uint8_t** offset_value_dst, void* stream) {
   int64_t offset_plus_length = array_view->offset + array_view->length;
-  int32_t last_offset32;
-  int64_t last_offset64;
+
+  struct ArrowBufferView src_view;
+  struct ArrowBufferView dst_view;
 
   switch (array_view->storage_type) {
     case NANOARROW_TYPE_STRING:
     case NANOARROW_TYPE_BINARY:
-      if (array_view->buffer_views[1].size_bytes == 0) {
-        array_view->buffer_views[2].size_bytes = 0;
-      } else if (array_view->buffer_views[2].size_bytes == -1) {
-        NANOARROW_RETURN_NOT_OK(ArrowDeviceBufferGetInt32(
-            device, array_view->buffer_views[1], offset_plus_length, 
&last_offset32));
-        array_view->buffer_views[2].size_bytes = last_offset32;
+      if (array_view->buffer_views[2].size_bytes == -1) {
+        src_view.data.as_int32 =
+            array_view->buffer_views[1].data.as_int32 + offset_plus_length;
+        src_view.size_bytes = sizeof(int32_t);
+        dst_view.data.as_uint8 = *offset_value_dst;
+        dst_view.size_bytes = sizeof(int32_t);
+
+        NANOARROW_RETURN_NOT_OK(ArrowDeviceBufferCopyAsync(
+            device, src_view, ArrowDeviceCpu(), dst_view, stream));
+
+        (*offset_value_dst) += sizeof(int32_t);
       }
       break;
-
     case NANOARROW_TYPE_LARGE_STRING:
     case NANOARROW_TYPE_LARGE_BINARY:
-      if (array_view->buffer_views[1].size_bytes == 0) {
-        array_view->buffer_views[2].size_bytes = 0;
-      } else if (array_view->buffer_views[2].size_bytes == -1) {
-        NANOARROW_RETURN_NOT_OK(ArrowDeviceBufferGetInt64(
-            device, array_view->buffer_views[1], offset_plus_length, 
&last_offset64));
-        array_view->buffer_views[2].size_bytes = last_offset64;
+      if (array_view->buffer_views[2].size_bytes == -1) {
+        src_view.data.as_int64 =
+            array_view->buffer_views[1].data.as_int64 + offset_plus_length;
+        src_view.size_bytes = sizeof(int64_t);
+        dst_view.data.as_uint8 = *offset_value_dst;
+        dst_view.size_bytes = sizeof(int64_t);
+
+        NANOARROW_RETURN_NOT_OK(ArrowDeviceBufferCopyAsync(
+            device, src_view, ArrowDeviceCpu(), dst_view, stream));
+
+        (*offset_value_dst) += sizeof(int64_t);
       }
       break;
     default:
@@ -339,55 +392,117 @@ static ArrowErrorCode 
ArrowDeviceArrayViewResolveBufferSizes(
 
   // Recurse for children
   for (int64_t i = 0; i < array_view->n_children; i++) {
-    NANOARROW_RETURN_NOT_OK(
-        ArrowDeviceArrayViewResolveBufferSizes(device, 
array_view->children[i]));
+    NANOARROW_RETURN_NOT_OK(ArrowDeviceArrayViewResolveUnknownBufferSizesAsync(
+        device, array_view->children[i], offset_value_dst, stream));
+  }
+
+  // ...and for dictionary
+  if (array_view->dictionary != NULL) {
+    NANOARROW_RETURN_NOT_OK(ArrowDeviceArrayViewResolveUnknownBufferSizesAsync(
+        device, array_view->dictionary, offset_value_dst, stream));
   }
 
   return NANOARROW_OK;
 }
 
-ArrowErrorCode ArrowDeviceArrayViewSetArrayMinimal(
-    struct ArrowDeviceArrayView* device_array_view, struct ArrowDeviceArray* 
device_array,
+// After synchronizing the stream with the CPU to ensure that all of the
+// buffer sizes have been copied to our temporary buffer, relay them
+// back to the appropriate buffer view so that the buffer copier can
+// do its thing.
+//
+// Arrays are visited in the same order as
+// ArrowDeviceArrayViewResolveUnknownBufferSizesAsync() (node, then children,
+// then dictionary) so that *offset_value_dst advances over the values in the
+// same order in which their copies were enqueued.
+static void ArrowDeviceArrayViewCollectUnknownBufferSizes(
+    struct ArrowArrayView* array_view, uint8_t** offset_value_dst) {
+  switch (array_view->storage_type) {
+    case NANOARROW_TYPE_STRING:
+    case NANOARROW_TYPE_BINARY:
+      if (array_view->buffer_views[2].size_bytes == -1) {
+        int32_t size_bytes_32;
+        memcpy(&size_bytes_32, *offset_value_dst, sizeof(int32_t));
+        array_view->buffer_views[2].size_bytes = size_bytes_32;
+        (*offset_value_dst) += sizeof(int32_t);
+      }
+      break;
+    case NANOARROW_TYPE_LARGE_STRING:
+    case NANOARROW_TYPE_LARGE_BINARY:
+      if (array_view->buffer_views[2].size_bytes == -1) {
+        // A 64-bit value may follow a 32-bit one in the temporary buffer and
+        // so may not be 8-byte aligned; memcpy avoids a misaligned read.
+        memcpy(&array_view->buffer_views[2].size_bytes, *offset_value_dst,
+               sizeof(int64_t));
+        (*offset_value_dst) += sizeof(int64_t);
+      }
+      break;
+    default:
+      break;
+  }
+
+  // Recurse for children
+  for (int64_t i = 0; i < array_view->n_children; i++) {
+    ArrowDeviceArrayViewCollectUnknownBufferSizes(array_view->children[i],
+                                                  offset_value_dst);
+  }
+
+  // ...and for dictionary
+  if (array_view->dictionary != NULL) {
+    ArrowDeviceArrayViewCollectUnknownBufferSizes(array_view->dictionary,
+                                                  offset_value_dst);
+  }
+}
+
+// Ensures that the size of every data buffer in device_array_view is known.
+// If any string/binary data buffer size is unknown (-1), stream is first made
+// to wait on the view's sync_event, the copies of the needed offset values are
+// enqueued on stream into a temporary CPU buffer, and stream is synchronized
+// with the CPU before the values are read back into the buffer views. Returns
+// NANOARROW_OK without using stream if all sizes are already known.
+static ArrowErrorCode ArrowDeviceArrayViewEnsureBufferSizesAsync(
+    struct ArrowDeviceArrayView* device_array_view, void* stream,
     struct ArrowError* error) {
-  // Resolve device
-  struct ArrowDevice* device =
-      ArrowDeviceResolve(device_array->device_type, device_array->device_id);
-  if (device == NULL) {
-    ArrowErrorSet(error,
-                  "Can't resolve device with type %" PRId32 " and identifier 
%" PRId64,
-                  device_array->device_type, device_array->device_id);
-    return EINVAL;
+  // Walk the tree of arrays to check for buffers whose size we don't know
+  int64_t temp_buffer_length_bytes_required = 0;
+  NANOARROW_RETURN_NOT_OK(ArrowDeviceArrayViewWalkUnknownBufferSizes(
+      &device_array_view->array_view, &temp_buffer_length_bytes_required));
+
+  // If there are no such arrays (e.g., there are no string or binary arrays 
in the tree),
+  // we don't have to do anything extra
+  if (temp_buffer_length_bytes_required == 0) {
+    return NANOARROW_OK;
   }
 
-  // Set the device array device
-  device_array_view->device = device;
+  // Ensure that the stream provided waits on the array's sync event
+  NANOARROW_RETURN_NOT_OK(device_array_view->device->synchronize_event(
+      device_array_view->device, device_array_view->sync_event, stream, 
error));
 
-  // Populate the array_view
-  
NANOARROW_RETURN_NOT_OK(ArrowArrayViewSetArrayMinimal(&device_array_view->array_view,
-                                                        &device_array->array, 
error));
+  // Allocate a buffer big enough to hold all the offset values we need to
+  // copy from the GPU
+  struct ArrowBuffer buffer;
+  ArrowBufferInit(&buffer);
+  NANOARROW_RETURN_NOT_OK(
+      ArrowBufferResize(&buffer, temp_buffer_length_bytes_required, 0));
+
+  uint8_t* cursor = buffer.data;
+  int result = ArrowDeviceArrayViewResolveUnknownBufferSizesAsync(
+      device_array_view->device, &device_array_view->array_view, &cursor, 
stream);
+  if (result != NANOARROW_OK) {
+    // Some copies into buffer may already be enqueued on stream: wait for them
+    // (best effort; error is NULL here so the original error message is kept)
+    // before releasing the memory they write to.
+    (void)device_array_view->device->synchronize_event(device_array_view->device,
+                                                       NULL, stream, NULL);
+    ArrowBufferReset(&buffer);
+    return result;
+  }
+
+  NANOARROW_DCHECK(cursor == (buffer.data + buffer.size_bytes));
+
+  // Synchronize the stream with the CPU
+  result = 
device_array_view->device->synchronize_event(device_array_view->device, NULL,
+                                                        stream, error);
+  if (result != NANOARROW_OK) {
+    // The offset values are not guaranteed to have arrived, so the temporary
+    // buffer (which may still be uninitialized) must not be read.
+    ArrowBufferReset(&buffer);
+    return result;
+  }
+
+  // Collect the values from the temporary buffer
+  cursor = buffer.data;
+  
ArrowDeviceArrayViewCollectUnknownBufferSizes(&device_array_view->array_view, 
&cursor);
+  NANOARROW_DCHECK(cursor == (buffer.data + buffer.size_bytes));
+  ArrowBufferReset(&buffer);
 
   return NANOARROW_OK;
 }
 
-ArrowErrorCode ArrowDeviceArrayViewSetArray(
+// Populates device_array_view from device_array, including the size of any
+// data buffer that can only be determined by reading device memory (e.g.,
+// string/binary). When such sizes must be read, the copies are ordered on
+// stream after device_array's sync_event, and stream is synchronized with the
+// CPU before this function returns successfully.
+ArrowErrorCode ArrowDeviceArrayViewSetArrayAsync(
     struct ArrowDeviceArrayView* device_array_view, struct ArrowDeviceArray* 
device_array,
-    struct ArrowError* error) {
+    void* stream, struct ArrowError* error) {
+  // Populate the array view with all information accessible from the CPU
   NANOARROW_RETURN_NOT_OK(
       ArrowDeviceArrayViewSetArrayMinimal(device_array_view, device_array, 
error));
 
-  // Wait on device_array to synchronize with the CPU
-  // TODO: This is not actually sufficient for CUDA, where the synchronization
-  // should happen after the cudaMemcpy, not before it. The ordering of
-  // these operations should be explicit and asynchronous (and is probably 
outside
-  // the scope of what can be done with a generic callback).
-  NANOARROW_RETURN_NOT_OK(device_array_view->device->synchronize_event(
-      device_array_view->device, device_array->sync_event, error));
-
-  // Resolve unknown buffer sizes (i.e., string, binary, large string, large 
binary)
-  NANOARROW_RETURN_NOT_OK_WITH_ERROR(
-      ArrowDeviceArrayViewResolveBufferSizes(device_array_view->device,
-                                             &device_array_view->array_view),
-      error);
+  // Resolve buffer sizes that require reading device memory; this may
+  // synchronize stream with the CPU
+  NANOARROW_RETURN_NOT_OK(
+      ArrowDeviceArrayViewEnsureBufferSizesAsync(device_array_view, stream, 
error));
 
   return NANOARROW_OK;
 }
@@ -395,7 +510,8 @@ ArrowErrorCode ArrowDeviceArrayViewSetArray(
 static ArrowErrorCode ArrowDeviceArrayViewCopyInternal(struct ArrowDevice* 
device_src,
                                                        struct ArrowArrayView* 
src,
                                                        struct ArrowDevice* 
device_dst,
-                                                       struct ArrowArray* dst) 
{
+                                                       struct ArrowArray* dst,
+                                                       void* stream) {
   // Currently no attempt to minimize the amount of memory copied (i.e.,
   // by applying offset + length and copying potentially fewer bytes)
   dst->length = src->length;
@@ -407,43 +523,58 @@ static ArrowErrorCode 
ArrowDeviceArrayViewCopyInternal(struct ArrowDevice* devic
       break;
     }
 
-    NANOARROW_RETURN_NOT_OK(ArrowDeviceBufferInit(device_src, 
src->buffer_views[i],
-                                                  device_dst, 
ArrowArrayBuffer(dst, i)));
+    NANOARROW_RETURN_NOT_OK(ArrowDeviceBufferInitAsync(
+        device_src, src->buffer_views[i], device_dst, ArrowArrayBuffer(dst, 
i), stream));
   }
 
   for (int64_t i = 0; i < src->n_children; i++) {
     NANOARROW_RETURN_NOT_OK(ArrowDeviceArrayViewCopyInternal(
-        device_src, src->children[i], device_dst, dst->children[i]));
+        device_src, src->children[i], device_dst, dst->children[i], stream));
   }
 
   if (src->dictionary != NULL) {
     NANOARROW_RETURN_NOT_OK(ArrowDeviceArrayViewCopyInternal(
-        device_src, src->dictionary, device_dst, dst->dictionary));
+        device_src, src->dictionary, device_dst, dst->dictionary, stream));
   }
 
   return NANOARROW_OK;
 }
 
-ArrowErrorCode ArrowDeviceArrayViewCopy(struct ArrowDeviceArrayView* src,
-                                        struct ArrowDevice* device_dst,
-                                        struct ArrowDeviceArray* dst) {
+ArrowErrorCode ArrowDeviceArrayViewCopyAsync(struct ArrowDeviceArrayView* src,
+                                             struct ArrowDevice* device_dst,
+                                             struct ArrowDeviceArray* dst, 
void* stream) {
+  // Ensure src has all buffer sizes defined
+  NANOARROW_RETURN_NOT_OK(ArrowDeviceArrayViewEnsureBufferSizesAsync(src, 
stream, NULL));
+
   struct ArrowArray tmp;
   NANOARROW_RETURN_NOT_OK(ArrowArrayInitFromArrayView(&tmp, &src->array_view, 
NULL));
 
-  int result =
-      ArrowDeviceArrayViewCopyInternal(src->device, &src->array_view, 
device_dst, &tmp);
+  int result = ArrowDeviceArrayViewCopyInternal(src->device, &src->array_view, 
device_dst,
+                                                &tmp, stream);
   if (result != NANOARROW_OK) {
     ArrowArrayRelease(&tmp);
     return result;
   }
 
+  // If we are copying to the CPU, we need to synchronize the stream because we
+  // can't populate a sync event for a CPU array.
+  if (device_dst->device_type == ARROW_DEVICE_CPU) {
+    result = src->device->synchronize_event(src->device, NULL, stream, NULL);
+    if (result != NANOARROW_OK) {
+      ArrowArrayRelease(&tmp);
+      return result;
+    }
+
+    stream = NULL;
+  }
+
   result = ArrowArrayFinishBuilding(&tmp, NANOARROW_VALIDATION_LEVEL_MINIMAL, 
NULL);
   if (result != NANOARROW_OK) {
     ArrowArrayRelease(&tmp);
     return result;
   }
 
-  result = ArrowDeviceArrayInit(device_dst, dst, &tmp, NULL);
+  result = ArrowDeviceArrayInitAsync(device_dst, dst, &tmp, NULL, stream);
   if (result != NANOARROW_OK) {
     ArrowArrayRelease(&tmp);
     return result;
diff --git a/src/nanoarrow/device/device_test.cc 
b/src/nanoarrow/device/device_test.cc
index 6765baed..a303bf34 100644
--- a/src/nanoarrow/device/device_test.cc
+++ b/src/nanoarrow/device/device_test.cc
@@ -32,9 +32,13 @@ TEST(NanoarrowDevice, CpuDevice) {
   EXPECT_EQ(cpu, ArrowDeviceCpu());
 
   void* sync_event = nullptr;
-  EXPECT_EQ(cpu->synchronize_event(cpu, sync_event, nullptr), NANOARROW_OK);
+  void* stream = nullptr;
+  EXPECT_EQ(cpu->synchronize_event(cpu, sync_event, stream, nullptr), 
NANOARROW_OK);
   sync_event = cpu;
-  EXPECT_EQ(cpu->synchronize_event(cpu, sync_event, nullptr), EINVAL);
+  EXPECT_EQ(cpu->synchronize_event(cpu, sync_event, stream, nullptr), EINVAL);
+  sync_event = nullptr;
+  stream = cpu;
+  EXPECT_EQ(cpu->synchronize_event(cpu, sync_event, stream, nullptr), EINVAL);
 }
 
 TEST(NanoarrowDevice, ArrowDeviceCpuBuffer) {
diff --git a/src/nanoarrow/device/metal.cc b/src/nanoarrow/device/metal.cc
index 17083a9e..78530ac1 100644
--- a/src/nanoarrow/device/metal.cc
+++ b/src/nanoarrow/device/metal.cc
@@ -150,20 +150,23 @@ static void ArrowDeviceMetalArrayRelease(struct 
ArrowArray* array) {
   }
   ArrowArrayRelease(&private_data->parent);
   ArrowFree(private_data);
-  array->release = NULL;
+  array->release = nullptr;
 }
 
-static ArrowErrorCode ArrowDeviceMetalArrayInit(struct ArrowDevice* device,
-                                                struct ArrowDeviceArray* 
device_array,
-                                                struct ArrowArray* array,
-                                                void* sync_event) {
+static ArrowErrorCode ArrowDeviceMetalArrayInitAsync(
+    struct ArrowDevice* device, struct ArrowDeviceArray* device_array,
+    struct ArrowArray* array, void* sync_event, void* stream) {
   struct ArrowDeviceMetalArrayPrivate* private_data =
       (struct ArrowDeviceMetalArrayPrivate*)ArrowMalloc(
           sizeof(struct ArrowDeviceMetalArrayPrivate));
-  if (private_data == NULL) {
+  if (private_data == nullptr) {
     return ENOMEM;
   }
 
+  if (stream != NULL) {
+    return EINVAL;
+  }
+
   // One can create a new event with mtl_device->newSharedEvent();
   private_data->event = static_cast<MTL::SharedEvent*>(sync_event);
 
@@ -180,10 +183,11 @@ static ArrowErrorCode ArrowDeviceMetalArrayInit(struct 
ArrowDevice* device,
   return NANOARROW_OK;
 }
 
-static ArrowErrorCode ArrowDeviceMetalBufferInit(struct ArrowDevice* 
device_src,
-                                                 struct ArrowBufferView src,
-                                                 struct ArrowDevice* 
device_dst,
-                                                 struct ArrowBuffer* dst) {
+static ArrowErrorCode ArrowDeviceMetalBufferInitAsync(struct ArrowDevice* 
device_src,
+                                                      struct ArrowBufferView 
src,
+                                                      struct ArrowDevice* 
device_dst,
+                                                      struct ArrowBuffer* dst,
+                                                      void* stream) {
   if (device_src->device_type == ARROW_DEVICE_CPU &&
       device_dst->device_type == ARROW_DEVICE_METAL) {
     struct ArrowBuffer tmp;
@@ -246,10 +250,11 @@ static ArrowErrorCode ArrowDeviceMetalBufferMove(struct 
ArrowDevice* device_src,
   }
 }
 
-static ArrowErrorCode ArrowDeviceMetalBufferCopy(struct ArrowDevice* 
device_src,
-                                                 struct ArrowBufferView src,
-                                                 struct ArrowDevice* 
device_dst,
-                                                 struct ArrowBufferView dst) {
+static ArrowErrorCode ArrowDeviceMetalBufferCopyAsync(struct ArrowDevice* 
device_src,
+                                                      struct ArrowBufferView 
src,
+                                                      struct ArrowDevice* 
device_dst,
+                                                      struct ArrowBufferView 
dst,
+                                                      void* stream) {
   // This is all just memcpy since it's all living in the same address space
   if (device_src->device_type == ARROW_DEVICE_CPU &&
       device_dst->device_type == ARROW_DEVICE_METAL) {
@@ -292,7 +297,7 @@ static int 
ArrowDeviceMetalCopyRequiredCpuToMetal(MTL::Device* mtl_device,
 }
 
 static ArrowErrorCode ArrowDeviceMetalSynchronize(struct ArrowDevice* device,
-                                                  void* sync_event,
+                                                  void* sync_event, void* 
stream,
                                                   struct ArrowError* error) {
   // TODO: sync events for Metal are harder than for CUDA
   // 
https://developer.apple.com/documentation/metal/resource_synchronization/synchronizing_events_between_a_gpu_and_the_cpu?language=objc
@@ -310,6 +315,11 @@ static ArrowErrorCode ArrowDeviceMetalSynchronize(struct 
ArrowDevice* device,
 
   // listener->release();
 
+  // The case where we actually have to do something is not implemented
+  if (sync_event != NULL || stream != NULL) {
+    return ENOTSUP;
+  }
+
   return NANOARROW_OK;
 }
 
@@ -334,7 +344,7 @@ static ArrowErrorCode ArrowDeviceMetalArrayMove(struct 
ArrowDevice* device_src,
   } else if (device_src->device_type == ARROW_DEVICE_METAL &&
              device_dst->device_type == ARROW_DEVICE_CPU) {
     NANOARROW_RETURN_NOT_OK(
-        ArrowDeviceMetalSynchronize(device_src, src->sync_event, nullptr));
+        ArrowDeviceMetalSynchronize(device_src, src->sync_event, nullptr, 
nullptr));
     ArrowDeviceArrayMove(src, dst);
     dst->device_type = device_dst->device_type;
     dst->device_id = device_dst->device_id;
@@ -376,11 +386,11 @@ ArrowErrorCode ArrowDeviceMetalInitDefaultDevice(struct 
ArrowDevice* device,
 
   device->device_type = ARROW_DEVICE_METAL;
   device->device_id = static_cast<int64_t>(default_device->registryID());
-  device->array_init = &ArrowDeviceMetalArrayInit;
+  device->array_init = &ArrowDeviceMetalArrayInitAsync;
   device->array_move = &ArrowDeviceMetalArrayMove;
-  device->buffer_init = &ArrowDeviceMetalBufferInit;
+  device->buffer_init = &ArrowDeviceMetalBufferInitAsync;
   device->buffer_move = &ArrowDeviceMetalBufferMove;
-  device->buffer_copy = &ArrowDeviceMetalBufferCopy;
+  device->buffer_copy = &ArrowDeviceMetalBufferCopyAsync;
   device->synchronize_event = &ArrowDeviceMetalSynchronize;
   device->release = &ArrowDeviceMetalRelease;
   device->private_data = default_device;
diff --git a/src/nanoarrow/nanoarrow_device.h b/src/nanoarrow/nanoarrow_device.h
index b284a9aa..ef5d3cca 100644
--- a/src/nanoarrow/nanoarrow_device.h
+++ b/src/nanoarrow/nanoarrow_device.h
@@ -121,27 +121,28 @@ static inline void ArrowDeviceArrayMove(struct 
ArrowDeviceArray* src,
 
 #define ArrowDeviceCheckRuntime \
   NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceCheckRuntime)
-#define ArrowDeviceArrayInit NANOARROW_SYMBOL(NANOARROW_NAMESPACE, 
ArrowDeviceArrayInit)
+#define ArrowDeviceArrayInitAsync \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceArrayInitAsync)
 #define ArrowDeviceArrayViewInit \
   NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceArrayViewInit)
 #define ArrowDeviceArrayViewReset \
   NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceArrayViewReset)
 #define ArrowDeviceArrayViewSetArrayMinimal \
   NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceArrayViewSetArrayMinimal)
-#define ArrowDeviceArrayViewSetArray \
-  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceArrayViewSetArray)
-#define ArrowDeviceArrayViewCopy \
-  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceArrayViewCopy)
-#define ArrowDeviceArrayViewCopyRequired \
-  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceArrayViewCopyRequired)
+#define ArrowDeviceArrayViewSetArrayAsync \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceArrayViewSetArrayAsync)
+#define ArrowDeviceArrayViewCopyAsync \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceArrayViewCopyAsync)
 #define ArrowDeviceArrayMoveToDevice \
   NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceArrayMoveToDevice)
 #define ArrowDeviceResolve NANOARROW_SYMBOL(NANOARROW_NAMESPACE, 
ArrowDeviceResolve)
 #define ArrowDeviceCpu NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceCpu)
 #define ArrowDeviceInitCpu NANOARROW_SYMBOL(NANOARROW_NAMESPACE, 
ArrowDeviceInitCpu)
-#define ArrowDeviceBufferInit NANOARROW_SYMBOL(NANOARROW_NAMESPACE, 
ArrowDeviceBufferInit)
+#define ArrowDeviceBufferInitAsync \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceBufferInitAsync)
 #define ArrowDeviceBufferMove NANOARROW_SYMBOL(NANOARROW_NAMESPACE, 
ArrowDeviceBufferMove)
-#define ArrowDeviceBufferCopy NANOARROW_SYMBOL(NANOARROW_NAMESPACE, 
ArrowDeviceBufferCopy)
+#define ArrowDeviceBufferCopyAsync \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceBufferCopyAsync)
 #define ArrowDeviceBasicArrayStreamInit \
   NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceBasicArrayStreamInit)
 
@@ -168,6 +169,12 @@ static inline void ArrowDeviceArrayMove(struct 
ArrowDeviceArray* src,
 /// \brief Checks the nanoarrow runtime to make sure the run/build versions 
match
 ArrowErrorCode ArrowDeviceCheckRuntime(struct ArrowError* error);
 
+/// \brief A view of an ArrowDeviceArray whose buffers may live on a device
+///
+/// Populated by ArrowDeviceArrayViewSetArrayMinimal() or
+/// ArrowDeviceArrayViewSetArrayAsync().
+struct ArrowDeviceArrayView {
+  /// \brief The device resolved from the source array's device_type/device_id
+  struct ArrowDevice* device;
+  /// \brief View of the array; buffer data pointers may refer to device memory
+  struct ArrowArrayView array_view;
+  /// \brief The sync_event copied from the source ArrowDeviceArray (may be NULL)
+  void* sync_event;
+};
+
 /// \brief A Device wrapper with callbacks for basic memory management tasks
 ///
 /// All device objects are currently implemented as singletons; however, this
@@ -182,19 +189,22 @@ struct ArrowDevice {
   /// \brief Initialize an ArrowDeviceArray from a previously allocated 
ArrowArray
   ///
   /// Given a device and an uninitialized device_array, populate the fields of 
the
-  /// device_array (including sync_event) appropriately. If NANOARROW_OK is 
returned,
-  /// ownership of array is transferred to device_array. This function must 
allocate
-  /// the appropriate sync_event and make its address available as
-  /// device_array->sync_event (if sync_event applies to this device type).
+  /// device_array appropriately. If stream is non-null, the event must be recorded such
+  /// that it captures the work done on stream. If NANOARROW_OK is returned, ownership of
+  /// array and of sync_event (if non-null) is transferred to device_array. The caller
+  /// retains ownership of stream.
   ArrowErrorCode (*array_init)(struct ArrowDevice* device,
                                struct ArrowDeviceArray* device_array,
-                               struct ArrowArray* array, void* sync_event);
+                               struct ArrowArray* array, void* sync_event, 
void* stream);
 
   /// \brief Move an ArrowDeviceArray between devices without copying buffers
   ///
   /// Some devices can move an ArrowDeviceArray without an explicit buffer 
copy,
   /// although the performance characteristics of the moved array may be 
different
-  /// than that of an explicitly copied one depending on the device.
+  /// than that of an explicitly copied one depending on the device. 
Implementations must
+  /// check device_src and device_dst and return ENOTSUP if not prepared to 
handle this
+  /// operation.
   ArrowErrorCode (*array_move)(struct ArrowDevice* device_src,
                                struct ArrowDeviceArray* src,
                                struct ArrowDevice* device_dst,
@@ -203,38 +213,45 @@ struct ArrowDevice {
   /// \brief Initialize an owning buffer from existing content
   ///
   /// Creates a new buffer whose data member can be accessed by the GPU by
-  /// copying existing content.
+  /// copying existing content. Implementations must use the provided stream
+  /// if non-null; implementations may error if they require a stream to be 
provided.
   /// Implementations must check device_src and device_dst and return ENOTSUP 
if
   /// not prepared to handle this operation.
   ArrowErrorCode (*buffer_init)(struct ArrowDevice* device_src,
                                 struct ArrowBufferView src,
-                                struct ArrowDevice* device_dst, struct 
ArrowBuffer* dst);
+                                struct ArrowDevice* device_dst, struct 
ArrowBuffer* dst,
+                                void* stream);
 
   /// \brief Move an owning buffer to a device
   ///
   /// Creates a new buffer whose data member can be accessed by the GPU by
   /// moving an existing buffer. If NANOARROW_OK is returned, src will have
   /// been released or moved by the implementation and dst must be released by
-  /// the caller.
-  /// Implementations must check device_src and device_dst and return ENOTSUP 
if
-  /// not prepared to handle this operation.
+  /// the caller. Implementations must check device_src and device_dst and 
return ENOTSUP
+  /// if not prepared to handle this operation.
   ArrowErrorCode (*buffer_move)(struct ArrowDevice* device_src, struct 
ArrowBuffer* src,
                                 struct ArrowDevice* device_dst, struct 
ArrowBuffer* dst);
 
   /// \brief Copy a section of memory into a preallocated buffer
   ///
   /// As opposed to the other buffer operations, this is designed to support
-  /// copying very small slices of memory.
+  /// copying very small slices of memory. Implementations must use the 
provided stream
+  /// if non-null; implementations may error if they require a stream to be 
provided.
   /// Implementations must check device_src and device_dst and return ENOTSUP 
if
   /// not prepared to handle this operation.
   ArrowErrorCode (*buffer_copy)(struct ArrowDevice* device_src,
                                 struct ArrowBufferView src,
                                 struct ArrowDevice* device_dst,
-                                struct ArrowBufferView dst);
+                                struct ArrowBufferView dst, void* stream);
 
-  /// \brief Wait for an event on the CPU host
+  /// \brief Synchronize an event and/or stream
+  ///
+  /// If both sync_event and stream are non-null, ensures that the stream waits
+  /// on the event. If only sync_event is non-null, ensures that the work 
captured
+  /// by the event is synchronized with the CPU. If only stream is non-null, 
ensures
+  /// that stream is synchronized with the CPU.
   ArrowErrorCode (*synchronize_event)(struct ArrowDevice* device, void* 
sync_event,
-                                      struct ArrowError* error);
+                                      void* stream, struct ArrowError* error);
 
   /// \brief Release this device and any resources it holds
   void (*release)(struct ArrowDevice* device);
@@ -243,18 +260,50 @@ struct ArrowDevice {
   void* private_data;
 };
 
-struct ArrowDeviceArrayView {
-  struct ArrowDevice* device;
-  struct ArrowArrayView array_view;
-};
+/// \brief Pointer to a statically-allocated CPU device singleton
+struct ArrowDevice* ArrowDeviceCpu(void);
+
+/// \brief Initialize a user-allocated device struct with a CPU device
+void ArrowDeviceInitCpu(struct ArrowDevice* device);
+
+/// \brief Resolve a device pointer from a type + identifier
+///
+/// Depending on which libraries this build of the device extension was built 
with,
+/// some device types may or may not be supported. The CPU type is always 
supported.
+/// Returns NULL for device that does not exist or cannot be returned as a 
singleton.
+/// Callers must not release the pointed-to device.
+struct ArrowDevice* ArrowDeviceResolve(ArrowDeviceType device_type, int64_t 
device_id);
 
 /// \brief Initialize an ArrowDeviceArray
 ///
 /// Given an ArrowArray whose buffers/release callback has been set 
appropriately,
-/// initialize an ArrowDeviceArray.
-ArrowErrorCode ArrowDeviceArrayInit(struct ArrowDevice* device,
-                                    struct ArrowDeviceArray* device_array,
-                                    struct ArrowArray* array, void* 
sync_event);
+/// initialize an ArrowDeviceArray. If stream is non-null, the event must be recorded
+/// such that it captures the work done on stream. If NANOARROW_OK is returned,
+/// ownership of array and of sync_event (if non-null) is transferred to device_array.
+/// The caller retains ownership of stream.
+ArrowErrorCode ArrowDeviceArrayInitAsync(struct ArrowDevice* device,
+                                         struct ArrowDeviceArray* device_array,
+                                         struct ArrowArray* array, void* 
sync_event,
+                                         void* stream);
+
+/// \brief Initialize an ArrowDeviceArray without a stream
+///
+/// Convenience wrapper to initialize an ArrowDeviceArray without a stream.
+static inline ArrowErrorCode ArrowDeviceArrayInit(struct ArrowDevice* device,
+                                                  struct ArrowDeviceArray* 
device_array,
+                                                  struct ArrowArray* array,
+                                                  void* sync_event);
+
+/// \brief Initialize an ArrowDeviceArrayStream from an existing 
ArrowArrayStream
+///
+/// Wrap an ArrowArrayStream of ArrowDeviceArray objects already allocated by 
the
+/// specified device as an ArrowDeviceArrayStream. This function moves the 
ownership
+/// of array_stream to the device_array_stream. If this function returns 
NANOARROW_OK,
+/// the caller is responsible for releasing the ArrowDeviceArrayStream.
+ArrowErrorCode ArrowDeviceBasicArrayStreamInit(
+    struct ArrowDeviceArrayStream* device_array_stream,
+    struct ArrowArrayStream* array_stream, struct ArrowDevice* device);
 
 /// \brief Initialize an ArrowDeviceArrayView
 ///
@@ -277,71 +326,94 @@ ArrowErrorCode ArrowDeviceArrayViewSetArrayMinimal(
 /// \brief Set ArrowArrayView buffer information from a device array
 ///
 /// Runs ArrowDeviceArrayViewSetArrayMinimal() but also sets buffer sizes for
-/// variable-length buffers by copying data from the device. This function 
will block on
-/// the device_array's sync_event.
-ArrowErrorCode ArrowDeviceArrayViewSetArray(
+/// variable-length buffers by copying data from the device if needed. If stream
+/// is provided, it will be used to do any copying required to resolve buffer sizes.
+ArrowErrorCode ArrowDeviceArrayViewSetArrayAsync(
+    struct ArrowDeviceArrayView* device_array_view, struct ArrowDeviceArray* 
device_array,
+    void* stream, struct ArrowError* error);
+
+/// \brief Set ArrowArrayView buffer information from a device array without a 
stream
+///
+/// Convenience wrapper for the case where no stream is provided.
+static inline ArrowErrorCode ArrowDeviceArrayViewSetArray(
     struct ArrowDeviceArrayView* device_array_view, struct ArrowDeviceArray* 
device_array,
     struct ArrowError* error);
 
 /// \brief Copy an ArrowDeviceArrayView to a device
-ArrowErrorCode ArrowDeviceArrayViewCopy(struct ArrowDeviceArrayView* src,
-                                        struct ArrowDevice* device_dst,
-                                        struct ArrowDeviceArray* dst);
+///
+/// If stream is provided, it will be used to launch copies asynchronously.
+/// Note that this implies that all pointers in src must remain valid until
+/// the stream is synchronized.
+ArrowErrorCode ArrowDeviceArrayViewCopyAsync(struct ArrowDeviceArrayView* src,
+                                             struct ArrowDevice* device_dst,
+                                             struct ArrowDeviceArray* dst, 
void* stream);
+
+/// \brief Copy an ArrowDeviceArrayView to a device without a stream
+///
+/// Convenience wrapper for the case where no stream is provided.
+static inline ArrowErrorCode ArrowDeviceArrayViewCopy(struct 
ArrowDeviceArrayView* src,
+                                                      struct ArrowDevice* 
device_dst,
+                                                      struct ArrowDeviceArray* 
dst);
 
 /// \brief Move an ArrowDeviceArray to a device if possible
 ///
 /// Will attempt to move a device array to a device without copying buffers.
 /// This may result in a device array with different performance charateristics
-/// than an array that was copied.
+/// than an array that was copied. Returns ENOTSUP if a zero-copy move between 
devices is
+/// not possible.
 ArrowErrorCode ArrowDeviceArrayMoveToDevice(struct ArrowDeviceArray* src,
                                             struct ArrowDevice* device_dst,
                                             struct ArrowDeviceArray* dst);
 
-/// \brief Pointer to a statically-allocated CPU device singleton
-struct ArrowDevice* ArrowDeviceCpu(void);
-
-/// \brief Initialize a user-allocated device struct with a CPU device
-void ArrowDeviceInitCpu(struct ArrowDevice* device);
-
-/// \brief Resolve a device pointer from a type + identifier
+/// \brief Allocate a device buffer and copy existing content
 ///
-/// Depending on which libraries this build of the device extension was built 
with,
-/// some device types may or may not be supported. The CPU type is always 
supported.
-/// Returns NULL for device that does not exist or cannot be returned as a 
singleton.
-/// Callers must not release the pointed-to device.
-struct ArrowDevice* ArrowDeviceResolve(ArrowDeviceType device_type, int64_t 
device_id);
-
-ArrowErrorCode ArrowDeviceBufferInit(struct ArrowDevice* device_src,
-                                     struct ArrowBufferView src,
-                                     struct ArrowDevice* device_dst,
-                                     struct ArrowBuffer* dst);
+/// If stream is provided, it will be used to launch copies asynchronously.
+/// Note that this implies that src must remain valid until the stream is
+/// synchronized.
+ArrowErrorCode ArrowDeviceBufferInitAsync(struct ArrowDevice* device_src,
+                                          struct ArrowBufferView src,
+                                          struct ArrowDevice* device_dst,
+                                          struct ArrowBuffer* dst, void* 
stream);
+
+/// \brief Allocate a device buffer and copy existing content without a stream
+///
+/// Convenience wrapper for the case where no stream is provided.
+static inline ArrowErrorCode ArrowDeviceBufferInit(struct ArrowDevice* 
device_src,
+                                                   struct ArrowBufferView src,
+                                                   struct ArrowDevice* 
device_dst,
+                                                   struct ArrowBuffer* dst);
 
+/// \brief Move a buffer to a device without copying if possible
+///
+/// Returns ENOTSUP if a zero-copy move between devices is not possible.
 ArrowErrorCode ArrowDeviceBufferMove(struct ArrowDevice* device_src,
                                      struct ArrowBuffer* src,
                                      struct ArrowDevice* device_dst,
                                      struct ArrowBuffer* dst);
 
-ArrowErrorCode ArrowDeviceBufferCopy(struct ArrowDevice* device_src,
-                                     struct ArrowBufferView src,
-                                     struct ArrowDevice* device_dst,
-                                     struct ArrowBufferView dst);
-
-/// \brief Initialize an ArrowDeviceArrayStream from an existing 
ArrowArrayStream
+/// \brief Copy a buffer into preallocated device memory
 ///
-/// Wrap an ArrowArrayStream of ArrowDeviceArray objects already allocated by 
the
-/// specified device as an ArrowDeviceArrayStream. This function moves the 
ownership of
-/// array_stream to the device_array_stream. If this function returns 
NANOARROW_OK, the
-/// caller is responsible for releasing the ArrowDeviceArrayStream.
-ArrowErrorCode ArrowDeviceBasicArrayStreamInit(
-    struct ArrowDeviceArrayStream* device_array_stream,
-    struct ArrowArrayStream* array_stream, struct ArrowDevice* device);
+/// If stream is provided, it will be used to launch copies asynchronously.
+/// Note that this implies that src must remain valid until the stream is
+/// synchronized.
+ArrowErrorCode ArrowDeviceBufferCopyAsync(struct ArrowDevice* device_src,
+                                          struct ArrowBufferView src,
+                                          struct ArrowDevice* device_dst,
+                                          struct ArrowBufferView dst, void* 
stream);
+
+/// \brief Copy a buffer into preallocated device memory without a stream
+///
+/// Convenience wrapper for the case where no stream is provided.
+static inline ArrowErrorCode ArrowDeviceBufferCopy(struct ArrowDevice* 
device_src,
+                                                   struct ArrowBufferView src,
+                                                   struct ArrowDevice* 
device_dst,
+                                                   struct ArrowBufferView dst);
 
 /// @}
 
 /// \defgroup nanoarrow_device_cuda CUDA Device extension
 ///
-/// A CUDA (i.e., `cuda_runtime_api.h`) implementation of the Arrow C Device
-/// interface.
+/// A CUDA (i.e., `cuda.h`) implementation of the Arrow C Device interface.
 ///
 /// @{
 
@@ -394,6 +466,41 @@ ArrowErrorCode ArrowDeviceMetalAlignArrayBuffers(struct 
ArrowArray* array);
 
 /// @}
 
+// Inline implementations
+
+static inline ArrowErrorCode ArrowDeviceBufferCopy(struct ArrowDevice* 
device_src,
+                                                   struct ArrowBufferView src,
+                                                   struct ArrowDevice* 
device_dst,
+                                                   struct ArrowBufferView dst) 
{
+  return ArrowDeviceBufferCopyAsync(device_src, src, device_dst, dst, NULL);
+}
+
+static inline ArrowErrorCode ArrowDeviceBufferInit(struct ArrowDevice* 
device_src,
+                                                   struct ArrowBufferView src,
+                                                   struct ArrowDevice* 
device_dst,
+                                                   struct ArrowBuffer* dst) {
+  return ArrowDeviceBufferInitAsync(device_src, src, device_dst, dst, NULL);
+}
+
+static inline ArrowErrorCode ArrowDeviceArrayViewCopy(struct 
ArrowDeviceArrayView* src,
+                                                      struct ArrowDevice* 
device_dst,
+                                                      struct ArrowDeviceArray* 
dst) {
+  return ArrowDeviceArrayViewCopyAsync(src, device_dst, dst, NULL);
+}
+
+static inline ArrowErrorCode ArrowDeviceArrayViewSetArray(
+    struct ArrowDeviceArrayView* device_array_view, struct ArrowDeviceArray* 
device_array,
+    struct ArrowError* error) {
+  return ArrowDeviceArrayViewSetArrayAsync(device_array_view, device_array, 
NULL, error);
+}
+
+static inline ArrowErrorCode ArrowDeviceArrayInit(struct ArrowDevice* device,
+                                                  struct ArrowDeviceArray* 
device_array,
+                                                  struct ArrowArray* array,
+                                                  void* sync_event) {
+  return ArrowDeviceArrayInitAsync(device, device_array, array, sync_event, 
NULL);
+}
+
 #ifdef __cplusplus
 }
 #endif

Reply via email to