This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 969f4b439c GH-36488: [C++] Import/Export ArrowDeviceArray (#36489)
969f4b439c is described below

commit 969f4b439c8dd9052372fe6c652dbd2459678042
Author: Matt Topol <[email protected]>
AuthorDate: Tue Jul 25 10:33:56 2023 -0400

    GH-36488: [C++] Import/Export ArrowDeviceArray (#36489)
    
    
    
    ### Rationale for this change
    With the addition of the `ArrowDeviceArray` we should provide import/export 
functions just like we have for `ArrowArray`.
    
    ### What changes are included in this PR?
    Adding Import/Export functions to the `bridge.h/.cc` files for C Data 
Non-CPU data.
    
    This requires adding a device type to buffers and memory managers to 
propagate through.
    
    ### Are these changes tested?
    Yes.
    
    (Still need to add tests to `bridge_test.cc` but I wanted to get eyes on 
this first)
    
    ### Are there any user-facing changes?
    No.
    
    * Closes: #36488
    
    Lead-authored-by: Matt Topol <[email protected]>
    Co-authored-by: Weston Pace <[email protected]>
    Signed-off-by: Matt Topol <[email protected]>
---
 cpp/src/arrow/buffer.h            |  22 ++-
 cpp/src/arrow/buffer_test.cc      |  14 ++
 cpp/src/arrow/c/bridge.cc         | 192 +++++++++++++++++-
 cpp/src/arrow/c/bridge.h          | 130 ++++++++++++
 cpp/src/arrow/c/bridge_test.cc    | 404 ++++++++++++++++++++++++++++++++++++++
 cpp/src/arrow/device.h            |  28 +++
 cpp/src/arrow/gpu/cuda_context.cc |   3 +-
 cpp/src/arrow/gpu/cuda_context.h  |   4 +
 cpp/src/arrow/gpu/cuda_memory.cc  |  21 ++
 cpp/src/arrow/gpu/cuda_memory.h   |   8 +-
 cpp/src/arrow/gpu/cuda_test.cc    |   4 +
 11 files changed, 823 insertions(+), 7 deletions(-)

diff --git a/cpp/src/arrow/buffer.h b/cpp/src/arrow/buffer.h
index ad2496aeeb..08a3bd749e 100644
--- a/cpp/src/arrow/buffer.h
+++ b/cpp/src/arrow/buffer.h
@@ -20,6 +20,7 @@
 #include <cstdint>
 #include <cstring>
 #include <memory>
+#include <optional>
 #include <string>
 #include <string_view>
 #include <utility>
@@ -58,18 +59,31 @@ class ARROW_EXPORT Buffer {
   ///
   /// \note The passed memory must be kept alive through some other means
   Buffer(const uint8_t* data, int64_t size)
-      : is_mutable_(false), is_cpu_(true), data_(data), size_(size), 
capacity_(size) {
+      : is_mutable_(false),
+        is_cpu_(true),
+        data_(data),
+        size_(size),
+        capacity_(size),
+        device_type_(DeviceAllocationType::kCPU) {
     SetMemoryManager(default_cpu_memory_manager());
   }
 
   Buffer(const uint8_t* data, int64_t size, std::shared_ptr<MemoryManager> mm,
-         std::shared_ptr<Buffer> parent = NULLPTR)
+         std::shared_ptr<Buffer> parent = NULLPTR,
+         std::optional<DeviceAllocationType> device_type = std::nullopt)
       : is_mutable_(false),
         data_(data),
         size_(size),
         capacity_(size),
         parent_(std::move(parent)) {
+    // SetMemoryManager will also set device_type_
     SetMemoryManager(std::move(mm));
+    // if a device type is specified, use that instead. for example:
+    // CUDA_HOST. The CudaMemoryManager will set device_type_ to CUDA,
+    // but you can specify CUDA_HOST as the device type to override it.
+    if (device_type != std::nullopt) {
+      device_type_ = device_type;
+    }
   }
 
   Buffer(uintptr_t address, int64_t size, std::shared_ptr<MemoryManager> mm,
@@ -282,6 +296,8 @@ class ARROW_EXPORT Buffer {
 
   const std::shared_ptr<MemoryManager>& memory_manager() const { return 
memory_manager_; }
 
+  std::optional<DeviceAllocationType> device_type() const { return 
device_type_; }
+
   std::shared_ptr<Buffer> parent() const { return parent_; }
 
   /// \brief Get a RandomAccessFile for reading a buffer
@@ -336,6 +352,7 @@ class ARROW_EXPORT Buffer {
   const uint8_t* data_;
   int64_t size_;
   int64_t capacity_;
+  std::optional<DeviceAllocationType> device_type_;
 
   // null by default, but may be set
   std::shared_ptr<Buffer> parent_;
@@ -353,6 +370,7 @@ class ARROW_EXPORT Buffer {
   void SetMemoryManager(std::shared_ptr<MemoryManager> mm) {
     memory_manager_ = std::move(mm);
     is_cpu_ = memory_manager_->is_cpu();
+    device_type_ = memory_manager_->device()->device_type();
   }
 };
 
diff --git a/cpp/src/arrow/buffer_test.cc b/cpp/src/arrow/buffer_test.cc
index ce8bab846d..3dd95cb8af 100644
--- a/cpp/src/arrow/buffer_test.cc
+++ b/cpp/src/arrow/buffer_test.cc
@@ -41,6 +41,7 @@ using internal::checked_cast;
 using internal::checked_pointer_cast;
 
 static const char kMyDeviceTypeName[] = "arrowtest::MyDevice";
+static const DeviceAllocationType kMyDeviceType = 
DeviceAllocationType::kEXT_DEV;
 
 static const int kMyDeviceAllowCopy = 1;
 static const int kMyDeviceAllowView = 2;
@@ -70,6 +71,8 @@ class MyDevice : public Device {
     return checked_cast<const MyDevice&>(other).value_ == value_;
   }
 
+  DeviceAllocationType device_type() const override { return kMyDeviceType; }
+
   std::shared_ptr<MemoryManager> default_memory_manager() override;
 
   int value() const { return value_; }
@@ -256,6 +259,7 @@ TEST_F(TestDevice, Copy) {
   ASSERT_EQ(buffer->device(), cpu_device_);
   ASSERT_TRUE(buffer->is_cpu());
   ASSERT_NE(buffer->address(), cpu_src_->address());
+  ASSERT_EQ(buffer->device_type(), DeviceAllocationType::kCPU);
   ASSERT_NE(buffer->data(), nullptr);
   AssertBufferEqual(*buffer, "some data");
 
@@ -263,6 +267,7 @@ TEST_F(TestDevice, Copy) {
   ASSERT_EQ(buffer->device(), cpu_device_);
   ASSERT_TRUE(buffer->is_cpu());
   ASSERT_NE(buffer->address(), cpu_src_->address());
+  ASSERT_EQ(buffer->device_type(), DeviceAllocationType::kCPU);
   ASSERT_NE(buffer->data(), nullptr);
   AssertBufferEqual(*buffer, "some data");
 
@@ -271,6 +276,7 @@ TEST_F(TestDevice, Copy) {
   ASSERT_EQ(buffer->device(), my_copy_device_);
   ASSERT_FALSE(buffer->is_cpu());
   ASSERT_NE(buffer->address(), cpu_src_->address());
+  ASSERT_EQ(buffer->device_type(), kMyDeviceType);
 #ifdef NDEBUG
   ASSERT_EQ(buffer->data(), nullptr);
 #endif
@@ -280,6 +286,7 @@ TEST_F(TestDevice, Copy) {
   ASSERT_EQ(buffer->device(), my_copy_device_);
   ASSERT_FALSE(buffer->is_cpu());
   ASSERT_NE(buffer->address(), cpu_src_->address());
+  ASSERT_EQ(buffer->device_type(), kMyDeviceType);
 #ifdef NDEBUG
   ASSERT_EQ(buffer->data(), nullptr);
 #endif
@@ -290,6 +297,7 @@ TEST_F(TestDevice, Copy) {
   ASSERT_EQ(buffer->device(), cpu_device_);
   ASSERT_TRUE(buffer->is_cpu());
   ASSERT_NE(buffer->address(), my_copy_src_->address());
+  ASSERT_EQ(buffer->device_type(), DeviceAllocationType::kCPU);
   ASSERT_NE(buffer->data(), nullptr);
   AssertBufferEqual(*buffer, "some data");
 
@@ -297,6 +305,7 @@ TEST_F(TestDevice, Copy) {
   ASSERT_EQ(buffer->device(), cpu_device_);
   ASSERT_TRUE(buffer->is_cpu());
   ASSERT_NE(buffer->address(), my_copy_src_->address());
+  ASSERT_EQ(buffer->device_type(), DeviceAllocationType::kCPU);
   ASSERT_NE(buffer->data(), nullptr);
   AssertBufferEqual(*buffer, "some data");
 
@@ -305,6 +314,7 @@ TEST_F(TestDevice, Copy) {
   ASSERT_EQ(buffer->device(), my_copy_device_);
   ASSERT_FALSE(buffer->is_cpu());
   ASSERT_NE(buffer->address(), my_copy_src_->address());
+  ASSERT_EQ(buffer->device_type(), kMyDeviceType);
 #ifdef NDEBUG
   ASSERT_EQ(buffer->data(), nullptr);
 #endif
@@ -315,6 +325,7 @@ TEST_F(TestDevice, Copy) {
   ASSERT_EQ(buffer->device(), my_copy_device_);
   ASSERT_FALSE(buffer->is_cpu());
   ASSERT_NE(buffer->address(), my_copy_src_->address());
+  ASSERT_EQ(buffer->device_type(), kMyDeviceType);
 #ifdef NDEBUG
   ASSERT_EQ(buffer->data(), nullptr);
 #endif
@@ -330,6 +341,7 @@ TEST_F(TestDevice, View) {
   ASSERT_EQ(buffer->device(), cpu_device_);
   ASSERT_TRUE(buffer->is_cpu());
   ASSERT_EQ(buffer->address(), cpu_src_->address());
+  ASSERT_EQ(buffer->device_type(), DeviceAllocationType::kCPU);
   ASSERT_NE(buffer->data(), nullptr);
   AssertBufferEqual(*buffer, "some data");
 
@@ -338,6 +350,7 @@ TEST_F(TestDevice, View) {
   ASSERT_EQ(buffer->device(), my_view_device_);
   ASSERT_FALSE(buffer->is_cpu());
   ASSERT_EQ(buffer->address(), cpu_src_->address());
+  ASSERT_EQ(buffer->device_type(), kMyDeviceType);
 #ifdef NDEBUG
   ASSERT_EQ(buffer->data(), nullptr);
 #endif
@@ -348,6 +361,7 @@ TEST_F(TestDevice, View) {
   ASSERT_EQ(buffer->device(), cpu_device_);
   ASSERT_TRUE(buffer->is_cpu());
   ASSERT_EQ(buffer->address(), my_copy_src_->address());
+  ASSERT_EQ(buffer->device_type(), DeviceAllocationType::kCPU);
   ASSERT_NE(buffer->data(), nullptr);
   AssertBufferEqual(*buffer, "some data");
 
diff --git a/cpp/src/arrow/c/bridge.cc b/cpp/src/arrow/c/bridge.cc
index 85a5156d11..13355dd6d0 100644
--- a/cpp/src/arrow/c/bridge.cc
+++ b/cpp/src/arrow/c/bridge.cc
@@ -522,6 +522,8 @@ struct ExportedArrayPrivateData : 
PoolAllocationMixin<ExportedArrayPrivateData>
 
   std::shared_ptr<ArrayData> data_;
 
+  RawSyncEvent sync_event_;
+
   ExportedArrayPrivateData() = default;
   ARROW_DEFAULT_MOVE_AND_ASSIGN(ExportedArrayPrivateData);
   ARROW_DISALLOW_COPY_AND_ASSIGN(ExportedArrayPrivateData);
@@ -544,7 +546,12 @@ void ReleaseExportedArray(struct ArrowArray* array) {
         << "Dictionary release callback should have marked it released";
   }
   DCHECK_NE(array->private_data, nullptr);
-  delete reinterpret_cast<ExportedArrayPrivateData*>(array->private_data);
+  auto* pdata = 
reinterpret_cast<ExportedArrayPrivateData*>(array->private_data);
+  if (pdata->sync_event_.sync_event != nullptr &&
+      pdata->sync_event_.release_func != nullptr) {
+    pdata->sync_event_.release_func(pdata->sync_event_.sync_event);
+  }
+  delete pdata;
 
   ArrowArrayMarkReleased(array);
 }
@@ -584,6 +591,7 @@ struct ArrayExporter {
     // Store owning pointer to ArrayData
     export_.data_ = data;
 
+    export_.sync_event_ = RawSyncEvent();
     return Status::OK();
   }
 
@@ -663,6 +671,118 @@ Status ExportRecordBatch(const RecordBatch& batch, struct 
ArrowArray* out,
   return Status::OK();
 }
 
+//////////////////////////////////////////////////////////////////////////
+// C device arrays
+
+Status ValidateDeviceInfo(const ArrayData& data,
+                          std::optional<DeviceAllocationType>* device_type,
+                          int64_t* device_id) {
+  for (const auto& buf : data.buffers) {
+    if (!buf) {
+      continue;
+    }
+
+    if (*device_type == std::nullopt) {
+      *device_type = buf->device_type();
+      *device_id = buf->device()->device_id();
+      continue;
+    }
+
+    if (buf->device_type() != *device_type) {
+      return Status::Invalid(
+          "Exporting device array with buffers on more than one device.");
+    }
+
+    if (buf->device()->device_id() != *device_id) {
+      return Status::Invalid(
+          "Exporting device array with buffers on multiple device ids.");
+    }
+  }
+
+  for (const auto& child : data.child_data) {
+    RETURN_NOT_OK(ValidateDeviceInfo(*child, device_type, device_id));
+  }
+
+  return Status::OK();
+}
+
+Result<std::pair<std::optional<DeviceAllocationType>, int64_t>> 
ValidateDeviceInfo(
+    const ArrayData& data) {
+  std::optional<DeviceAllocationType> device_type;
+  int64_t device_id = -1;
+  RETURN_NOT_OK(ValidateDeviceInfo(data, &device_type, &device_id));
+  return std::make_pair(device_type, device_id);
+}
+
+Status ExportDeviceArray(const Array& array, RawSyncEvent sync_event,
+                         struct ArrowDeviceArray* out, struct ArrowSchema* 
out_schema) {
+  if (sync_event.sync_event != nullptr && sync_event.release_func) {
+    return Status::Invalid(
+        "Must provide a release event function if providing a non-null event");
+  }
+
+  SchemaExportGuard guard(out_schema);
+  if (out_schema != nullptr) {
+    RETURN_NOT_OK(ExportType(*array.type(), out_schema));
+  }
+
+  ARROW_ASSIGN_OR_RAISE(auto device_info, ValidateDeviceInfo(*array.data()));
+  if (!device_info.first) {
+    out->device_type = ARROW_DEVICE_CPU;
+  } else {
+    out->device_type = static_cast<ArrowDeviceType>(*device_info.first);
+  }
+  out->device_id = device_info.second;
+
+  ArrayExporter exporter;
+  RETURN_NOT_OK(exporter.Export(array.data()));
+  exporter.Finish(&out->array);
+
+  auto* pdata = 
reinterpret_cast<ExportedArrayPrivateData*>(out->array.private_data);
+  pdata->sync_event_ = sync_event;
+  out->sync_event = sync_event.sync_event;
+
+  guard.Detach();
+  return Status::OK();
+}
+
+Status ExportDeviceRecordBatch(const RecordBatch& batch, RawSyncEvent 
sync_event,
+                               struct ArrowDeviceArray* out,
+                               struct ArrowSchema* out_schema) {
+  if (sync_event.sync_event != nullptr && sync_event.release_func == nullptr) {
+    return Status::Invalid(
+        "Must provide a release event function if providing a non-null event");
+  }
+
+  // XXX perhaps bypass ToStructArray for speed?
+  ARROW_ASSIGN_OR_RAISE(auto array, batch.ToStructArray());
+
+  SchemaExportGuard guard(out_schema);
+  if (out_schema != nullptr) {
+    // Export the schema, not the struct type, so as not to lose top-level 
metadata
+    RETURN_NOT_OK(ExportSchema(*batch.schema(), out_schema));
+  }
+
+  ARROW_ASSIGN_OR_RAISE(auto device_info, ValidateDeviceInfo(*array->data()));
+  if (!device_info.first) {
+    out->device_type = ARROW_DEVICE_CPU;
+  } else {
+    out->device_type = static_cast<ArrowDeviceType>(*device_info.first);
+  }
+  out->device_id = device_info.second;
+
+  ArrayExporter exporter;
+  RETURN_NOT_OK(exporter.Export(array->data()));
+  exporter.Finish(&out->array);
+
+  auto* pdata = 
reinterpret_cast<ExportedArrayPrivateData*>(out->array.private_data);
+  pdata->sync_event_ = sync_event;
+  out->sync_event = sync_event.sync_event;
+
+  guard.Detach();
+  return Status::OK();
+}
+
 //////////////////////////////////////////////////////////////////////////
 // C schema import
 
@@ -1242,6 +1362,7 @@ namespace {
 // The ArrowArray is released on destruction.
 struct ImportedArrayData {
   struct ArrowArray array_;
+  void* sync_event_;
 
   ImportedArrayData() {
     ArrowArrayMarkReleased(&array_);  // Initially released
@@ -1267,6 +1388,11 @@ class ImportedBuffer : public Buffer {
                  std::shared_ptr<ImportedArrayData> import)
       : Buffer(data, size), import_(std::move(import)) {}
 
+  ImportedBuffer(const uint8_t* data, int64_t size, 
std::shared_ptr<MemoryManager> mm,
+                 DeviceAllocationType device_type,
+                 std::shared_ptr<ImportedArrayData> import)
+      : Buffer(data, size, mm, nullptr, device_type), 
import_(std::move(import)) {}
+
   ~ImportedBuffer() override {}
 
  protected:
@@ -1275,7 +1401,20 @@ class ImportedBuffer : public Buffer {
 
 struct ArrayImporter {
   explicit ArrayImporter(const std::shared_ptr<DataType>& type)
-      : type_(type), zero_size_buffer_(std::make_shared<Buffer>(kZeroSizeArea, 
0)) {}
+      : type_(type),
+        zero_size_buffer_(std::make_shared<Buffer>(kZeroSizeArea, 0)),
+        device_type_(DeviceAllocationType::kCPU) {}
+
+  // Import an ArrowDeviceArray: resolve a MemoryManager for the source's
+  // (device_type, device_id) through the caller-provided mapper *before*
+  // importing, so ImportBuffer tags every buffer with that device.
+  // NOTE(review): if Import(&src->array) fails, memory_mgr_/device_type_
+  // are not reset — confirm importers are single-use after failure.
+  Status Import(struct ArrowDeviceArray* src, const DeviceMemoryMapper& 
mapper) {
+    ARROW_ASSIGN_OR_RAISE(memory_mgr_, mapper(src->device_type, 
src->device_id));
+    device_type_ = static_cast<DeviceAllocationType>(src->device_type);
+    RETURN_NOT_OK(Import(&src->array));
+    // Keep the consumer-provided sync event alive with the imported data.
+    import_->sync_event_ = src->sync_event;
+    // reset internal state before next import
+    memory_mgr_.reset();
+    device_type_ = DeviceAllocationType::kCPU;
+    return Status::OK();
+  }
 
   Status Import(struct ArrowArray* src) {
     if (ArrowArrayIsReleased(src)) {
@@ -1588,7 +1727,12 @@ struct ArrayImporter {
     std::shared_ptr<Buffer>* out = &data_->buffers[buffer_id];
     auto data = reinterpret_cast<const 
uint8_t*>(c_struct_->buffers[buffer_id]);
     if (data != nullptr) {
-      *out = std::make_shared<ImportedBuffer>(data, buffer_size, import_);
+      if (memory_mgr_) {
+        *out = std::make_shared<ImportedBuffer>(data, buffer_size, memory_mgr_,
+                                                device_type_, import_);
+      } else {
+        *out = std::make_shared<ImportedBuffer>(data, buffer_size, import_);
+      }
     } else if (is_null_bitmap) {
       out->reset();
     } else {
@@ -1613,6 +1757,9 @@ struct ArrayImporter {
 
   // For imported null buffer pointers
   std::shared_ptr<Buffer> zero_size_buffer_;
+
+  std::shared_ptr<MemoryManager> memory_mgr_;
+  DeviceAllocationType device_type_;
 };
 
 }  // namespace
@@ -1652,6 +1799,45 @@ Result<std::shared_ptr<RecordBatch>> 
ImportRecordBatch(struct ArrowArray* array,
   return ImportRecordBatch(array, *maybe_schema);
 }
 
+Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray* 
array,
+                                                 std::shared_ptr<DataType> 
type,
+                                                 const DeviceMemoryMapper& 
mapper) {
+  ArrayImporter importer(type);
+  RETURN_NOT_OK(importer.Import(array, mapper));
+  return importer.MakeArray();
+}
+
+Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray* 
array,
+                                                 struct ArrowSchema* type,
+                                                 const DeviceMemoryMapper& 
mapper) {
+  auto maybe_type = ImportType(type);
+  if (!maybe_type.ok()) {
+    ArrowArrayRelease(&array->array);
+    return maybe_type.status();
+  }
+  return ImportDeviceArray(array, *maybe_type, mapper);
+}
+
+Result<std::shared_ptr<RecordBatch>> ImportDeviceRecordBatch(
+    struct ArrowDeviceArray* array, std::shared_ptr<Schema> schema,
+    const DeviceMemoryMapper& mapper) {
+  auto type = struct_(schema->fields());
+  ArrayImporter importer(type);
+  RETURN_NOT_OK(importer.Import(array, mapper));
+  return importer.MakeRecordBatch(std::move(schema));
+}
+
+Result<std::shared_ptr<RecordBatch>> ImportDeviceRecordBatch(
+    struct ArrowDeviceArray* array, struct ArrowSchema* schema,
+    const DeviceMemoryMapper& mapper) {
+  auto maybe_schema = ImportSchema(schema);
+  if (!maybe_schema.ok()) {
+    ArrowArrayRelease(&array->array);
+    return maybe_schema.status();
+  }
+  return ImportDeviceRecordBatch(array, *maybe_schema, mapper);
+}
+
 //////////////////////////////////////////////////////////////////////////
 // C stream export
 
diff --git a/cpp/src/arrow/c/bridge.h b/cpp/src/arrow/c/bridge.h
index 3b1a013d20..92707a5972 100644
--- a/cpp/src/arrow/c/bridge.h
+++ b/cpp/src/arrow/c/bridge.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <functional>
 #include <memory>
 #include <string>
 
@@ -166,6 +167,135 @@ Result<std::shared_ptr<RecordBatch>> 
ImportRecordBatch(struct ArrowArray* array,
 
 /// @}
 
+/// \defgroup c-data-device-interface Functions for working with the C data 
device
+/// interface.
+///
+/// @{
+
+/// \brief EXPERIMENTAL: Type for freeing a sync event
+///
+/// If synchronization is necessary for accessing the data on a device,
+/// a pointer to an event needs to be passed when exporting the device
+/// array. It's the responsibility of the release function for the array
+/// to release the event. Both can be null if no sync'ing is necessary.
struct RawSyncEvent {
  // Opaque handle to a device synchronization event (null when no
  // synchronization is needed). Use nullptr, not NULL, per modern C++ style
  // used throughout this header.
  void* sync_event = nullptr;
  // Callback invoked by the exported array's release function to free
  // sync_event. Must be non-null whenever sync_event is non-null.
  std::function<void(void*)> release_func;
};
+
+/// \brief EXPERIMENTAL: Export C++ Array as an ArrowDeviceArray.
+///
+/// The resulting ArrowDeviceArray struct keeps the array data and buffers 
alive
+/// until its release callback is called by the consumer. All buffers in
+/// the provided array MUST have the same device_type, otherwise an error
+/// will be returned.
+///
+/// If a non-null sync_event is provided, then the release_func must also be
+/// non-null. If the sync_event is null, then the release_func is ignored.
+///
+/// \param[in] array Array object to export
+/// \param[in] sync_event A struct containing what is needed for syncing if 
necessary
+/// \param[out] out C struct to export the array to
+/// \param[out] out_schema optional C struct to export the array type to
+ARROW_EXPORT
+Status ExportDeviceArray(const Array& array, RawSyncEvent sync_event,
+                         struct ArrowDeviceArray* out,
+                         struct ArrowSchema* out_schema = NULLPTR);
+
+/// \brief EXPERIMENTAL: Export C++ RecordBatch as an ArrowDeviceArray.
+///
+/// The record batch is exported as if it were a struct array.
+/// The resulting ArrowDeviceArray struct keeps the record batch data and 
buffers alive
+/// until its release callback is called by the consumer.
+///
+/// All buffers of all columns in the record batch must have the same 
device_type
+/// otherwise an error will be returned. If columns are on different devices,
+/// they should be exported using different ArrowDeviceArray instances.
+///
+/// If a non-null sync_event is provided, then the sync_release func must also 
be
+/// non-null. If the sync_event is null, then the sync_release parameter is 
ignored.
+///
+/// \param[in] batch Record batch to export
+/// \param[in] sync_event A struct containing what is needed for syncing if 
necessary
+/// \param[out] out C struct where to export the record batch
+/// \param[out] out_schema optional C struct where to export the record batch 
schema
+ARROW_EXPORT
+Status ExportDeviceRecordBatch(const RecordBatch& batch, RawSyncEvent 
sync_event,
+                               struct ArrowDeviceArray* out,
+                               struct ArrowSchema* out_schema = NULLPTR);
+
+using DeviceMemoryMapper =
+    std::function<Result<std::shared_ptr<MemoryManager>>(ArrowDeviceType, 
int64_t)>;
+
+/// \brief EXPERIMENTAL: Import C++ device array from the C data interface.
+///
+/// The ArrowArray struct has its contents moved (as per the C data interface
+/// specification) to a private object held alive by the resulting array. The
+/// buffers of the Array are located on the device indicated by the 
device_type.
+///
+/// \param[in,out] array C data interface struct holding the array data
+/// \param[in] type type of the imported array
+/// \param[in] mapper A function to map device + id to memory manager
+/// \return Imported array object
+ARROW_EXPORT
+Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray* 
array,
+                                                 std::shared_ptr<DataType> 
type,
+                                                 const DeviceMemoryMapper& 
mapper);
+
+/// \brief EXPERIMENTAL: Import C++ device array and its type from the C data 
interface.
+///
+/// The ArrowArray struct has its contents moved (as per the C data interface
+/// specification) to a private object held alive by the resulting array.
+/// The ArrowSchema struct is released, even if this function fails. The
+/// buffers of the Array are located on the device indicated by the 
device_type.
+///
+/// \param[in,out] array C data interface struct holding the array data
+/// \param[in,out] type C data interface struct holding the array type
+/// \param[in] mapper A function to map device + id to memory manager
+/// \return Imported array object
+ARROW_EXPORT
+Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray* 
array,
+                                                 struct ArrowSchema* type,
+                                                 const DeviceMemoryMapper& 
mapper);
+
+/// \brief EXPERIMENTAL: Import C++ record batch with buffers on a device from 
the C data
+/// interface.
+///
+/// The ArrowArray struct has its contents moved (as per the C data interface
+/// specification) to a private object held alive by the resulting record 
batch.
+/// The buffers of all columns of the record batch are located on the device
+/// indicated by the device type.
+///
+/// \param[in,out] array C data interface struct holding the record batch data
+/// \param[in] schema schema of the imported record batch
+/// \param[in] mapper A function to map device + id to memory manager
+/// \return Imported record batch object
+ARROW_EXPORT
+Result<std::shared_ptr<RecordBatch>> ImportDeviceRecordBatch(
+    struct ArrowDeviceArray* array, std::shared_ptr<Schema> schema,
+    const DeviceMemoryMapper& mapper);
+
+/// \brief EXPERIMENTAL: Import C++ record batch with buffers on a device and 
its schema
+/// from the C data interface.
+///
+/// The type represented by the ArrowSchema struct must be a struct type array.
+/// The ArrowArray struct has its contents moved (as per the C data interface
+/// specification) to a private object held alive by the resulting record 
batch.
+/// The ArrowSchema struct is released, even if this function fails. The 
buffers
+/// of all columns of the record batch are located on the device indicated by 
the
+/// device type.
+///
+/// \param[in,out] array C data interface struct holding the record batch data
+/// \param[in,out] schema C data interface struct holding the record batch 
schema
+/// \param[in] mapper A function to map device + id to memory manager
+/// \return Imported record batch object
+ARROW_EXPORT
+Result<std::shared_ptr<RecordBatch>> ImportDeviceRecordBatch(
+    struct ArrowDeviceArray* array, struct ArrowSchema* schema,
+    const DeviceMemoryMapper& mapper);
+
+/// @}
+
 /// \defgroup c-stream-interface Functions for working with the C stream
 /// interface.
 ///
 /// @{
diff --git a/cpp/src/arrow/c/bridge_test.cc b/cpp/src/arrow/c/bridge_test.cc
index 5fe7b653c8..5c7de8e4a0 100644
--- a/cpp/src/arrow/c/bridge_test.cc
+++ b/cpp/src/arrow/c/bridge_test.cc
@@ -565,6 +565,15 @@ struct ArrayExportChecker {
       ASSERT_EQ(c_export->children, nullptr);
     }
   }
+
+  void operator()(struct ArrowDeviceArray* c_export, const ArrayData& 
expected_data,
+                  const ArrowDeviceType device_type, const int64_t device_id,
+                  const void* sync_event) {
+    ASSERT_EQ(c_export->device_type, device_type);
+    ASSERT_EQ(c_export->device_id, device_id);
+    ASSERT_EQ(c_export->sync_event, sync_event);
+    this->operator()(&c_export->array, expected_data);
+  }
 };
 
 struct RecordBatchExportChecker {
@@ -592,6 +601,15 @@ struct RecordBatchExportChecker {
       ASSERT_EQ(c_export->children, nullptr);
     }
   }
+
+  void operator()(struct ArrowDeviceArray* c_export, const RecordBatch& 
expected_data,
+                  const ArrowDeviceType device_type, const int64_t device_id,
+                  const void* sync_event) {
+    ASSERT_EQ(c_export->device_type, device_type);
+    ASSERT_EQ(c_export->device_id, device_id);
+    ASSERT_EQ(c_export->sync_event, sync_event);
+    this->operator()(&c_export->array, expected_data);
+  }
 };
 
 class TestArrayExport : public ::testing::Test {
@@ -1112,6 +1130,392 @@ TEST_F(TestArrayExport, ExportRecordBatch) {
   }
 }
 
+////////////////////////////////////////////////////////////////////////////
+// Device Array Export Tests
+
+static const char kMyDeviceTypeName[] = "arrowtest::MyDevice";
+static const ArrowDeviceType kMyDeviceType = ARROW_DEVICE_EXT_DEV;
+
+class MyBuffer final : public MutableBuffer {
+ public:
+  using MutableBuffer::MutableBuffer;
+
+  ~MyBuffer() { default_memory_pool()->Free(const_cast<uint8_t*>(data_), size_); }
+};
+
+class MyMemoryManager : public CPUMemoryManager {
+ public:
+  explicit MyMemoryManager(const std::shared_ptr<Device>& device)
+      : CPUMemoryManager(device, default_memory_pool()) {}
+
+  Result<std::unique_ptr<Buffer>> AllocateBuffer(int64_t size) override {
+    uint8_t* data;
+    RETURN_NOT_OK(pool_->Allocate(size, &data));
+    return std::make_unique<MyBuffer>(data, size, shared_from_this());
+  }
+
+ protected:
+  Result<std::shared_ptr<Buffer>> CopyBufferFrom(
+      const std::shared_ptr<Buffer>& buf,
+      const std::shared_ptr<MemoryManager>& from) override {
+    return CopyNonOwnedFrom(*buf, from);
+  }
+  Result<std::unique_ptr<Buffer>> CopyNonOwnedFrom(
+      const Buffer& buf, const std::shared_ptr<MemoryManager>& from) override {
+    if (!from->is_cpu()) {
+      return nullptr;
+    }
+
+    ARROW_ASSIGN_OR_RAISE(auto dest, AllocateBuffer(buf.size()));
+    if (buf.size() > 0) {
+      memcpy(dest->mutable_data(), buf.data(), static_cast<size_t>(buf.size()));
+    }
+    return std::move(dest);
+  }
+};
+
+class MyDevice : public Device {
+ public:
+  explicit MyDevice(int value) : Device(true), value_(value) {}
+  const char* type_name() const override { return kMyDeviceTypeName; }
+  std::string ToString() const override { return kMyDeviceTypeName; }
+  bool Equals(const Device& other) const override {
+    if (other.type_name() != kMyDeviceTypeName || other.device_type() != device_type()) {
+      return false;
+    }
+    return checked_cast<const MyDevice&>(other).value_ == value_;
+  }
+  DeviceAllocationType device_type() const override {
+    return static_cast<DeviceAllocationType>(kMyDeviceType);
+  }
+  int64_t device_id() const override { return value_; }
+  std::shared_ptr<MemoryManager> default_memory_manager() override {
+    return std::make_shared<MyMemoryManager>(shared_from_this());
+  }
+
+ protected:
+  int value_;
+};
+
+class TestDeviceArrayExport : public ::testing::Test {
+ public:
+  void SetUp() override { pool_ = default_memory_pool(); }
+
+  static Result<std::shared_ptr<ArrayData>> ToDeviceData(
+      const std::shared_ptr<MemoryManager>& mm, const ArrayData& data) {
+    arrow::BufferVector buffers;
+    for (const auto& buf : data.buffers) {
+      if (buf) {
+        ARROW_ASSIGN_OR_RAISE(auto dest, mm->CopyBuffer(buf, mm));
+        buffers.push_back(dest);
+      } else {
+        buffers.push_back(nullptr);
+      }
+    }
+
+    arrow::ArrayDataVector children;
+    for (const auto& child : data.child_data) {
+      ARROW_ASSIGN_OR_RAISE(auto dest, ToDeviceData(mm, *child));
+      children.push_back(dest);
+    }
+
+    return ArrayData::Make(data.type, data.length, buffers, children, 
data.null_count,
+                           data.offset);
+  }
+
+  static Result<std::shared_ptr<Array>> ToDevice(const std::shared_ptr<MemoryManager>& mm,
+                                                 const ArrayData& data) {
+    ARROW_ASSIGN_OR_RAISE(auto result, ToDeviceData(mm, data));
+    return MakeArray(result);
+  }
+
+  template <typename ArrayFactory>
+  static std::function<Result<std::shared_ptr<Array>>()> ToDeviceFactory(
+      const std::shared_ptr<MemoryManager>& mm, ArrayFactory&& factory) {
+    return [&]() { return ToDevice(mm, *factory()->data()); };
+  }
+
+  static std::function<Result<std::shared_ptr<Array>>()> JSONArrayFactory(
+      const std::shared_ptr<MemoryManager>& mm, std::shared_ptr<DataType> type,
+      const char* json) {
+    return [=]() { return ToDevice(mm, *ArrayFromJSON(type, json)->data()); };
+  }
+
+  template <typename ArrayFactory, typename ExportCheckFunc>
+  void TestWithArrayFactory(ArrayFactory&& factory, ExportCheckFunc&& check_func) {
+    auto orig_bytes = pool_->bytes_allocated();
+
+    std::shared_ptr<Array> arr;
+    ASSERT_OK_AND_ASSIGN(arr, ToResult(factory()));
+    ARROW_SCOPED_TRACE("type = ", arr->type()->ToString(),
+                       ", array data = ", arr->ToString());
+    const ArrayData& data = *arr->data();  // non-owning reference
+    struct ArrowDeviceArray c_export;
+    ASSERT_OK(ExportDeviceArray(*arr, {nullptr, nullptr}, &c_export));
+
+    ArrayExportGuard guard(&c_export.array);
+    auto new_bytes = pool_->bytes_allocated();
+    ASSERT_GT(new_bytes, orig_bytes);
+
+    // Release the shared_ptr<Array>, underlying data should be held alive
+    arr.reset();
+    ASSERT_EQ(pool_->bytes_allocated(), new_bytes);
+    check_func(&c_export, data, kMyDeviceType, 1, nullptr);
+
+    // Release the ArrowArray, underlying data should be destroyed
+    guard.Release();
+    ASSERT_EQ(pool_->bytes_allocated(), orig_bytes);
+  }
+
+  template <typename ArrayFactory>
+  void TestNested(ArrayFactory&& factory) {
+    ArrayExportChecker checker;
+    TestWithArrayFactory(std::forward<ArrayFactory>(factory), checker);
+  }
+
+  void TestNested(const std::shared_ptr<MemoryManager>& mm,
+                  const std::shared_ptr<DataType>& type, const char* json) {
+    TestNested(JSONArrayFactory(mm, type, json));
+  }
+
+  template <typename ArrayFactory>
+  void TestPrimitive(ArrayFactory&& factory) {
+    TestNested(std::forward<ArrayFactory>(factory));
+  }
+
+  void TestPrimitive(const std::shared_ptr<MemoryManager>& mm,
+                     const std::shared_ptr<DataType>& type, const char* json) {
+    TestNested(mm, type, json);
+  }
+
+ protected:
+  MemoryPool* pool_;
+};
+
+TEST_F(TestDeviceArrayExport, Primitive) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  TestPrimitive(mm, int8(), "[1, 2, null, -3]");
+  TestPrimitive(mm, int16(), "[1, 2, -3]");
+  TestPrimitive(mm, int32(), "[1, 2, null, -3]");
+  TestPrimitive(mm, int64(), "[1, 2, -3]");
+  TestPrimitive(mm, uint8(), "[1, 2, 3]");
+  TestPrimitive(mm, uint16(), "[1, 2, null, 3]");
+  TestPrimitive(mm, uint32(), "[1, 2, 3]");
+  TestPrimitive(mm, uint64(), "[1, 2, null, 3]");
+
+  TestPrimitive(mm, boolean(), "[true, false, null]");
+
+  TestPrimitive(mm, float32(), "[1.5, null]");
+  TestPrimitive(mm, float64(), "[1.5, null]");
+
+  TestPrimitive(mm, fixed_size_binary(3), R"(["foo", "bar", null])");
+  TestPrimitive(mm, binary(), R"(["foo", "bar", null])");
+  TestPrimitive(mm, large_binary(), R"(["foo", "bar", null])");
+  TestPrimitive(mm, utf8(), R"(["foo", "bar", null])");
+  TestPrimitive(mm, large_utf8(), R"(["foo", "bar", null])");
+
+  TestPrimitive(mm, decimal(16, 4), R"(["1234.5670", null])");
+  TestPrimitive(mm, decimal256(16, 4), R"(["1234.5670", null])");
+
+  TestPrimitive(mm, month_day_nano_interval(), R"([[-1, 5, 20], null])");
+}
+
+TEST_F(TestDeviceArrayExport, PrimitiveSliced) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  auto factory = [=]() {
+    return (*ToDevice(mm, *ArrayFromJSON(int16(), "[1, 2, null, -3]")->data()))
+        ->Slice(1, 2);
+  };
+  TestPrimitive(factory);
+}
+
+TEST_F(TestDeviceArrayExport, Temporal) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  const char* json = "[1, 2, null, 42]";
+  TestPrimitive(mm, date32(), json);
+  TestPrimitive(mm, date64(), json);
+  TestPrimitive(mm, time32(TimeUnit::SECOND), json);
+  TestPrimitive(mm, time32(TimeUnit::MILLI), json);
+  TestPrimitive(mm, time64(TimeUnit::MICRO), json);
+  TestPrimitive(mm, time64(TimeUnit::NANO), json);
+  TestPrimitive(mm, duration(TimeUnit::SECOND), json);
+  TestPrimitive(mm, duration(TimeUnit::MILLI), json);
+  TestPrimitive(mm, duration(TimeUnit::MICRO), json);
+  TestPrimitive(mm, duration(TimeUnit::NANO), json);
+  TestPrimitive(mm, month_interval(), json);
+
+  TestPrimitive(mm, day_time_interval(), "[[7, 600], null]");
+
+  json = R"(["1970-01-01","2000-02-29","1900-02-28"])";
+  TestPrimitive(mm, timestamp(TimeUnit::SECOND), json);
+  TestPrimitive(mm, timestamp(TimeUnit::SECOND, "Europe/Paris"), json);
+  TestPrimitive(mm, timestamp(TimeUnit::MILLI), json);
+  TestPrimitive(mm, timestamp(TimeUnit::MILLI, "Europe/Paris"), json);
+  TestPrimitive(mm, timestamp(TimeUnit::MICRO), json);
+  TestPrimitive(mm, timestamp(TimeUnit::MICRO, "Europe/Paris"), json);
+  TestPrimitive(mm, timestamp(TimeUnit::NANO), json);
+  TestPrimitive(mm, timestamp(TimeUnit::NANO, "Europe/Paris"), json);
+}
+
+TEST_F(TestDeviceArrayExport, List) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  TestNested(mm, list(int8()), "[[1, 2], [3, null], null]");
+  TestNested(mm, large_list(uint16()), "[[1, 2], [3, null], null]");
+  TestNested(mm, fixed_size_list(int64(), 2), "[[1, 2], [3, null], null]");
+
+  TestNested(mm, list(large_list(int32())), "[[[1, 2], [3], null], null]");
+}
+
+TEST_F(TestDeviceArrayExport, ListSliced) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  {
+    auto factory = [=]() {
+      return (*ToDevice(
+                  mm, *ArrayFromJSON(list(int8()), "[[1, 2], [3, null], [4, 5, 
6], null]")
+                           ->data()))
+          ->Slice(1, 2);
+    };
+    TestNested(factory);
+  }
+  {
+    auto factory = [=]() {
+      auto values =
+          (*ToDevice(mm,
+                     *ArrayFromJSON(int16(), "[1, 2, 3, 4, null, 5, 6, 7, 
8]")->data()))
+              ->Slice(1, 6);
+      auto offsets = (*ToDevice(mm, *ArrayFromJSON(int32(), "[0, 2, 3, 5, 
6]")->data()))
+                         ->Slice(2, 4);
+      return ListArray::FromArrays(*offsets, *values);
+    };
+    TestNested(factory);
+  }
+}
+
+TEST_F(TestDeviceArrayExport, Struct) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  const char* data = R"([[1, "foo"], [2, null]])";
+  auto type = struct_({field("a", int8()), field("b", utf8())});
+  TestNested(mm, type, data);
+}
+
+TEST_F(TestDeviceArrayExport, Map) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  const char* json = R"([[[1, "foo"], [2, null]], [[3, "bar"]]])";
+  TestNested(mm, map(int8(), utf8()), json);
+  TestNested(mm, map(int8(), utf8(), /*keys_sorted=*/true), json);
+}
+
+TEST_F(TestDeviceArrayExport, Union) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  const char* data = "[null, [42, 1], [43, true], [42, null], [42, 2]]";
+  // Dense
+  auto field_a = field("a", int8());
+  auto field_b = field("b", boolean(), /*nullable=*/false);
+  auto type = dense_union({field_a, field_b}, {42, 43});
+  TestNested(mm, type, data);
+  // Sparse
+  field_a = field("a", int8(), /*nullable=*/false);
+  field_b = field("b", boolean());
+  type = sparse_union({field_a, field_b}, {42, 43});
+  TestNested(mm, type, data);
+}
+
+TEST_F(TestDeviceArrayExport, Extension) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  TestPrimitive(ToDeviceFactory(mm, ExampleUuid));
+  TestPrimitive(ToDeviceFactory(mm, ExampleSmallint));
+  TestPrimitive(ToDeviceFactory(mm, ExampleComplex128));
+}
+
+TEST_F(TestDeviceArrayExport, ExportArrayAndType) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  struct ArrowSchema c_schema {};
+  struct ArrowDeviceArray c_array {};
+  SchemaExportGuard schema_guard(&c_schema);
+  ArrayExportGuard array_guard(&c_array.array);
+
+  auto array = ToDevice(mm, *ArrayFromJSON(int8(), "[1, 2, 3]")->data()).ValueOrDie();
+  ASSERT_OK(ExportDeviceArray(*array, {nullptr, nullptr}, &c_array, &c_schema));
+  const ArrayData& data = *array->data();
+  array.reset();
+  ASSERT_FALSE(ArrowSchemaIsReleased(&c_schema));
+  ASSERT_FALSE(ArrowArrayIsReleased(&c_array.array));
+  ASSERT_EQ(c_schema.format, std::string("c"));
+  ASSERT_EQ(c_schema.n_children, 0);
+  ArrayExportChecker checker{};
+  checker(&c_array, data, kMyDeviceType, 1, nullptr);
+}
+
+TEST_F(TestDeviceArrayExport, ExportRecordBatch) {
+  std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+  auto mm = device->default_memory_manager();
+
+  struct ArrowSchema c_schema {};
+  struct ArrowDeviceArray c_array {};
+
+  auto schema = ::arrow::schema(
+      {field("ints", int16()), field("bools", boolean(), /*nullable=*/false)});
+  schema = schema->WithMetadata(key_value_metadata(kMetadataKeys2, kMetadataValues2));
+  auto arr0 = ToDevice(mm, *ArrayFromJSON(int16(), "[1, 2, null]")->data()).ValueOrDie();
+  auto arr1 = ToDevice(mm, *ArrayFromJSON(boolean(), "[false, true, false]")->data())
+                  .ValueOrDie();
+
+  auto batch_factory = [&]() { return RecordBatch::Make(schema, 3, {arr0, 
arr1}); };
+
+  {
+    auto batch = batch_factory();
+
+    ASSERT_OK(ExportDeviceRecordBatch(*batch, {nullptr, nullptr}, &c_array, 
&c_schema));
+    SchemaExportGuard schema_guard(&c_schema);
+    ArrayExportGuard array_guard(&c_array.array);
+    RecordBatchExportChecker checker{};
+    checker(&c_array, *batch, kMyDeviceType, 1, nullptr);
+
+    // create batch anew, with the same buffer pointers
+    batch = batch_factory();
+    checker(&c_array, *batch, kMyDeviceType, 1, nullptr);
+  }
+  {
+    // Check one can export both schema and record batch at once
+    auto batch = batch_factory();
+
+    ASSERT_OK(ExportDeviceRecordBatch(*batch, {nullptr, nullptr}, &c_array, 
&c_schema));
+    SchemaExportGuard schema_guard(&c_schema);
+    ArrayExportGuard array_guard(&c_array.array);
+    ASSERT_EQ(c_schema.format, std::string("+s"));
+    ASSERT_EQ(c_schema.n_children, 2);
+    ASSERT_NE(c_schema.metadata, nullptr);
+    ASSERT_EQ(kEncodedMetadata2,
+              std::string(c_schema.metadata, kEncodedMetadata2.size()));
+    RecordBatchExportChecker checker{};
+    checker(&c_array, *batch, kMyDeviceType, 1, nullptr);
+
+    // Create batch anew, with the same buffer pointers
+    batch = batch_factory();
+    checker(&c_array, *batch, kMyDeviceType, 1, nullptr);
+  }
+}
+
 ////////////////////////////////////////////////////////////////////////////
 // Schema import tests
 
diff --git a/cpp/src/arrow/device.h b/cpp/src/arrow/device.h
index 67c62a5181..9cc68fe8c8 100644
--- a/cpp/src/arrow/device.h
+++ b/cpp/src/arrow/device.h
@@ -29,6 +29,24 @@
 
 namespace arrow {
 
+/// \brief EXPERIMENTAL: Device type enum which matches up with C Data Device types
+enum class DeviceAllocationType : char {
+  kCPU = 1,
+  kCUDA = 2,
+  kCUDA_HOST = 3,
+  kOPENCL = 4,
+  kVULKAN = 7,
+  kMETAL = 8,
+  kVPI = 9,
+  kROCM = 10,
+  kROCM_HOST = 11,
+  kEXT_DEV = 12,
+  kCUDA_MANAGED = 13,
+  kONEAPI = 14,
+  kWEBGPU = 15,
+  kHEXAGON = 16,
+};
+
 class MemoryManager;
 
 /// \brief EXPERIMENTAL: Abstract interface for hardware devices
@@ -58,6 +76,12 @@ class ARROW_EXPORT Device : public std::enable_shared_from_this<Device>,
   /// \brief Whether this instance points to the same device as another one.
   virtual bool Equals(const Device&) const = 0;
 
+  /// \brief A device ID to identify this device if there are multiple of this type.
+  ///
+  /// If there is no "device_id" equivalent (such as for the main CPU device on
+  /// non-numa systems) returns -1.
+  virtual int64_t device_id() const { return -1; }
+
   /// \brief Whether this device is the main CPU device.
   ///
   /// This shorthand method is very useful when deciding whether a memory 
address
@@ -71,6 +95,9 @@ class ARROW_EXPORT Device : public std::enable_shared_from_this<Device>,
   /// MemoryManager instances with non-default parameters.
   virtual std::shared_ptr<MemoryManager> default_memory_manager() = 0;
 
+  /// \brief Return the DeviceAllocationType of this device
+  virtual DeviceAllocationType device_type() const = 0;
+
  protected:
   ARROW_DISALLOW_COPY_AND_ASSIGN(Device);
   explicit Device(bool is_cpu = false) : is_cpu_(is_cpu) {}
@@ -172,6 +199,7 @@ class ARROW_EXPORT CPUDevice : public Device {
   const char* type_name() const override;
   std::string ToString() const override;
   bool Equals(const Device&) const override;
+  DeviceAllocationType device_type() const override { return DeviceAllocationType::kCPU; }
 
   std::shared_ptr<MemoryManager> default_memory_manager() override;
 
diff --git a/cpp/src/arrow/gpu/cuda_context.cc b/cpp/src/arrow/gpu/cuda_context.cc
index f754c07d13..869ea6453c 100644
--- a/cpp/src/arrow/gpu/cuda_context.cc
+++ b/cpp/src/arrow/gpu/cuda_context.cc
@@ -384,7 +384,8 @@ Result<std::shared_ptr<Buffer>> CudaMemoryManager::ViewBufferTo(
   if (to->is_cpu()) {
     // Device-on-CPU view
     ARROW_ASSIGN_OR_RAISE(auto address, GetHostAddress(buf->address()));
-    return std::make_shared<Buffer>(address, buf->size(), to, buf);
+    return std::make_shared<Buffer>(address, buf->size(), to, buf,
+                                    DeviceAllocationType::kCUDA_HOST);
   }
   return nullptr;
 }
diff --git a/cpp/src/arrow/gpu/cuda_context.h b/cpp/src/arrow/gpu/cuda_context.h
index 0115ed19a1..a1b95c7b41 100644
--- a/cpp/src/arrow/gpu/cuda_context.h
+++ b/cpp/src/arrow/gpu/cuda_context.h
@@ -92,6 +92,10 @@ class ARROW_EXPORT CudaDevice : public Device {
   std::string ToString() const override;
   bool Equals(const Device&) const override;
   std::shared_ptr<MemoryManager> default_memory_manager() override;
+  DeviceAllocationType device_type() const override {
+    return DeviceAllocationType::kCUDA;
+  }
+  int64_t device_id() const override { return device_number(); }
 
   /// \brief Return a CudaDevice instance for a particular device
   /// \param[in] device_number the CUDA device number
diff --git a/cpp/src/arrow/gpu/cuda_memory.cc b/cpp/src/arrow/gpu/cuda_memory.cc
index 297e4dcf71..860c6311d7 100644
--- a/cpp/src/arrow/gpu/cuda_memory.cc
+++ b/cpp/src/arrow/gpu/cuda_memory.cc
@@ -198,6 +198,11 @@ Result<std::shared_ptr<CudaIpcMemHandle>> CudaBuffer::ExportForIpc() {
   return handle;
 }
 
+CudaHostBuffer::CudaHostBuffer(uint8_t* data, const int64_t size)
+    : MutableBuffer(data, size) {
+  device_type_ = DeviceAllocationType::kCUDA_HOST;
+}
+
 CudaHostBuffer::~CudaHostBuffer() {
   auto maybe_manager = CudaDeviceManager::Instance();
   ARROW_CHECK_OK(maybe_manager.status());
@@ -480,5 +485,21 @@ Result<uint8_t*> GetHostAddress(uintptr_t device_ptr) {
   return static_cast<uint8_t*>(ptr);
 }
 
+Result<std::shared_ptr<MemoryManager>> DefaultMemoryMapper(ArrowDeviceType device_type,
+                                                           int64_t device_id) {
+  switch (device_type) {
+    case ARROW_DEVICE_CPU:
+      return default_cpu_memory_manager();
+    case ARROW_DEVICE_CUDA:
+    case ARROW_DEVICE_CUDA_HOST:
+    case ARROW_DEVICE_CUDA_MANAGED: {
+      ARROW_ASSIGN_OR_RAISE(auto device, arrow::cuda::CudaDevice::Make(device_id));
+      return device->default_memory_manager();
+    }
+    default:
+      return Status::NotImplemented("memory manager not implemented for device");
+  }
+}
+
 }  // namespace cuda
 }  // namespace arrow
diff --git a/cpp/src/arrow/gpu/cuda_memory.h b/cpp/src/arrow/gpu/cuda_memory.h
index 18c23a5078..d323bef034 100644
--- a/cpp/src/arrow/gpu/cuda_memory.h
+++ b/cpp/src/arrow/gpu/cuda_memory.h
@@ -21,6 +21,7 @@
 #include <memory>
 
 #include "arrow/buffer.h"
+#include "arrow/c/abi.h"
 #include "arrow/io/concurrency.h"
 #include "arrow/type_fwd.h"
 
@@ -110,7 +111,8 @@ class ARROW_EXPORT CudaBuffer : public Buffer {
 /// \brief Device-accessible CPU memory created using cudaHostAlloc
 class ARROW_EXPORT CudaHostBuffer : public MutableBuffer {
  public:
-  using MutableBuffer::MutableBuffer;
+  CudaHostBuffer(uint8_t* data, const int64_t size);
+
   ~CudaHostBuffer();
 
   /// \brief Return a device address the GPU can read this memory from.
@@ -258,5 +260,9 @@ Result<uintptr_t> GetDeviceAddress(const uint8_t* cpu_data,
 ARROW_EXPORT
 Result<uint8_t*> GetHostAddress(uintptr_t device_ptr);
 
+ARROW_EXPORT
+Result<std::shared_ptr<MemoryManager>> DefaultMemoryMapper(ArrowDeviceType device_type,
+                                                           int64_t device_id);
+
 }  // namespace cuda
 }  // namespace arrow
diff --git a/cpp/src/arrow/gpu/cuda_test.cc b/cpp/src/arrow/gpu/cuda_test.cc
index aac45d1383..6d392213e2 100644
--- a/cpp/src/arrow/gpu/cuda_test.cc
+++ b/cpp/src/arrow/gpu/cuda_test.cc
@@ -364,6 +364,7 @@ TEST_F(TestCudaHostBuffer, AllocateGlobal) {
 
   ASSERT_TRUE(host_buffer->is_cpu());
   ASSERT_EQ(host_buffer->memory_manager(), cpu_mm_);
+  ASSERT_EQ(host_buffer->device_type(), DeviceAllocationType::kCUDA_HOST);
 
   ASSERT_OK_AND_ASSIGN(auto device_address, 
host_buffer->GetDeviceAddress(context_));
   ASSERT_NE(device_address, 0);
@@ -376,6 +377,7 @@ TEST_F(TestCudaHostBuffer, ViewOnDevice) {
 
   ASSERT_TRUE(host_buffer->is_cpu());
   ASSERT_EQ(host_buffer->memory_manager(), cpu_mm_);
+  ASSERT_EQ(host_buffer->device_type(), DeviceAllocationType::kCUDA_HOST);
 
   // Try to view the host buffer on the device.  This should correspond to
   // GetDeviceAddress() in the previous test.
@@ -385,6 +387,7 @@ TEST_F(TestCudaHostBuffer, ViewOnDevice) {
   ASSERT_NE(device_buffer->address(), 0);
   ASSERT_EQ(device_buffer->size(), host_buffer->size());
   ASSERT_EQ(device_buffer->parent(), host_buffer);
+  ASSERT_EQ(device_buffer->device_type(), DeviceAllocationType::kCUDA);
 
   // View back the device buffer on the CPU.  This should roundtrip.
   ASSERT_OK_AND_ASSIGN(auto buffer, Buffer::View(device_buffer, cpu_mm_));
@@ -393,6 +396,7 @@ TEST_F(TestCudaHostBuffer, ViewOnDevice) {
   ASSERT_EQ(buffer->address(), host_buffer->address());
   ASSERT_EQ(buffer->size(), host_buffer->size());
   ASSERT_EQ(buffer->parent(), device_buffer);
+  ASSERT_EQ(buffer->device_type(), DeviceAllocationType::kCUDA_HOST);
 }
 
 // ------------------------------------------------------------------------

Reply via email to