This is an automated email from the ASF dual-hosted git repository.
kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 2132cb361c GH-39270: [C++] Avoid creating memory manager instance for
every buffer view/copy (#39271)
2132cb361c is described below
commit 2132cb361c386ab25d72b4990772ca6e5c31312c
Author: Sutou Kouhei <[email protected]>
AuthorDate: Thu Jan 11 22:45:22 2024 +0900
GH-39270: [C++] Avoid creating memory manager instance for every buffer
view/copy (#39271)
### Rationale for this change
We can use `arrow::default_cpu_memory_manager()` for
`default_memory_pool()`.
### What changes are included in this PR?
Check the given `pool` and use `arrow::default_cpu_memory_manager()` if
it's `arrow::default_memory_pool()`.
This also caches `arrow::CPUDevice::memory_manager()` result to avoid
calling it multiple times. Note that we can avoid creating needless memory
manager instance without this. This just avoid calling it multiple times.
### Are these changes tested?
Yes.
### Are there any user-facing changes?
No.
* Closes: #39270
Authored-by: Sutou Kouhei <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>
---
cpp/src/arrow/device.cc | 6 +++++-
cpp/src/arrow/ipc/message.cc | 20 +++++++++-----------
2 files changed, 14 insertions(+), 12 deletions(-)
diff --git a/cpp/src/arrow/device.cc b/cpp/src/arrow/device.cc
index 14d3bac0af..de709923dc 100644
--- a/cpp/src/arrow/device.cc
+++ b/cpp/src/arrow/device.cc
@@ -241,7 +241,11 @@ bool CPUDevice::Equals(const Device& other) const {
}
std::shared_ptr<MemoryManager> CPUDevice::memory_manager(MemoryPool* pool) {
- return CPUMemoryManager::Make(Instance(), pool);
+ if (pool == default_memory_pool()) {
+ return default_cpu_memory_manager();
+ } else {
+ return CPUMemoryManager::Make(Instance(), pool);
+ }
}
std::shared_ptr<MemoryManager> CPUDevice::default_memory_manager() {
diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc
index fbcd6f139b..e196dd7bf5 100644
--- a/cpp/src/arrow/ipc/message.cc
+++ b/cpp/src/arrow/ipc/message.cc
@@ -607,6 +607,7 @@ class MessageDecoder::MessageDecoderImpl {
MemoryPool* pool, bool skip_body)
: listener_(std::move(listener)),
pool_(pool),
+ memory_manager_(CPUDevice::memory_manager(pool_)),
state_(initial_state),
next_required_size_(initial_next_required_size),
chunks_(),
@@ -822,8 +823,7 @@ class MessageDecoder::MessageDecoderImpl {
if (buffer->is_cpu()) {
metadata_ = buffer;
} else {
- ARROW_ASSIGN_OR_RAISE(metadata_,
- Buffer::ViewOrCopy(buffer,
CPUDevice::memory_manager(pool_)));
+ ARROW_ASSIGN_OR_RAISE(metadata_, Buffer::ViewOrCopy(buffer,
memory_manager_));
}
return ConsumeMetadata();
}
@@ -834,16 +834,15 @@ class MessageDecoder::MessageDecoderImpl {
if (chunks_[0]->is_cpu()) {
metadata_ = std::move(chunks_[0]);
} else {
- ARROW_ASSIGN_OR_RAISE(
- metadata_,
- Buffer::ViewOrCopy(chunks_[0],
CPUDevice::memory_manager(pool_)));
+ ARROW_ASSIGN_OR_RAISE(metadata_,
+ Buffer::ViewOrCopy(chunks_[0],
memory_manager_));
}
chunks_.erase(chunks_.begin());
} else {
metadata_ = SliceBuffer(chunks_[0], 0, next_required_size_);
if (!chunks_[0]->is_cpu()) {
- ARROW_ASSIGN_OR_RAISE(
- metadata_, Buffer::ViewOrCopy(metadata_,
CPUDevice::memory_manager(pool_)));
+ ARROW_ASSIGN_OR_RAISE(metadata_,
+ Buffer::ViewOrCopy(metadata_,
memory_manager_));
}
chunks_[0] = SliceBuffer(chunks_[0], next_required_size_);
}
@@ -911,8 +910,7 @@ class MessageDecoder::MessageDecoderImpl {
if (buffer->is_cpu()) {
return util::SafeLoadAs<int32_t>(buffer->data());
} else {
- ARROW_ASSIGN_OR_RAISE(auto cpu_buffer,
- Buffer::ViewOrCopy(buffer,
CPUDevice::memory_manager(pool_)));
+ ARROW_ASSIGN_OR_RAISE(auto cpu_buffer, Buffer::ViewOrCopy(buffer,
memory_manager_));
return util::SafeLoadAs<int32_t>(cpu_buffer->data());
}
}
@@ -924,8 +922,7 @@ class MessageDecoder::MessageDecoderImpl {
std::shared_ptr<Buffer> last_chunk;
for (auto& chunk : chunks_) {
if (!chunk->is_cpu()) {
- ARROW_ASSIGN_OR_RAISE(
- chunk, Buffer::ViewOrCopy(chunk,
CPUDevice::memory_manager(pool_)));
+ ARROW_ASSIGN_OR_RAISE(chunk, Buffer::ViewOrCopy(chunk,
memory_manager_));
}
auto data = chunk->data();
auto data_size = chunk->size();
@@ -951,6 +948,7 @@ class MessageDecoder::MessageDecoderImpl {
std::shared_ptr<MessageDecoderListener> listener_;
MemoryPool* pool_;
+ std::shared_ptr<MemoryManager> memory_manager_;
State state_;
int64_t next_required_size_;
std::vector<std::shared_ptr<Buffer>> chunks_;