This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 969f4b439c GH-36488: [C++] Import/Export ArrowDeviceArray (#36489)
969f4b439c is described below
commit 969f4b439c8dd9052372fe6c652dbd2459678042
Author: Matt Topol <[email protected]>
AuthorDate: Tue Jul 25 10:33:56 2023 -0400
GH-36488: [C++] Import/Export ArrowDeviceArray (#36489)
### Rationale for this change
With the addition of the `ArrowDeviceArray` we should provide import/export
functions just like we have for `ArrowArray`.
### What changes are included in this PR?
Adding Import/Export functions to the `bridge.h/.cc` files for the C Data
Interface to handle Non-CPU data.
This requires adding a device type to buffers and memory managers to
propagate through.
### Are these changes tested?
Yes.
(Still need to add tests to `bridge_test.cc` but I wanted to get eyes on
this first)
### Are there any user-facing changes?
No.
* Closes: #36488
Lead-authored-by: Matt Topol <[email protected]>
Co-authored-by: Weston Pace <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
---
cpp/src/arrow/buffer.h | 22 ++-
cpp/src/arrow/buffer_test.cc | 14 ++
cpp/src/arrow/c/bridge.cc | 192 +++++++++++++++++-
cpp/src/arrow/c/bridge.h | 130 ++++++++++++
cpp/src/arrow/c/bridge_test.cc | 404 ++++++++++++++++++++++++++++++++++++++
cpp/src/arrow/device.h | 28 +++
cpp/src/arrow/gpu/cuda_context.cc | 3 +-
cpp/src/arrow/gpu/cuda_context.h | 4 +
cpp/src/arrow/gpu/cuda_memory.cc | 21 ++
cpp/src/arrow/gpu/cuda_memory.h | 8 +-
cpp/src/arrow/gpu/cuda_test.cc | 4 +
11 files changed, 823 insertions(+), 7 deletions(-)
diff --git a/cpp/src/arrow/buffer.h b/cpp/src/arrow/buffer.h
index ad2496aeeb..08a3bd749e 100644
--- a/cpp/src/arrow/buffer.h
+++ b/cpp/src/arrow/buffer.h
@@ -20,6 +20,7 @@
#include <cstdint>
#include <cstring>
#include <memory>
+#include <optional>
#include <string>
#include <string_view>
#include <utility>
@@ -58,18 +59,31 @@ class ARROW_EXPORT Buffer {
///
/// \note The passed memory must be kept alive through some other means
Buffer(const uint8_t* data, int64_t size)
- : is_mutable_(false), is_cpu_(true), data_(data), size_(size),
capacity_(size) {
+ : is_mutable_(false),
+ is_cpu_(true),
+ data_(data),
+ size_(size),
+ capacity_(size),
+ device_type_(DeviceAllocationType::kCPU) {
SetMemoryManager(default_cpu_memory_manager());
}
Buffer(const uint8_t* data, int64_t size, std::shared_ptr<MemoryManager> mm,
- std::shared_ptr<Buffer> parent = NULLPTR)
+ std::shared_ptr<Buffer> parent = NULLPTR,
+ std::optional<DeviceAllocationType> device_type = std::nullopt)
: is_mutable_(false),
data_(data),
size_(size),
capacity_(size),
parent_(std::move(parent)) {
+ // SetMemoryManager will also set device_type_
SetMemoryManager(std::move(mm));
+ // if a device type is specified, use that instead. for example:
+ // CUDA_HOST. The CudaMemoryManager will set device_type_ to CUDA,
+ // but you can specify CUDA_HOST as the device type to override it.
+ if (device_type != std::nullopt) {
+ device_type_ = device_type;
+ }
}
Buffer(uintptr_t address, int64_t size, std::shared_ptr<MemoryManager> mm,
@@ -282,6 +296,8 @@ class ARROW_EXPORT Buffer {
const std::shared_ptr<MemoryManager>& memory_manager() const { return
memory_manager_; }
+ std::optional<DeviceAllocationType> device_type() const { return
device_type_; }
+
std::shared_ptr<Buffer> parent() const { return parent_; }
/// \brief Get a RandomAccessFile for reading a buffer
@@ -336,6 +352,7 @@ class ARROW_EXPORT Buffer {
const uint8_t* data_;
int64_t size_;
int64_t capacity_;
+ std::optional<DeviceAllocationType> device_type_;
// null by default, but may be set
std::shared_ptr<Buffer> parent_;
@@ -353,6 +370,7 @@ class ARROW_EXPORT Buffer {
void SetMemoryManager(std::shared_ptr<MemoryManager> mm) {
memory_manager_ = std::move(mm);
is_cpu_ = memory_manager_->is_cpu();
+ device_type_ = memory_manager_->device()->device_type();
}
};
diff --git a/cpp/src/arrow/buffer_test.cc b/cpp/src/arrow/buffer_test.cc
index ce8bab846d..3dd95cb8af 100644
--- a/cpp/src/arrow/buffer_test.cc
+++ b/cpp/src/arrow/buffer_test.cc
@@ -41,6 +41,7 @@ using internal::checked_cast;
using internal::checked_pointer_cast;
static const char kMyDeviceTypeName[] = "arrowtest::MyDevice";
+static const DeviceAllocationType kMyDeviceType =
DeviceAllocationType::kEXT_DEV;
static const int kMyDeviceAllowCopy = 1;
static const int kMyDeviceAllowView = 2;
@@ -70,6 +71,8 @@ class MyDevice : public Device {
return checked_cast<const MyDevice&>(other).value_ == value_;
}
+ DeviceAllocationType device_type() const override { return kMyDeviceType; }
+
std::shared_ptr<MemoryManager> default_memory_manager() override;
int value() const { return value_; }
@@ -256,6 +259,7 @@ TEST_F(TestDevice, Copy) {
ASSERT_EQ(buffer->device(), cpu_device_);
ASSERT_TRUE(buffer->is_cpu());
ASSERT_NE(buffer->address(), cpu_src_->address());
+ ASSERT_EQ(buffer->device_type(), DeviceAllocationType::kCPU);
ASSERT_NE(buffer->data(), nullptr);
AssertBufferEqual(*buffer, "some data");
@@ -263,6 +267,7 @@ TEST_F(TestDevice, Copy) {
ASSERT_EQ(buffer->device(), cpu_device_);
ASSERT_TRUE(buffer->is_cpu());
ASSERT_NE(buffer->address(), cpu_src_->address());
+ ASSERT_EQ(buffer->device_type(), DeviceAllocationType::kCPU);
ASSERT_NE(buffer->data(), nullptr);
AssertBufferEqual(*buffer, "some data");
@@ -271,6 +276,7 @@ TEST_F(TestDevice, Copy) {
ASSERT_EQ(buffer->device(), my_copy_device_);
ASSERT_FALSE(buffer->is_cpu());
ASSERT_NE(buffer->address(), cpu_src_->address());
+ ASSERT_EQ(buffer->device_type(), kMyDeviceType);
#ifdef NDEBUG
ASSERT_EQ(buffer->data(), nullptr);
#endif
@@ -280,6 +286,7 @@ TEST_F(TestDevice, Copy) {
ASSERT_EQ(buffer->device(), my_copy_device_);
ASSERT_FALSE(buffer->is_cpu());
ASSERT_NE(buffer->address(), cpu_src_->address());
+ ASSERT_EQ(buffer->device_type(), kMyDeviceType);
#ifdef NDEBUG
ASSERT_EQ(buffer->data(), nullptr);
#endif
@@ -290,6 +297,7 @@ TEST_F(TestDevice, Copy) {
ASSERT_EQ(buffer->device(), cpu_device_);
ASSERT_TRUE(buffer->is_cpu());
ASSERT_NE(buffer->address(), my_copy_src_->address());
+ ASSERT_EQ(buffer->device_type(), DeviceAllocationType::kCPU);
ASSERT_NE(buffer->data(), nullptr);
AssertBufferEqual(*buffer, "some data");
@@ -297,6 +305,7 @@ TEST_F(TestDevice, Copy) {
ASSERT_EQ(buffer->device(), cpu_device_);
ASSERT_TRUE(buffer->is_cpu());
ASSERT_NE(buffer->address(), my_copy_src_->address());
+ ASSERT_EQ(buffer->device_type(), DeviceAllocationType::kCPU);
ASSERT_NE(buffer->data(), nullptr);
AssertBufferEqual(*buffer, "some data");
@@ -305,6 +314,7 @@ TEST_F(TestDevice, Copy) {
ASSERT_EQ(buffer->device(), my_copy_device_);
ASSERT_FALSE(buffer->is_cpu());
ASSERT_NE(buffer->address(), my_copy_src_->address());
+ ASSERT_EQ(buffer->device_type(), kMyDeviceType);
#ifdef NDEBUG
ASSERT_EQ(buffer->data(), nullptr);
#endif
@@ -315,6 +325,7 @@ TEST_F(TestDevice, Copy) {
ASSERT_EQ(buffer->device(), my_copy_device_);
ASSERT_FALSE(buffer->is_cpu());
ASSERT_NE(buffer->address(), my_copy_src_->address());
+ ASSERT_EQ(buffer->device_type(), kMyDeviceType);
#ifdef NDEBUG
ASSERT_EQ(buffer->data(), nullptr);
#endif
@@ -330,6 +341,7 @@ TEST_F(TestDevice, View) {
ASSERT_EQ(buffer->device(), cpu_device_);
ASSERT_TRUE(buffer->is_cpu());
ASSERT_EQ(buffer->address(), cpu_src_->address());
+ ASSERT_EQ(buffer->device_type(), DeviceAllocationType::kCPU);
ASSERT_NE(buffer->data(), nullptr);
AssertBufferEqual(*buffer, "some data");
@@ -338,6 +350,7 @@ TEST_F(TestDevice, View) {
ASSERT_EQ(buffer->device(), my_view_device_);
ASSERT_FALSE(buffer->is_cpu());
ASSERT_EQ(buffer->address(), cpu_src_->address());
+ ASSERT_EQ(buffer->device_type(), kMyDeviceType);
#ifdef NDEBUG
ASSERT_EQ(buffer->data(), nullptr);
#endif
@@ -348,6 +361,7 @@ TEST_F(TestDevice, View) {
ASSERT_EQ(buffer->device(), cpu_device_);
ASSERT_TRUE(buffer->is_cpu());
ASSERT_EQ(buffer->address(), my_copy_src_->address());
+ ASSERT_EQ(buffer->device_type(), DeviceAllocationType::kCPU);
ASSERT_NE(buffer->data(), nullptr);
AssertBufferEqual(*buffer, "some data");
diff --git a/cpp/src/arrow/c/bridge.cc b/cpp/src/arrow/c/bridge.cc
index 85a5156d11..13355dd6d0 100644
--- a/cpp/src/arrow/c/bridge.cc
+++ b/cpp/src/arrow/c/bridge.cc
@@ -522,6 +522,8 @@ struct ExportedArrayPrivateData :
PoolAllocationMixin<ExportedArrayPrivateData>
std::shared_ptr<ArrayData> data_;
+ RawSyncEvent sync_event_;
+
ExportedArrayPrivateData() = default;
ARROW_DEFAULT_MOVE_AND_ASSIGN(ExportedArrayPrivateData);
ARROW_DISALLOW_COPY_AND_ASSIGN(ExportedArrayPrivateData);
@@ -544,7 +546,12 @@ void ReleaseExportedArray(struct ArrowArray* array) {
<< "Dictionary release callback should have marked it released";
}
DCHECK_NE(array->private_data, nullptr);
- delete reinterpret_cast<ExportedArrayPrivateData*>(array->private_data);
+ auto* pdata =
reinterpret_cast<ExportedArrayPrivateData*>(array->private_data);
+ if (pdata->sync_event_.sync_event != nullptr &&
+ pdata->sync_event_.release_func != nullptr) {
+ pdata->sync_event_.release_func(pdata->sync_event_.sync_event);
+ }
+ delete pdata;
ArrowArrayMarkReleased(array);
}
@@ -584,6 +591,7 @@ struct ArrayExporter {
// Store owning pointer to ArrayData
export_.data_ = data;
+ export_.sync_event_ = RawSyncEvent();
return Status::OK();
}
@@ -663,6 +671,118 @@ Status ExportRecordBatch(const RecordBatch& batch, struct
ArrowArray* out,
return Status::OK();
}
+//////////////////////////////////////////////////////////////////////////
+// C device arrays
+
+Status ValidateDeviceInfo(const ArrayData& data,
+ std::optional<DeviceAllocationType>* device_type,
+ int64_t* device_id) {
+ for (const auto& buf : data.buffers) {
+ if (!buf) {
+ continue;
+ }
+
+ if (*device_type == std::nullopt) {
+ *device_type = buf->device_type();
+ *device_id = buf->device()->device_id();
+ continue;
+ }
+
+ if (buf->device_type() != *device_type) {
+ return Status::Invalid(
+ "Exporting device array with buffers on more than one device.");
+ }
+
+ if (buf->device()->device_id() != *device_id) {
+ return Status::Invalid(
+ "Exporting device array with buffers on multiple device ids.");
+ }
+ }
+
+ for (const auto& child : data.child_data) {
+ RETURN_NOT_OK(ValidateDeviceInfo(*child, device_type, device_id));
+ }
+
+ return Status::OK();
+}
+
+Result<std::pair<std::optional<DeviceAllocationType>, int64_t>>
ValidateDeviceInfo(
+ const ArrayData& data) {
+ std::optional<DeviceAllocationType> device_type;
+ int64_t device_id = -1;
+ RETURN_NOT_OK(ValidateDeviceInfo(data, &device_type, &device_id));
+ return std::make_pair(device_type, device_id);
+}
+
+Status ExportDeviceArray(const Array& array, RawSyncEvent sync_event,
+ struct ArrowDeviceArray* out, struct ArrowSchema*
out_schema) {
+ if (sync_event.sync_event != nullptr && sync_event.release_func) {
+ return Status::Invalid(
+ "Must provide a release event function if providing a non-null event");
+ }
+
+ SchemaExportGuard guard(out_schema);
+ if (out_schema != nullptr) {
+ RETURN_NOT_OK(ExportType(*array.type(), out_schema));
+ }
+
+ ARROW_ASSIGN_OR_RAISE(auto device_info, ValidateDeviceInfo(*array.data()));
+ if (!device_info.first) {
+ out->device_type = ARROW_DEVICE_CPU;
+ } else {
+ out->device_type = static_cast<ArrowDeviceType>(*device_info.first);
+ }
+ out->device_id = device_info.second;
+
+ ArrayExporter exporter;
+ RETURN_NOT_OK(exporter.Export(array.data()));
+ exporter.Finish(&out->array);
+
+ auto* pdata =
reinterpret_cast<ExportedArrayPrivateData*>(out->array.private_data);
+ pdata->sync_event_ = sync_event;
+ out->sync_event = sync_event.sync_event;
+
+ guard.Detach();
+ return Status::OK();
+}
+
+Status ExportDeviceRecordBatch(const RecordBatch& batch, RawSyncEvent
sync_event,
+ struct ArrowDeviceArray* out,
+ struct ArrowSchema* out_schema) {
+ if (sync_event.sync_event != nullptr && sync_event.release_func == nullptr) {
+ return Status::Invalid(
+ "Must provide a release event function if providing a non-null event");
+ }
+
+ // XXX perhaps bypass ToStructArray for speed?
+ ARROW_ASSIGN_OR_RAISE(auto array, batch.ToStructArray());
+
+ SchemaExportGuard guard(out_schema);
+ if (out_schema != nullptr) {
+ // Export the schema, not the struct type, so as not to lose top-level
metadata
+ RETURN_NOT_OK(ExportSchema(*batch.schema(), out_schema));
+ }
+
+ ARROW_ASSIGN_OR_RAISE(auto device_info, ValidateDeviceInfo(*array->data()));
+ if (!device_info.first) {
+ out->device_type = ARROW_DEVICE_CPU;
+ } else {
+ out->device_type = static_cast<ArrowDeviceType>(*device_info.first);
+ }
+ out->device_id = device_info.second;
+
+ ArrayExporter exporter;
+ RETURN_NOT_OK(exporter.Export(array->data()));
+ exporter.Finish(&out->array);
+
+ auto* pdata =
reinterpret_cast<ExportedArrayPrivateData*>(out->array.private_data);
+ pdata->sync_event_ = sync_event;
+ out->sync_event = sync_event.sync_event;
+
+ guard.Detach();
+ return Status::OK();
+}
+
//////////////////////////////////////////////////////////////////////////
// C schema import
@@ -1242,6 +1362,7 @@ namespace {
// The ArrowArray is released on destruction.
struct ImportedArrayData {
struct ArrowArray array_;
+ void* sync_event_;
ImportedArrayData() {
ArrowArrayMarkReleased(&array_); // Initially released
@@ -1267,6 +1388,11 @@ class ImportedBuffer : public Buffer {
std::shared_ptr<ImportedArrayData> import)
: Buffer(data, size), import_(std::move(import)) {}
+ ImportedBuffer(const uint8_t* data, int64_t size,
std::shared_ptr<MemoryManager> mm,
+ DeviceAllocationType device_type,
+ std::shared_ptr<ImportedArrayData> import)
+ : Buffer(data, size, mm, nullptr, device_type),
import_(std::move(import)) {}
+
~ImportedBuffer() override {}
protected:
@@ -1275,7 +1401,20 @@ class ImportedBuffer : public Buffer {
struct ArrayImporter {
explicit ArrayImporter(const std::shared_ptr<DataType>& type)
- : type_(type), zero_size_buffer_(std::make_shared<Buffer>(kZeroSizeArea,
0)) {}
+ : type_(type),
+ zero_size_buffer_(std::make_shared<Buffer>(kZeroSizeArea, 0)),
+ device_type_(DeviceAllocationType::kCPU) {}
+
+ Status Import(struct ArrowDeviceArray* src, const DeviceMemoryMapper&
mapper) {
+ ARROW_ASSIGN_OR_RAISE(memory_mgr_, mapper(src->device_type,
src->device_id));
+ device_type_ = static_cast<DeviceAllocationType>(src->device_type);
+ RETURN_NOT_OK(Import(&src->array));
+ import_->sync_event_ = src->sync_event;
+ // reset internal state before next import
+ memory_mgr_.reset();
+ device_type_ = DeviceAllocationType::kCPU;
+ return Status::OK();
+ }
Status Import(struct ArrowArray* src) {
if (ArrowArrayIsReleased(src)) {
@@ -1588,7 +1727,12 @@ struct ArrayImporter {
std::shared_ptr<Buffer>* out = &data_->buffers[buffer_id];
auto data = reinterpret_cast<const
uint8_t*>(c_struct_->buffers[buffer_id]);
if (data != nullptr) {
- *out = std::make_shared<ImportedBuffer>(data, buffer_size, import_);
+ if (memory_mgr_) {
+ *out = std::make_shared<ImportedBuffer>(data, buffer_size, memory_mgr_,
+ device_type_, import_);
+ } else {
+ *out = std::make_shared<ImportedBuffer>(data, buffer_size, import_);
+ }
} else if (is_null_bitmap) {
out->reset();
} else {
@@ -1613,6 +1757,9 @@ struct ArrayImporter {
// For imported null buffer pointers
std::shared_ptr<Buffer> zero_size_buffer_;
+
+ std::shared_ptr<MemoryManager> memory_mgr_;
+ DeviceAllocationType device_type_;
};
} // namespace
@@ -1652,6 +1799,45 @@ Result<std::shared_ptr<RecordBatch>>
ImportRecordBatch(struct ArrowArray* array,
return ImportRecordBatch(array, *maybe_schema);
}
+Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray*
array,
+ std::shared_ptr<DataType>
type,
+ const DeviceMemoryMapper&
mapper) {
+ ArrayImporter importer(type);
+ RETURN_NOT_OK(importer.Import(array, mapper));
+ return importer.MakeArray();
+}
+
+Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray*
array,
+ struct ArrowSchema* type,
+ const DeviceMemoryMapper&
mapper) {
+ auto maybe_type = ImportType(type);
+ if (!maybe_type.ok()) {
+ ArrowArrayRelease(&array->array);
+ return maybe_type.status();
+ }
+ return ImportDeviceArray(array, *maybe_type, mapper);
+}
+
+Result<std::shared_ptr<RecordBatch>> ImportDeviceRecordBatch(
+ struct ArrowDeviceArray* array, std::shared_ptr<Schema> schema,
+ const DeviceMemoryMapper& mapper) {
+ auto type = struct_(schema->fields());
+ ArrayImporter importer(type);
+ RETURN_NOT_OK(importer.Import(array, mapper));
+ return importer.MakeRecordBatch(std::move(schema));
+}
+
+Result<std::shared_ptr<RecordBatch>> ImportDeviceRecordBatch(
+ struct ArrowDeviceArray* array, struct ArrowSchema* schema,
+ const DeviceMemoryMapper& mapper) {
+ auto maybe_schema = ImportSchema(schema);
+ if (!maybe_schema.ok()) {
+ ArrowArrayRelease(&array->array);
+ return maybe_schema.status();
+ }
+ return ImportDeviceRecordBatch(array, *maybe_schema, mapper);
+}
+
//////////////////////////////////////////////////////////////////////////
// C stream export
diff --git a/cpp/src/arrow/c/bridge.h b/cpp/src/arrow/c/bridge.h
index 3b1a013d20..92707a5972 100644
--- a/cpp/src/arrow/c/bridge.h
+++ b/cpp/src/arrow/c/bridge.h
@@ -17,6 +17,7 @@
#pragma once
+#include <functional>
#include <memory>
#include <string>
@@ -166,6 +167,135 @@ Result<std::shared_ptr<RecordBatch>>
ImportRecordBatch(struct ArrowArray* array,
/// @}
+/// \defgroup c-data-device-interface Functions for working with the C data
device
+/// interface.
+///
+/// @{
+
+/// \brief EXPERIMENTAL: Type for freeing a sync event
+///
+/// If synchronization is necessary for accessing the data on a device,
+/// a pointer to an event needs to be passed when exporting the device
+/// array. It's the responsibility of the release function for the array
+/// to release the event. Both can be null if no sync'ing is necessary.
+struct RawSyncEvent {
+ void* sync_event = NULL;
+ std::function<void(void*)> release_func;
+};
+
+/// \brief EXPERIMENTAL: Export C++ Array as an ArrowDeviceArray.
+///
+/// The resulting ArrowDeviceArray struct keeps the array data and buffers
alive
+/// until its release callback is called by the consumer. All buffers in
+/// the provided array MUST have the same device_type, otherwise an error
+/// will be returned.
+///
+/// If a non-null sync_event is provided, then the sync_release func must also
be
+/// non-null. If the sync_event is null, then the sync_release parameter is
not called.
+///
+/// \param[in] array Array object to export
+/// \param[in] sync_event A struct containing what is needed for syncing if
necessary
+/// \param[out] out C struct to export the array to
+/// \param[out] out_schema optional C struct to export the array type to
+ARROW_EXPORT
+Status ExportDeviceArray(const Array& array, RawSyncEvent sync_event,
+ struct ArrowDeviceArray* out,
+ struct ArrowSchema* out_schema = NULLPTR);
+
+/// \brief EXPERIMENTAL: Export C++ RecordBatch as an ArrowDeviceArray.
+///
+/// The record batch is exported as if it were a struct array.
+/// The resulting ArrowDeviceArray struct keeps the record batch data and
buffers alive
+/// until its release callback is called by the consumer.
+///
+/// All buffers of all columns in the record batch must have the same
device_type
+/// otherwise an error will be returned. If columns are on different devices,
+/// they should be exported using different ArrowDeviceArray instances.
+///
+/// If a non-null sync_event is provided, then the sync_release func must also
be
+/// non-null. If the sync_event is null, then the sync_release parameter is
ignored.
+///
+/// \param[in] batch Record batch to export
+/// \param[in] sync_event A struct containing what is needed for syncing if
necessary
+/// \param[out] out C struct where to export the record batch
+/// \param[out] out_schema optional C struct where to export the record batch
schema
+ARROW_EXPORT
+Status ExportDeviceRecordBatch(const RecordBatch& batch, RawSyncEvent
sync_event,
+ struct ArrowDeviceArray* out,
+ struct ArrowSchema* out_schema = NULLPTR);
+
+using DeviceMemoryMapper =
+ std::function<Result<std::shared_ptr<MemoryManager>>(ArrowDeviceType,
int64_t)>;
+
+/// \brief EXPERIMENTAL: Import C++ device array from the C data interface.
+///
+/// The ArrowArray struct has its contents moved (as per the C data interface
+/// specification) to a private object held alive by the resulting array. The
+/// buffers of the Array are located on the device indicated by the
device_type.
+///
+/// \param[in,out] array C data interface struct holding the array data
+/// \param[in] type type of the imported array
+/// \param[in] mapper A function to map device + id to memory manager
+/// \return Imported array object
+ARROW_EXPORT
+Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray*
array,
+ std::shared_ptr<DataType>
type,
+ const DeviceMemoryMapper&
mapper);
+
+/// \brief EXPERIMENTAL: Import C++ device array and its type from the C data
interface.
+///
+/// The ArrowArray struct has its contents moved (as per the C data interface
+/// specification) to a private object held alive by the resulting array.
+/// The ArrowSchema struct is released, even if this function fails. The
+/// buffers of the Array are located on the device indicated by the
device_type.
+///
+/// \param[in,out] array C data interface struct holding the array data
+/// \param[in,out] type C data interface struct holding the array type
+/// \param[in] mapper A function to map device + id to memory manager
+/// \return Imported array object
+ARROW_EXPORT
+Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray*
array,
+ struct ArrowSchema* type,
+ const DeviceMemoryMapper&
mapper);
+
+/// \brief EXPERIMENTAL: Import C++ record batch with buffers on a device from
the C data
+/// interface.
+///
+/// The ArrowArray struct has its contents moved (as per the C data interface
+/// specification) to a private object held alive by the resulting record
batch.
+/// The buffers of all columns of the record batch are located on the device
+/// indicated by the device type.
+///
+/// \param[in,out] array C data interface struct holding the record batch data
+/// \param[in] schema schema of the imported record batch
+/// \param[in] mapper A function to map device + id to memory manager
+/// \return Imported record batch object
+ARROW_EXPORT
+Result<std::shared_ptr<RecordBatch>> ImportDeviceRecordBatch(
+ struct ArrowDeviceArray* array, std::shared_ptr<Schema> schema,
+ const DeviceMemoryMapper& mapper);
+
+/// \brief EXPERIMENTAL: Import C++ record batch with buffers on a device and
its schema
+/// from the C data interface.
+///
+/// The type represented by the ArrowSchema struct must be a struct type array.
+/// The ArrowArray struct has its contents moved (as per the C data interface
+/// specification) to a private object held alive by the resulting record
batch.
+/// The ArrowSchema struct is released, even if this function fails. The
buffers
+/// of all columns of the record batch are located on the device indicated by
the
+/// device type.
+///
+/// \param[in,out] array C data interface struct holding the record batch data
+/// \param[in,out] schema C data interface struct holding the record batch
schema
+/// \param[in] mapper A function to map device + id to memory manager
+/// \return Imported record batch object
+ARROW_EXPORT
+Result<std::shared_ptr<RecordBatch>> ImportDeviceRecordBatch(
+ struct ArrowDeviceArray* array, struct ArrowSchema* schema,
+ const DeviceMemoryMapper& mapper);
+
+/// @}
+
/// \defgroup c-stream-interface Functions for working with the C data
interface.
///
/// @{
diff --git a/cpp/src/arrow/c/bridge_test.cc b/cpp/src/arrow/c/bridge_test.cc
index 5fe7b653c8..5c7de8e4a0 100644
--- a/cpp/src/arrow/c/bridge_test.cc
+++ b/cpp/src/arrow/c/bridge_test.cc
@@ -565,6 +565,15 @@ struct ArrayExportChecker {
ASSERT_EQ(c_export->children, nullptr);
}
}
+
+ void operator()(struct ArrowDeviceArray* c_export, const ArrayData&
expected_data,
+ const ArrowDeviceType device_type, const int64_t device_id,
+ const void* sync_event) {
+ ASSERT_EQ(c_export->device_type, device_type);
+ ASSERT_EQ(c_export->device_id, device_id);
+ ASSERT_EQ(c_export->sync_event, sync_event);
+ this->operator()(&c_export->array, expected_data);
+ }
};
struct RecordBatchExportChecker {
@@ -592,6 +601,15 @@ struct RecordBatchExportChecker {
ASSERT_EQ(c_export->children, nullptr);
}
}
+
+ void operator()(struct ArrowDeviceArray* c_export, const RecordBatch&
expected_data,
+ const ArrowDeviceType device_type, const int64_t device_id,
+ const void* sync_event) {
+ ASSERT_EQ(c_export->device_type, device_type);
+ ASSERT_EQ(c_export->device_id, device_id);
+ ASSERT_EQ(c_export->sync_event, sync_event);
+ this->operator()(&c_export->array, expected_data);
+ }
};
class TestArrayExport : public ::testing::Test {
@@ -1112,6 +1130,392 @@ TEST_F(TestArrayExport, ExportRecordBatch) {
}
}
+////////////////////////////////////////////////////////////////////////////
+// Device Array Export Tests
+
+static const char kMyDeviceTypeName[] = "arrowtest::MyDevice";
+static const ArrowDeviceType kMyDeviceType = ARROW_DEVICE_EXT_DEV;
+
+class MyBuffer final : public MutableBuffer {
+ public:
+ using MutableBuffer::MutableBuffer;
+
+ ~MyBuffer() { default_memory_pool()->Free(const_cast<uint8_t*>(data_),
size_); }
+};
+
+class MyMemoryManager : public CPUMemoryManager {
+ public:
+ explicit MyMemoryManager(const std::shared_ptr<Device>& device)
+ : CPUMemoryManager(device, default_memory_pool()) {}
+
+ Result<std::unique_ptr<Buffer>> AllocateBuffer(int64_t size) override {
+ uint8_t* data;
+ RETURN_NOT_OK(pool_->Allocate(size, &data));
+ return std::make_unique<MyBuffer>(data, size, shared_from_this());
+ }
+
+ protected:
+ Result<std::shared_ptr<Buffer>> CopyBufferFrom(
+ const std::shared_ptr<Buffer>& buf,
+ const std::shared_ptr<MemoryManager>& from) override {
+ return CopyNonOwnedFrom(*buf, from);
+ }
+ Result<std::unique_ptr<Buffer>> CopyNonOwnedFrom(
+ const Buffer& buf, const std::shared_ptr<MemoryManager>& from) override {
+ if (!from->is_cpu()) {
+ return nullptr;
+ }
+
+ ARROW_ASSIGN_OR_RAISE(auto dest, AllocateBuffer(buf.size()));
+ if (buf.size() > 0) {
+ memcpy(dest->mutable_data(), buf.data(),
static_cast<size_t>(buf.size()));
+ }
+ return std::move(dest);
+ }
+};
+
+class MyDevice : public Device {
+ public:
+ explicit MyDevice(int value) : Device(true), value_(value) {}
+ const char* type_name() const override { return kMyDeviceTypeName; }
+ std::string ToString() const override { return kMyDeviceTypeName; }
+ bool Equals(const Device& other) const override {
+ if (other.type_name() != kMyDeviceTypeName || other.device_type() !=
device_type()) {
+ return false;
+ }
+ return checked_cast<const MyDevice&>(other).value_ == value_;
+ }
+ DeviceAllocationType device_type() const override {
+ return static_cast<DeviceAllocationType>(kMyDeviceType);
+ }
+ int64_t device_id() const override { return value_; }
+ std::shared_ptr<MemoryManager> default_memory_manager() override {
+ return std::make_shared<MyMemoryManager>(shared_from_this());
+ }
+
+ protected:
+ int value_;
+};
+
+class TestDeviceArrayExport : public ::testing::Test {
+ public:
+ void SetUp() override { pool_ = default_memory_pool(); }
+
+ static Result<std::shared_ptr<ArrayData>> ToDeviceData(
+ const std::shared_ptr<MemoryManager>& mm, const ArrayData& data) {
+ arrow::BufferVector buffers;
+ for (const auto& buf : data.buffers) {
+ if (buf) {
+ ARROW_ASSIGN_OR_RAISE(auto dest, mm->CopyBuffer(buf, mm));
+ buffers.push_back(dest);
+ } else {
+ buffers.push_back(nullptr);
+ }
+ }
+
+ arrow::ArrayDataVector children;
+ for (const auto& child : data.child_data) {
+ ARROW_ASSIGN_OR_RAISE(auto dest, ToDeviceData(mm, *child));
+ children.push_back(dest);
+ }
+
+ return ArrayData::Make(data.type, data.length, buffers, children,
data.null_count,
+ data.offset);
+ }
+
+ static Result<std::shared_ptr<Array>> ToDevice(const
std::shared_ptr<MemoryManager>& mm,
+ const ArrayData& data) {
+ ARROW_ASSIGN_OR_RAISE(auto result, ToDeviceData(mm, data));
+ return MakeArray(result);
+ }
+
+ template <typename ArrayFactory>
+ static std::function<Result<std::shared_ptr<Array>>()> ToDeviceFactory(
+ const std::shared_ptr<MemoryManager>& mm, ArrayFactory&& factory) {
+ return [&]() { return ToDevice(mm, *factory()->data()); };
+ }
+
+ static std::function<Result<std::shared_ptr<Array>>()> JSONArrayFactory(
+ const std::shared_ptr<MemoryManager>& mm, std::shared_ptr<DataType> type,
+ const char* json) {
+ return [=]() { return ToDevice(mm, *ArrayFromJSON(type, json)->data()); };
+ }
+
+ template <typename ArrayFactory, typename ExportCheckFunc>
+ void TestWithArrayFactory(ArrayFactory&& factory, ExportCheckFunc&&
check_func) {
+ auto orig_bytes = pool_->bytes_allocated();
+
+ std::shared_ptr<Array> arr;
+ ASSERT_OK_AND_ASSIGN(arr, ToResult(factory()));
+ ARROW_SCOPED_TRACE("type = ", arr->type()->ToString(),
+ ", array data = ", arr->ToString());
+ const ArrayData& data = *arr->data(); // non-owning reference
+ struct ArrowDeviceArray c_export;
+ ASSERT_OK(ExportDeviceArray(*arr, {nullptr, nullptr}, &c_export));
+
+ ArrayExportGuard guard(&c_export.array);
+ auto new_bytes = pool_->bytes_allocated();
+ ASSERT_GT(new_bytes, orig_bytes);
+
+ // Release the shared_ptr<Array>, underlying data should be held alive
+ arr.reset();
+ ASSERT_EQ(pool_->bytes_allocated(), new_bytes);
+ check_func(&c_export, data, kMyDeviceType, 1, nullptr);
+
+ // Release the ArrowArray, underlying data should be destroyed
+ guard.Release();
+ ASSERT_EQ(pool_->bytes_allocated(), orig_bytes);
+ }
+
+ template <typename ArrayFactory>
+ void TestNested(ArrayFactory&& factory) {
+ ArrayExportChecker checker;
+ TestWithArrayFactory(std::forward<ArrayFactory>(factory), checker);
+ }
+
+ void TestNested(const std::shared_ptr<MemoryManager>& mm,
+ const std::shared_ptr<DataType>& type, const char* json) {
+ TestNested(JSONArrayFactory(mm, type, json));
+ }
+
+ template <typename ArrayFactory>
+ void TestPrimitive(ArrayFactory&& factory) {
+ TestNested(std::forward<ArrayFactory>(factory));
+ }
+
+ void TestPrimitive(const std::shared_ptr<MemoryManager>& mm,
+ const std::shared_ptr<DataType>& type, const char* json) {
+ TestNested(mm, type, json);
+ }
+
+ protected:
+ MemoryPool* pool_;
+};
+
+TEST_F(TestDeviceArrayExport, Primitive) {
+ std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+ auto mm = device->default_memory_manager();
+
+ TestPrimitive(mm, int8(), "[1, 2, null, -3]");
+ TestPrimitive(mm, int16(), "[1, 2, -3]");
+ TestPrimitive(mm, int32(), "[1, 2, null, -3]");
+ TestPrimitive(mm, int64(), "[1, 2, -3]");
+ TestPrimitive(mm, uint8(), "[1, 2, 3]");
+ TestPrimitive(mm, uint16(), "[1, 2, null, 3]");
+ TestPrimitive(mm, uint32(), "[1, 2, 3]");
+ TestPrimitive(mm, uint64(), "[1, 2, null, 3]");
+
+ TestPrimitive(mm, boolean(), "[true, false, null]");
+
+ TestPrimitive(mm, float32(), "[1.5, null]");
+ TestPrimitive(mm, float64(), "[1.5, null]");
+
+ TestPrimitive(mm, fixed_size_binary(3), R"(["foo", "bar", null])");
+ TestPrimitive(mm, binary(), R"(["foo", "bar", null])");
+ TestPrimitive(mm, large_binary(), R"(["foo", "bar", null])");
+ TestPrimitive(mm, utf8(), R"(["foo", "bar", null])");
+ TestPrimitive(mm, large_utf8(), R"(["foo", "bar", null])");
+
+ TestPrimitive(mm, decimal(16, 4), R"(["1234.5670", null])");
+ TestPrimitive(mm, decimal256(16, 4), R"(["1234.5670", null])");
+
+ TestPrimitive(mm, month_day_nano_interval(), R"([[-1, 5, 20], null])");
+}
+
+TEST_F(TestDeviceArrayExport, PrimitiveSliced) {
+ std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+ auto mm = device->default_memory_manager();
+
+ auto factory = [=]() {
+ return (*ToDevice(mm, *ArrayFromJSON(int16(), "[1, 2, null, -3]")->data()))
+ ->Slice(1, 2);
+ };
+ TestPrimitive(factory);
+}
+
+TEST_F(TestDeviceArrayExport, Temporal) {
+ std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+ auto mm = device->default_memory_manager();
+
+ const char* json = "[1, 2, null, 42]";
+ TestPrimitive(mm, date32(), json);
+ TestPrimitive(mm, date64(), json);
+ TestPrimitive(mm, time32(TimeUnit::SECOND), json);
+ TestPrimitive(mm, time32(TimeUnit::MILLI), json);
+ TestPrimitive(mm, time64(TimeUnit::MICRO), json);
+ TestPrimitive(mm, time64(TimeUnit::NANO), json);
+ TestPrimitive(mm, duration(TimeUnit::SECOND), json);
+ TestPrimitive(mm, duration(TimeUnit::MILLI), json);
+ TestPrimitive(mm, duration(TimeUnit::MICRO), json);
+ TestPrimitive(mm, duration(TimeUnit::NANO), json);
+ TestPrimitive(mm, month_interval(), json);
+
+ TestPrimitive(mm, day_time_interval(), "[[7, 600], null]");
+
+ json = R"(["1970-01-01","2000-02-29","1900-02-28"])";
+ TestPrimitive(mm, timestamp(TimeUnit::SECOND), json);
+ TestPrimitive(mm, timestamp(TimeUnit::SECOND, "Europe/Paris"), json);
+ TestPrimitive(mm, timestamp(TimeUnit::MILLI), json);
+ TestPrimitive(mm, timestamp(TimeUnit::MILLI, "Europe/Paris"), json);
+ TestPrimitive(mm, timestamp(TimeUnit::MICRO), json);
+ TestPrimitive(mm, timestamp(TimeUnit::MICRO, "Europe/Paris"), json);
+ TestPrimitive(mm, timestamp(TimeUnit::NANO), json);
+ TestPrimitive(mm, timestamp(TimeUnit::NANO, "Europe/Paris"), json);
+}
+
+TEST_F(TestDeviceArrayExport, List) {
+ std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+ auto mm = device->default_memory_manager();
+
+ TestNested(mm, list(int8()), "[[1, 2], [3, null], null]");
+ TestNested(mm, large_list(uint16()), "[[1, 2], [3, null], null]");
+ TestNested(mm, fixed_size_list(int64(), 2), "[[1, 2], [3, null], null]");
+
+ TestNested(mm, list(large_list(int32())), "[[[1, 2], [3], null], null]");
+}
+
+TEST_F(TestDeviceArrayExport, ListSliced) {
+ std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+ auto mm = device->default_memory_manager();
+
+ {
+ auto factory = [=]() {
+ return (*ToDevice(
+            mm, *ArrayFromJSON(list(int8()), "[[1, 2], [3, null], [4, 5, 6], null]")
+ ->data()))
+ ->Slice(1, 2);
+ };
+ TestNested(factory);
+ }
+ {
+ auto factory = [=]() {
+ auto values =
+ (*ToDevice(mm,
+                     *ArrayFromJSON(int16(), "[1, 2, 3, 4, null, 5, 6, 7, 8]")->data()))
+ ->Slice(1, 6);
+      auto offsets = (*ToDevice(mm, *ArrayFromJSON(int32(), "[0, 2, 3, 5, 6]")->data()))
+ ->Slice(2, 4);
+ return ListArray::FromArrays(*offsets, *values);
+ };
+ TestNested(factory);
+ }
+}
+
+TEST_F(TestDeviceArrayExport, Struct) {
+ std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+ auto mm = device->default_memory_manager();
+
+ const char* data = R"([[1, "foo"], [2, null]])";
+ auto type = struct_({field("a", int8()), field("b", utf8())});
+ TestNested(mm, type, data);
+}
+
+TEST_F(TestDeviceArrayExport, Map) {
+ std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+ auto mm = device->default_memory_manager();
+
+ const char* json = R"([[[1, "foo"], [2, null]], [[3, "bar"]]])";
+ TestNested(mm, map(int8(), utf8()), json);
+ TestNested(mm, map(int8(), utf8(), /*keys_sorted=*/true), json);
+}
+
+TEST_F(TestDeviceArrayExport, Union) {
+ std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+ auto mm = device->default_memory_manager();
+
+ const char* data = "[null, [42, 1], [43, true], [42, null], [42, 2]]";
+ // Dense
+ auto field_a = field("a", int8());
+ auto field_b = field("b", boolean(), /*nullable=*/false);
+ auto type = dense_union({field_a, field_b}, {42, 43});
+ TestNested(mm, type, data);
+ // Sparse
+ field_a = field("a", int8(), /*nullable=*/false);
+ field_b = field("b", boolean());
+ type = sparse_union({field_a, field_b}, {42, 43});
+ TestNested(mm, type, data);
+}
+
+TEST_F(TestDeviceArrayExport, Extension) {
+ std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+ auto mm = device->default_memory_manager();
+
+ TestPrimitive(ToDeviceFactory(mm, ExampleUuid));
+ TestPrimitive(ToDeviceFactory(mm, ExampleSmallint));
+ TestPrimitive(ToDeviceFactory(mm, ExampleComplex128));
+}
+
+TEST_F(TestDeviceArrayExport, ExportArrayAndType) {
+ std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+ auto mm = device->default_memory_manager();
+
+ struct ArrowSchema c_schema {};
+ struct ArrowDeviceArray c_array {};
+ SchemaExportGuard schema_guard(&c_schema);
+ ArrayExportGuard array_guard(&c_array.array);
+
+  auto array = ToDevice(mm, *ArrayFromJSON(int8(), "[1, 2, 3]")->data()).ValueOrDie();
+  ASSERT_OK(ExportDeviceArray(*array, {nullptr, nullptr}, &c_array, &c_schema));
+ const ArrayData& data = *array->data();
+ array.reset();
+ ASSERT_FALSE(ArrowSchemaIsReleased(&c_schema));
+ ASSERT_FALSE(ArrowArrayIsReleased(&c_array.array));
+ ASSERT_EQ(c_schema.format, std::string("c"));
+ ASSERT_EQ(c_schema.n_children, 0);
+ ArrayExportChecker checker{};
+ checker(&c_array, data, kMyDeviceType, 1, nullptr);
+}
+
+TEST_F(TestDeviceArrayExport, ExportRecordBatch) {
+ std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
+ auto mm = device->default_memory_manager();
+
+ struct ArrowSchema c_schema {};
+ struct ArrowDeviceArray c_array {};
+
+ auto schema = ::arrow::schema(
+ {field("ints", int16()), field("bools", boolean(), /*nullable=*/false)});
+  schema = schema->WithMetadata(key_value_metadata(kMetadataKeys2, kMetadataValues2));
+  auto arr0 = ToDevice(mm, *ArrayFromJSON(int16(), "[1, 2, null]")->data()).ValueOrDie();
+  auto arr1 = ToDevice(mm, *ArrayFromJSON(boolean(), "[false, true, false]")->data())
+ .ValueOrDie();
+
+  auto batch_factory = [&]() { return RecordBatch::Make(schema, 3, {arr0, arr1}); };
+
+ {
+ auto batch = batch_factory();
+
+    ASSERT_OK(ExportDeviceRecordBatch(*batch, {nullptr, nullptr}, &c_array, &c_schema));
+ SchemaExportGuard schema_guard(&c_schema);
+ ArrayExportGuard array_guard(&c_array.array);
+ RecordBatchExportChecker checker{};
+ checker(&c_array, *batch, kMyDeviceType, 1, nullptr);
+
+ // create batch anew, with the same buffer pointers
+ batch = batch_factory();
+ checker(&c_array, *batch, kMyDeviceType, 1, nullptr);
+ }
+ {
+ // Check one can export both schema and record batch at once
+ auto batch = batch_factory();
+
+    ASSERT_OK(ExportDeviceRecordBatch(*batch, {nullptr, nullptr}, &c_array, &c_schema));
+ SchemaExportGuard schema_guard(&c_schema);
+ ArrayExportGuard array_guard(&c_array.array);
+ ASSERT_EQ(c_schema.format, std::string("+s"));
+ ASSERT_EQ(c_schema.n_children, 2);
+ ASSERT_NE(c_schema.metadata, nullptr);
+ ASSERT_EQ(kEncodedMetadata2,
+ std::string(c_schema.metadata, kEncodedMetadata2.size()));
+ RecordBatchExportChecker checker{};
+ checker(&c_array, *batch, kMyDeviceType, 1, nullptr);
+
+ // Create batch anew, with the same buffer pointers
+ batch = batch_factory();
+ checker(&c_array, *batch, kMyDeviceType, 1, nullptr);
+ }
+}
+
////////////////////////////////////////////////////////////////////////////
// Schema import tests
diff --git a/cpp/src/arrow/device.h b/cpp/src/arrow/device.h
index 67c62a5181..9cc68fe8c8 100644
--- a/cpp/src/arrow/device.h
+++ b/cpp/src/arrow/device.h
@@ -29,6 +29,24 @@
namespace arrow {
+/// \brief EXPERIMENTAL: Device type enum which matches up with C Data Device types
+enum class DeviceAllocationType : char {
+ kCPU = 1,
+ kCUDA = 2,
+ kCUDA_HOST = 3,
+ kOPENCL = 4,
+ kVULKAN = 7,
+ kMETAL = 8,
+ kVPI = 9,
+ kROCM = 10,
+ kROCM_HOST = 11,
+ kEXT_DEV = 12,
+ kCUDA_MANAGED = 13,
+ kONEAPI = 14,
+ kWEBGPU = 15,
+ kHEXAGON = 16,
+};
+
class MemoryManager;
/// \brief EXPERIMENTAL: Abstract interface for hardware devices
@@ -58,6 +76,12 @@ class ARROW_EXPORT Device : public std::enable_shared_from_this<Device>,
/// \brief Whether this instance points to the same device as another one.
virtual bool Equals(const Device&) const = 0;
+  /// \brief A device ID to identify this device if there are multiple of this type.
+ ///
+ /// If there is no "device_id" equivalent (such as for the main CPU device on
+ /// non-numa systems) returns -1.
+ virtual int64_t device_id() const { return -1; }
+
/// \brief Whether this device is the main CPU device.
///
   /// This shorthand method is very useful when deciding whether a memory address
@@ -71,6 +95,9 @@ class ARROW_EXPORT Device : public std::enable_shared_from_this<Device>,
/// MemoryManager instances with non-default parameters.
virtual std::shared_ptr<MemoryManager> default_memory_manager() = 0;
+ /// \brief Return the DeviceAllocationType of this device
+ virtual DeviceAllocationType device_type() const = 0;
+
protected:
ARROW_DISALLOW_COPY_AND_ASSIGN(Device);
explicit Device(bool is_cpu = false) : is_cpu_(is_cpu) {}
@@ -172,6 +199,7 @@ class ARROW_EXPORT CPUDevice : public Device {
const char* type_name() const override;
std::string ToString() const override;
bool Equals(const Device&) const override;
+  DeviceAllocationType device_type() const override { return DeviceAllocationType::kCPU; }
std::shared_ptr<MemoryManager> default_memory_manager() override;
diff --git a/cpp/src/arrow/gpu/cuda_context.cc b/cpp/src/arrow/gpu/cuda_context.cc
index f754c07d13..869ea6453c 100644
--- a/cpp/src/arrow/gpu/cuda_context.cc
+++ b/cpp/src/arrow/gpu/cuda_context.cc
@@ -384,7 +384,8 @@ Result<std::shared_ptr<Buffer>> CudaMemoryManager::ViewBufferTo(
if (to->is_cpu()) {
// Device-on-CPU view
ARROW_ASSIGN_OR_RAISE(auto address, GetHostAddress(buf->address()));
- return std::make_shared<Buffer>(address, buf->size(), to, buf);
+ return std::make_shared<Buffer>(address, buf->size(), to, buf,
+ DeviceAllocationType::kCUDA_HOST);
}
return nullptr;
}
diff --git a/cpp/src/arrow/gpu/cuda_context.h b/cpp/src/arrow/gpu/cuda_context.h
index 0115ed19a1..a1b95c7b41 100644
--- a/cpp/src/arrow/gpu/cuda_context.h
+++ b/cpp/src/arrow/gpu/cuda_context.h
@@ -92,6 +92,10 @@ class ARROW_EXPORT CudaDevice : public Device {
std::string ToString() const override;
bool Equals(const Device&) const override;
std::shared_ptr<MemoryManager> default_memory_manager() override;
+ DeviceAllocationType device_type() const override {
+ return DeviceAllocationType::kCUDA;
+ }
+ int64_t device_id() const override { return device_number(); }
/// \brief Return a CudaDevice instance for a particular device
/// \param[in] device_number the CUDA device number
diff --git a/cpp/src/arrow/gpu/cuda_memory.cc b/cpp/src/arrow/gpu/cuda_memory.cc
index 297e4dcf71..860c6311d7 100644
--- a/cpp/src/arrow/gpu/cuda_memory.cc
+++ b/cpp/src/arrow/gpu/cuda_memory.cc
@@ -198,6 +198,11 @@ Result<std::shared_ptr<CudaIpcMemHandle>> CudaBuffer::ExportForIpc() {
return handle;
}
+CudaHostBuffer::CudaHostBuffer(uint8_t* data, const int64_t size)
+ : MutableBuffer(data, size) {
+ device_type_ = DeviceAllocationType::kCUDA_HOST;
+}
+
CudaHostBuffer::~CudaHostBuffer() {
auto maybe_manager = CudaDeviceManager::Instance();
ARROW_CHECK_OK(maybe_manager.status());
@@ -480,5 +485,21 @@ Result<uint8_t*> GetHostAddress(uintptr_t device_ptr) {
return static_cast<uint8_t*>(ptr);
}
+Result<std::shared_ptr<MemoryManager>> DefaultMemoryMapper(ArrowDeviceType device_type,
+ int64_t device_id) {
+ switch (device_type) {
+ case ARROW_DEVICE_CPU:
+ return default_cpu_memory_manager();
+ case ARROW_DEVICE_CUDA:
+ case ARROW_DEVICE_CUDA_HOST:
+ case ARROW_DEVICE_CUDA_MANAGED: {
+      ARROW_ASSIGN_OR_RAISE(auto device, arrow::cuda::CudaDevice::Make(device_id));
+ return device->default_memory_manager();
+ }
+ default:
+      return Status::NotImplemented("memory manager not implemented for device");
+ }
+}
+
} // namespace cuda
} // namespace arrow
diff --git a/cpp/src/arrow/gpu/cuda_memory.h b/cpp/src/arrow/gpu/cuda_memory.h
index 18c23a5078..d323bef034 100644
--- a/cpp/src/arrow/gpu/cuda_memory.h
+++ b/cpp/src/arrow/gpu/cuda_memory.h
@@ -21,6 +21,7 @@
#include <memory>
#include "arrow/buffer.h"
+#include "arrow/c/abi.h"
#include "arrow/io/concurrency.h"
#include "arrow/type_fwd.h"
@@ -110,7 +111,8 @@ class ARROW_EXPORT CudaBuffer : public Buffer {
/// \brief Device-accessible CPU memory created using cudaHostAlloc
class ARROW_EXPORT CudaHostBuffer : public MutableBuffer {
public:
- using MutableBuffer::MutableBuffer;
+ CudaHostBuffer(uint8_t* data, const int64_t size);
+
~CudaHostBuffer();
/// \brief Return a device address the GPU can read this memory from.
@@ -258,5 +260,9 @@ Result<uintptr_t> GetDeviceAddress(const uint8_t* cpu_data,
ARROW_EXPORT
Result<uint8_t*> GetHostAddress(uintptr_t device_ptr);
+ARROW_EXPORT
+Result<std::shared_ptr<MemoryManager>> DefaultMemoryMapper(ArrowDeviceType device_type,
+ int64_t device_id);
+
} // namespace cuda
} // namespace arrow
diff --git a/cpp/src/arrow/gpu/cuda_test.cc b/cpp/src/arrow/gpu/cuda_test.cc
index aac45d1383..6d392213e2 100644
--- a/cpp/src/arrow/gpu/cuda_test.cc
+++ b/cpp/src/arrow/gpu/cuda_test.cc
@@ -364,6 +364,7 @@ TEST_F(TestCudaHostBuffer, AllocateGlobal) {
ASSERT_TRUE(host_buffer->is_cpu());
ASSERT_EQ(host_buffer->memory_manager(), cpu_mm_);
+ ASSERT_EQ(host_buffer->device_type(), DeviceAllocationType::kCUDA_HOST);
ASSERT_OK_AND_ASSIGN(auto device_address,
host_buffer->GetDeviceAddress(context_));
ASSERT_NE(device_address, 0);
@@ -376,6 +377,7 @@ TEST_F(TestCudaHostBuffer, ViewOnDevice) {
ASSERT_TRUE(host_buffer->is_cpu());
ASSERT_EQ(host_buffer->memory_manager(), cpu_mm_);
+ ASSERT_EQ(host_buffer->device_type(), DeviceAllocationType::kCUDA_HOST);
// Try to view the host buffer on the device. This should correspond to
// GetDeviceAddress() in the previous test.
@@ -385,6 +387,7 @@ TEST_F(TestCudaHostBuffer, ViewOnDevice) {
ASSERT_NE(device_buffer->address(), 0);
ASSERT_EQ(device_buffer->size(), host_buffer->size());
ASSERT_EQ(device_buffer->parent(), host_buffer);
+ ASSERT_EQ(device_buffer->device_type(), DeviceAllocationType::kCUDA);
// View back the device buffer on the CPU. This should roundtrip.
ASSERT_OK_AND_ASSIGN(auto buffer, Buffer::View(device_buffer, cpu_mm_));
@@ -393,6 +396,7 @@ TEST_F(TestCudaHostBuffer, ViewOnDevice) {
ASSERT_EQ(buffer->address(), host_buffer->address());
ASSERT_EQ(buffer->size(), host_buffer->size());
ASSERT_EQ(buffer->parent(), device_buffer);
+ ASSERT_EQ(buffer->device_type(), DeviceAllocationType::kCUDA_HOST);
}
// ------------------------------------------------------------------------