This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 87b515e920 GH-39771: [C++][Device] Generic CopyBatchTo/CopyArrayTo memory types (#39772)
87b515e920 is described below
commit 87b515e9207509aa3f77e3e1c0122be314a77e6d
Author: Matt Topol <[email protected]>
AuthorDate: Thu Feb 1 11:48:29 2024 -0500
GH-39771: [C++][Device] Generic CopyBatchTo/CopyArrayTo memory types (#39772)
### Rationale for this change
Right now our MemoryManager interfaces operate solely at the buffer level
and we do not provide any higher level facilities to copy an entire array or
record batch between memory types. We should implement CopyArrayTo and
CopyBatchTo functions which recursively utilize the buffer level copying to
create a new Array/RecordBatch whose buffers have been copied to the
destination memory manager.
### What changes are included in this PR?
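Exposing `CopyTo` and `ViewOrCopyTo` methods on `Array`, `ArrayData`, and
`RecordBatch` (the `CopyArrayTo` / `CopyBatchTo` functionality named in the
title) for copying entire Arrays or RecordBatches between memory types.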
### Are these changes tested?
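Dedicated tests are still being written and will be added. In the meantime,
the IPC `CopyCollectListener` test helper now uses `RecordBatch::CopyTo`
instead of its hand-rolled recursive buffer copy.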
* Closes: #39771
Authored-by: Matt Topol <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
---
cpp/src/arrow/array/array_base.cc | 12 +++++++++++
cpp/src/arrow/array/array_base.h | 16 +++++++++++++++
cpp/src/arrow/array/data.cc | 39 ++++++++++++++++++++++++++++++++++++
cpp/src/arrow/array/data.h | 19 +++++++++++++++---
cpp/src/arrow/buffer.h | 2 +-
cpp/src/arrow/c/bridge.cc | 2 +-
cpp/src/arrow/c/bridge_test.cc | 4 +++-
cpp/src/arrow/device.cc | 2 ++
cpp/src/arrow/gpu/cuda_context.cc | 5 +++++
cpp/src/arrow/ipc/read_write_test.cc | 27 ++++---------------------
cpp/src/arrow/record_batch.cc | 24 ++++++++++++++++++++++
cpp/src/arrow/record_batch.h | 19 ++++++++++++++++++
12 files changed, 142 insertions(+), 29 deletions(-)
diff --git a/cpp/src/arrow/array/array_base.cc b/cpp/src/arrow/array/array_base.cc
index b483ec420c..6927f51283 100644
--- a/cpp/src/arrow/array/array_base.cc
+++ b/cpp/src/arrow/array/array_base.cc
@@ -307,6 +307,18 @@ Result<std::shared_ptr<Array>> Array::View(
return MakeArray(result);
}
+Result<std::shared_ptr<Array>> Array::CopyTo(
+ const std::shared_ptr<MemoryManager>& to) const {
+ ARROW_ASSIGN_OR_RAISE(auto copied_data, data()->CopyTo(to));
+ return MakeArray(copied_data);
+}
+
+Result<std::shared_ptr<Array>> Array::ViewOrCopyTo(
+ const std::shared_ptr<MemoryManager>& to) const {
+ ARROW_ASSIGN_OR_RAISE(auto new_data, data()->ViewOrCopyTo(to));
+ return MakeArray(new_data);
+}
+
// ----------------------------------------------------------------------
// NullArray
diff --git a/cpp/src/arrow/array/array_base.h b/cpp/src/arrow/array/array_base.h
index 7e857bf205..6411aebf80 100644
--- a/cpp/src/arrow/array/array_base.h
+++ b/cpp/src/arrow/array/array_base.h
@@ -165,6 +165,22 @@ class ARROW_EXPORT Array {
/// An error is returned if the types are not layout-compatible.
Result<std::shared_ptr<Array>> View(const std::shared_ptr<DataType>& type)
const;
+ /// \brief Construct a copy of the array with all buffers on destination
+ /// Memory Manager
+ ///
+ /// This method recursively copies the array's buffers and those of its children
+ /// onto the destination MemoryManager device and returns the new Array.
+ Result<std::shared_ptr<Array>> CopyTo(const std::shared_ptr<MemoryManager>&
to) const;
+
+ /// \brief Construct a new array attempting to zero-copy view if possible.
+ ///
+ /// Like CopyTo, this method recursively goes through all of the array's buffers
+ /// and those of its children and first attempts to create zero-copy
+ /// views on the destination MemoryManager device. If it can't, it falls back
+ /// to performing a copy. See Buffer::ViewOrCopy.
+ Result<std::shared_ptr<Array>> ViewOrCopyTo(
+ const std::shared_ptr<MemoryManager>& to) const;
+
/// Construct a zero-copy slice of the array with the indicated offset and
/// length
///
diff --git a/cpp/src/arrow/array/data.cc b/cpp/src/arrow/array/data.cc
index 8454ac8f1d..80c411dfa6 100644
--- a/cpp/src/arrow/array/data.cc
+++ b/cpp/src/arrow/array/data.cc
@@ -27,6 +27,7 @@
#include "arrow/array/util.h"
#include "arrow/buffer.h"
+#include "arrow/device.h"
#include "arrow/scalar.h"
#include "arrow/status.h"
#include "arrow/type.h"
@@ -36,6 +37,7 @@
#include "arrow/util/dict_util.h"
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
+#include "arrow/util/range.h"
#include "arrow/util/ree_util.h"
#include "arrow/util/slice_util_internal.h"
#include "arrow/util/union_util.h"
@@ -140,6 +142,43 @@ std::shared_ptr<ArrayData> ArrayData::Make(std::shared_ptr<DataType> type, int64
return std::make_shared<ArrayData>(std::move(type), length, null_count,
offset);
}
+namespace {
+template <typename Fn>
+Result<std::shared_ptr<ArrayData>> CopyToImpl(const ArrayData& data,
+ const
std::shared_ptr<MemoryManager>& to,
+ Fn&& copy_fn) {
+ auto output = ArrayData::Make(data.type, data.length, data.null_count,
data.offset);
+ output->buffers.resize(data.buffers.size());
+ for (auto&& [buf, out_buf] : internal::Zip(data.buffers, output->buffers)) {
+ if (buf) {
+ ARROW_ASSIGN_OR_RAISE(out_buf, copy_fn(buf, to));
+ }
+ }
+
+ output->child_data.reserve(data.child_data.size());
+ for (const auto& child : data.child_data) {
+ ARROW_ASSIGN_OR_RAISE(auto copied, CopyToImpl(*child, to, copy_fn));
+ output->child_data.push_back(std::move(copied));
+ }
+
+ if (data.dictionary) {
+ ARROW_ASSIGN_OR_RAISE(output->dictionary, CopyToImpl(*data.dictionary, to,
copy_fn));
+ }
+
+ return output;
+}
+} // namespace
+
+Result<std::shared_ptr<ArrayData>> ArrayData::CopyTo(
+ const std::shared_ptr<MemoryManager>& to) const {
+ return CopyToImpl(*this, to, MemoryManager::CopyBuffer);
+}
+
+Result<std::shared_ptr<ArrayData>> ArrayData::ViewOrCopyTo(
+ const std::shared_ptr<MemoryManager>& to) const {
+ return CopyToImpl(*this, to, Buffer::ViewOrCopy);
+}
+
std::shared_ptr<ArrayData> ArrayData::Slice(int64_t off, int64_t len) const {
ARROW_CHECK_LE(off, length) << "Slice offset (" << off
<< ") greater than array length (" << length <<
")";
diff --git a/cpp/src/arrow/array/data.h b/cpp/src/arrow/array/data.h
index edd443adc4..d8a6663cec 100644
--- a/cpp/src/arrow/array/data.h
+++ b/cpp/src/arrow/array/data.h
@@ -27,6 +27,7 @@
#include "arrow/buffer.h"
#include "arrow/result.h"
#include "arrow/type.h"
+#include "arrow/type_fwd.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/macros.h"
#include "arrow/util/span.h"
@@ -34,9 +35,6 @@
namespace arrow {
-class Array;
-struct ArrayData;
-
namespace internal {
// ----------------------------------------------------------------------
// Null handling for types without a validity bitmap and the dictionary type
@@ -183,6 +181,21 @@ struct ARROW_EXPORT ArrayData {
std::shared_ptr<ArrayData> Copy() const { return
std::make_shared<ArrayData>(*this); }
+ /// \brief Copy all buffers and children recursively to destination MemoryManager
+ ///
+ /// This utilizes MemoryManager::CopyBuffer to create a new ArrayData object
+ /// recursively copying the buffers and all child buffers to the destination
+ /// memory manager. This includes dictionaries if applicable.
+ Result<std::shared_ptr<ArrayData>> CopyTo(
+ const std::shared_ptr<MemoryManager>& to) const;
+ /// \brief View or Copy this ArrayData to destination memory manager.
+ ///
+ /// Tries to view the buffer contents on the given memory manager's device
+ /// if possible (to avoid a copy) but falls back to copying if a no-copy view
+ /// isn't supported.
+ Result<std::shared_ptr<ArrayData>> ViewOrCopyTo(
+ const std::shared_ptr<MemoryManager>& to) const;
+
bool IsNull(int64_t i) const { return !IsValid(i); }
bool IsValid(int64_t i) const {
diff --git a/cpp/src/arrow/buffer.h b/cpp/src/arrow/buffer.h
index 52fd94ec1f..258a9faac7 100644
--- a/cpp/src/arrow/buffer.h
+++ b/cpp/src/arrow/buffer.h
@@ -359,7 +359,7 @@ class ARROW_EXPORT Buffer {
static Result<std::shared_ptr<Buffer>> ViewOrCopy(
std::shared_ptr<Buffer> source, const std::shared_ptr<MemoryManager>&
to);
- virtual std::shared_ptr<Device::SyncEvent> device_sync_event() { return
NULLPTR; }
+ virtual std::shared_ptr<Device::SyncEvent> device_sync_event() const {
return NULLPTR; }
protected:
bool is_mutable_;
diff --git a/cpp/src/arrow/c/bridge.cc b/cpp/src/arrow/c/bridge.cc
index 238afb0328..172ed8962c 100644
--- a/cpp/src/arrow/c/bridge.cc
+++ b/cpp/src/arrow/c/bridge.cc
@@ -1466,7 +1466,7 @@ class ImportedBuffer : public Buffer {
~ImportedBuffer() override = default;
- std::shared_ptr<Device::SyncEvent> device_sync_event() override {
+ std::shared_ptr<Device::SyncEvent> device_sync_event() const override {
return import_->device_sync_;
}
diff --git a/cpp/src/arrow/c/bridge_test.cc b/cpp/src/arrow/c/bridge_test.cc
index 58bbc9282c..321ec36c38 100644
--- a/cpp/src/arrow/c/bridge_test.cc
+++ b/cpp/src/arrow/c/bridge_test.cc
@@ -1282,7 +1282,9 @@ class MyBuffer final : public MutableBuffer {
default_memory_pool()->Free(const_cast<uint8_t*>(data_), size_);
}
- std::shared_ptr<Device::SyncEvent> device_sync_event() override { return
device_sync_; }
+ std::shared_ptr<Device::SyncEvent> device_sync_event() const override {
+ return device_sync_;
+ }
protected:
std::shared_ptr<Device::SyncEvent> device_sync_;
diff --git a/cpp/src/arrow/device.cc b/cpp/src/arrow/device.cc
index de709923dc..616f89aae8 100644
--- a/cpp/src/arrow/device.cc
+++ b/cpp/src/arrow/device.cc
@@ -20,8 +20,10 @@
#include <cstring>
#include <utility>
+#include "arrow/array.h"
#include "arrow/buffer.h"
#include "arrow/io/memory.h"
+#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/util/logging.h"
diff --git a/cpp/src/arrow/gpu/cuda_context.cc b/cpp/src/arrow/gpu/cuda_context.cc
index 81542d339b..988cc1f25b 100644
--- a/cpp/src/arrow/gpu/cuda_context.cc
+++ b/cpp/src/arrow/gpu/cuda_context.cc
@@ -433,6 +433,11 @@ Result<std::shared_ptr<Buffer>> CudaMemoryManager::CopyBufferTo(
Result<std::unique_ptr<Buffer>> CudaMemoryManager::CopyNonOwnedTo(
const Buffer& buf, const std::shared_ptr<MemoryManager>& to) {
if (to->is_cpu()) {
+ auto sync_event = buf.device_sync_event();
+ if (sync_event) {
+ RETURN_NOT_OK(sync_event->Wait());
+ }
+
// Device-to-CPU copy
std::unique_ptr<Buffer> dest;
ARROW_ASSIGN_OR_RAISE(auto from_context, cuda_device()->GetContext());
diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc
index bd2c2b716d..c5075299a3 100644
--- a/cpp/src/arrow/ipc/read_write_test.cc
+++ b/cpp/src/arrow/ipc/read_write_test.cc
@@ -1336,30 +1336,11 @@ class CopyCollectListener : public CollectListener {
Status OnRecordBatchWithMetadataDecoded(
RecordBatchWithMetadata record_batch_with_metadata) override {
- auto& record_batch = record_batch_with_metadata.batch;
- for (auto column_data : record_batch->column_data()) {
- ARROW_RETURN_NOT_OK(CopyArrayData(column_data));
- }
- return
CollectListener::OnRecordBatchWithMetadataDecoded(record_batch_with_metadata);
- }
+ ARROW_ASSIGN_OR_RAISE(
+ record_batch_with_metadata.batch,
+
record_batch_with_metadata.batch->CopyTo(default_cpu_memory_manager()));
- private:
- Status CopyArrayData(std::shared_ptr<ArrayData> data) {
- auto& buffers = data->buffers;
- for (size_t i = 0; i < buffers.size(); ++i) {
- auto& buffer = buffers[i];
- if (!buffer) {
- continue;
- }
- ARROW_ASSIGN_OR_RAISE(buffers[i], Buffer::Copy(buffer,
buffer->memory_manager()));
- }
- for (auto child_data : data->child_data) {
- ARROW_RETURN_NOT_OK(CopyArrayData(child_data));
- }
- if (data->dictionary) {
- ARROW_RETURN_NOT_OK(CopyArrayData(data->dictionary));
- }
- return Status::OK();
+ return
CollectListener::OnRecordBatchWithMetadataDecoded(record_batch_with_metadata);
}
};
diff --git a/cpp/src/arrow/record_batch.cc b/cpp/src/arrow/record_batch.cc
index 457135fa40..ca6b45af3d 100644
--- a/cpp/src/arrow/record_batch.cc
+++ b/cpp/src/arrow/record_batch.cc
@@ -357,6 +357,30 @@ Status ValidateBatch(const RecordBatch& batch, bool full_validation) {
} // namespace
+Result<std::shared_ptr<RecordBatch>> RecordBatch::CopyTo(
+ const std::shared_ptr<MemoryManager>& to) const {
+ ArrayVector copied_columns;
+ copied_columns.reserve(num_columns());
+ for (const auto& col : columns()) {
+ ARROW_ASSIGN_OR_RAISE(auto c, col->CopyTo(to));
+ copied_columns.push_back(std::move(c));
+ }
+
+ return Make(schema_, num_rows(), std::move(copied_columns));
+}
+
+Result<std::shared_ptr<RecordBatch>> RecordBatch::ViewOrCopyTo(
+ const std::shared_ptr<MemoryManager>& to) const {
+ ArrayVector copied_columns;
+ copied_columns.reserve(num_columns());
+ for (const auto& col : columns()) {
+ ARROW_ASSIGN_OR_RAISE(auto c, col->ViewOrCopyTo(to));
+ copied_columns.push_back(std::move(c));
+ }
+
+ return Make(schema_, num_rows(), std::move(copied_columns));
+}
+
Status RecordBatch::Validate() const {
return ValidateBatch(*this, /*full_validation=*/false);
}
diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h
index 1a66fc3fb5..79f93a7b59 100644
--- a/cpp/src/arrow/record_batch.h
+++ b/cpp/src/arrow/record_batch.h
@@ -186,6 +186,25 @@ class ARROW_EXPORT RecordBatch {
/// \return the number of rows (the corresponding length of each column)
int64_t num_rows() const { return num_rows_; }
+ /// \brief Copy the entire RecordBatch to destination MemoryManager
+ ///
+ /// This uses Array::CopyTo on each column of the record batch to create
+ /// a new record batch where all underlying buffers for the columns have
+ /// been copied to the destination MemoryManager. This uses
+ /// MemoryManager::CopyBuffer under the hood.
+ Result<std::shared_ptr<RecordBatch>> CopyTo(
+ const std::shared_ptr<MemoryManager>& to) const;
+
+ /// \brief View or Copy the entire RecordBatch to destination MemoryManager
+ ///
+ /// This uses Array::ViewOrCopyTo on each column of the record batch to create
+ /// a new record batch where all underlying buffers for the columns have
+ /// been zero-copy viewed on the destination MemoryManager, falling back
+ /// to performing a copy if it can't be viewed as a zero-copy buffer. This uses
+ /// Buffer::ViewOrCopy under the hood.
+ Result<std::shared_ptr<RecordBatch>> ViewOrCopyTo(
+ const std::shared_ptr<MemoryManager>& to) const;
+
/// \brief Slice each of the arrays in the record batch
/// \param[in] offset the starting offset to slice, through end of batch
/// \return new record batch