This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new ec62a76c8 [VL] Daily update velox version 08-08 (#6752)
ec62a76c8 is described below
commit ec62a76c88b264262795577fc60fa34f328bb33e
Author: Jin Chengcheng <[email protected]>
AuthorDate: Thu Aug 8 22:40:37 2024 +0800
[VL] Daily update velox version 08-08 (#6752)
---
cpp/velox/compute/WholeStageResultIterator.cc | 2 +-
cpp/velox/memory/VeloxMemoryManager.cc | 88 ++++++++++++++++++---------
ep/build-velox/src/get_velox.sh | 2 +-
3 files changed, 62 insertions(+), 30 deletions(-)
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc
index 4544c0165..eb700c648 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -245,7 +245,7 @@ int64_t WholeStageResultIterator::spillFixedSize(int64_t size) {
SuspendedSection suspender;
velox::exec::MemoryReclaimer::Stats status;
auto* mm = memoryManager_->getMemoryManager();
-  uint64_t spilledOut = mm->arbitrator()->shrinkCapacity({pool}, remaining); // this conducts spilling
+  uint64_t spilledOut = mm->arbitrator()->shrinkCapacity(remaining); // this conducts spilling
   LOG(INFO) << logPrefix << "Successfully spilled out " << spilledOut << " bytes.";
   uint64_t total = shrunken + spilledOut;
   VLOG(2) << logPrefix << "Successfully reclaimed total " << total << " bytes.";
diff --git a/cpp/velox/memory/VeloxMemoryManager.cc b/cpp/velox/memory/VeloxMemoryManager.cc
index 442090004..3f30d8627 100644
--- a/cpp/velox/memory/VeloxMemoryManager.cc
+++ b/cpp/velox/memory/VeloxMemoryManager.cc
@@ -35,63 +35,91 @@ namespace gluten {
using namespace facebook;
+namespace {
+
+static constexpr std::string_view kMemoryPoolInitialCapacity{"memory-pool-initial-capacity"};
+static constexpr uint64_t kDefaultMemoryPoolInitialCapacity{256 << 20};
+static constexpr std::string_view kMemoryPoolTransferCapacity{"memory-pool-transfer-capacity"};
+static constexpr uint64_t kDefaultMemoryPoolTransferCapacity{128 << 20};
+
+template <typename T>
+T getConfig(
+ const std::unordered_map<std::string, std::string>& configs,
+ const std::string_view& key,
+ const T& defaultValue) {
+ if (configs.count(std::string(key)) > 0) {
+ try {
+ return folly::to<T>(configs.at(std::string(key)));
+ } catch (const std::exception& e) {
+      VELOX_USER_FAIL("Failed while parsing SharedArbitrator configs: {}", e.what());
+ }
+ }
+ return defaultValue;
+}
+} // namespace
/// We assume in a single Spark task. No thread-safety should be guaranteed.
class ListenableArbitrator : public velox::memory::MemoryArbitrator {
public:
ListenableArbitrator(const Config& config, AllocationListener* listener)
- : MemoryArbitrator(config), listener_(listener) {}
-
+ : MemoryArbitrator(config),
+ listener_(listener),
+        memoryPoolInitialCapacity_(
+            getConfig<uint64_t>(config.extraConfigs, kMemoryPoolInitialCapacity, kDefaultMemoryPoolInitialCapacity)),
+        memoryPoolTransferCapacity_(
+            getConfig<uint64_t>(config.extraConfigs, kMemoryPoolTransferCapacity, kDefaultMemoryPoolTransferCapacity)) {
+ }
std::string kind() const override {
return kind_;
}
-  uint64_t growCapacity(velox::memory::MemoryPool* pool, uint64_t targetBytes) override {
-    std::lock_guard<std::recursive_mutex> l(mutex_);
-    listener_->allocationChanged(targetBytes);
-    if (!growPool(pool, targetBytes, 0)) {
-      VELOX_FAIL("Failed to grow root pool's capacity for {}", velox::succinctBytes(targetBytes));
-    }
-    return targetBytes;
+  void addPool(const std::shared_ptr<velox::memory::MemoryPool>& pool) override {
+ VELOX_CHECK_EQ(pool->capacity(), 0);
+
+ std::unique_lock guard{mutex_};
+ VELOX_CHECK_EQ(candidates_.count(pool.get()), 0);
+ candidates_.emplace(pool.get(), pool->weak_from_this());
}
-  uint64_t shrinkCapacity(velox::memory::MemoryPool* pool, uint64_t targetBytes) override {
-    std::lock_guard<std::recursive_mutex> l(mutex_);
-    return shrinkCapacityLocked(pool, targetBytes);
+ void removePool(velox::memory::MemoryPool* pool) override {
+ VELOX_CHECK_EQ(pool->reservedBytes(), 0);
+ shrinkCapacity(pool, pool->capacity());
+
+ std::unique_lock guard{mutex_};
+ const auto ret = candidates_.erase(pool);
+ VELOX_CHECK_EQ(ret, 1);
}
-  bool growCapacity(
-      velox::memory::MemoryPool* pool,
-      const std::vector<std::shared_ptr<velox::memory::MemoryPool>>& candidatePools,
-      uint64_t targetBytes) override {
+  bool growCapacity(velox::memory::MemoryPool* pool, uint64_t targetBytes) override {
velox::memory::ScopedMemoryArbitrationContext ctx(pool);
-    VELOX_CHECK_EQ(candidatePools.size(), 1, "ListenableArbitrator should only be used within a single root pool")
-    auto candidate = candidatePools.back();
-    VELOX_CHECK(pool->root() == candidate.get(), "Illegal state in ListenableArbitrator");
+    VELOX_CHECK_EQ(candidates_.size(), 1, "ListenableArbitrator should only be used within a single root pool")
+    auto candidate = candidates_.begin()->first;
+    VELOX_CHECK(pool->root() == candidate, "Illegal state in ListenableArbitrator");
std::lock_guard<std::recursive_mutex> l(mutex_);
growCapacityLocked(pool->root(), targetBytes);
return true;
}
-  uint64_t shrinkCapacity(
-      const std::vector<std::shared_ptr<velox::memory::MemoryPool>>& pools,
-      uint64_t targetBytes,
-      bool allowSpill,
-      bool allowAbort) override {
+  uint64_t shrinkCapacity(uint64_t targetBytes, bool allowSpill, bool allowAbort) override {
     velox::memory::ScopedMemoryArbitrationContext ctx((const velox::memory::MemoryPool*)nullptr);
facebook::velox::exec::MemoryReclaimer::Stats status;
- VELOX_CHECK_EQ(pools.size(), 1, "Gluten only has one root pool");
+ VELOX_CHECK_EQ(candidates_.size(), 1, "Gluten only has one root pool");
    std::lock_guard<std::recursive_mutex> l(mutex_); // FIXME: Do we have recursive locking for this mutex?
- auto pool = pools.at(0);
+ auto pool = candidates_.begin()->first;
const uint64_t oldCapacity = pool->capacity();
pool->reclaim(targetBytes, 0, status); // ignore the output
- shrinkPool(pool.get(), 0);
+ shrinkPool(pool, 0);
const uint64_t newCapacity = pool->capacity();
uint64_t total = oldCapacity - newCapacity;
listener_->allocationChanged(-total);
return total;
}
+  uint64_t shrinkCapacity(velox::memory::MemoryPool* pool, uint64_t targetBytes) override {
+ std::lock_guard<std::recursive_mutex> l(mutex_);
+ return shrinkCapacityLocked(pool, targetBytes);
+ }
+
Stats stats() const override {
Stats stats; // no-op
return stats;
@@ -131,8 +159,12 @@ class ListenableArbitrator : public velox::memory::MemoryArbitrator {
}
gluten::AllocationListener* listener_;
- std::recursive_mutex mutex_;
+ const uint64_t memoryPoolInitialCapacity_; // FIXME: Unused.
+ const uint64_t memoryPoolTransferCapacity_;
+
+ mutable std::recursive_mutex mutex_;
inline static std::string kind_ = "GLUTEN";
+  std::unordered_map<velox::memory::MemoryPool*, std::weak_ptr<velox::memory::MemoryPool>> candidates_;
};
class ArbitratorFactoryRegister {
diff --git a/ep/build-velox/src/get_velox.sh b/ep/build-velox/src/get_velox.sh
index 60d87bfb1..79a1d84fa 100755
--- a/ep/build-velox/src/get_velox.sh
+++ b/ep/build-velox/src/get_velox.sh
@@ -17,7 +17,7 @@
set -exu
VELOX_REPO=https://github.com/oap-project/velox.git
-VELOX_BRANCH=2024_08_06
+VELOX_BRANCH=2024_08_08
VELOX_HOME=""
OS=`uname -s`
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]