This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 761f1c58a3 [GLUTEN-7800][VL] Add config for max reclaim wait time to
avoid deadlock during memory arbitration
761f1c58a3 is described below
commit 761f1c58a3adb6d768680523431ccb67b459f6c5
Author: Yang Zhang <[email protected]>
AuthorDate: Thu Nov 7 08:32:05 2024 +0800
[GLUTEN-7800][VL] Add config for max reclaim wait time to avoid deadlock
during memory arbitration
---
cpp/velox/config/VeloxConfig.h | 3 ++
cpp/velox/jni/VeloxJniWrapper.cc | 1 -
cpp/velox/memory/VeloxMemoryManager.cc | 42 ++++++++++++----------
cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc | 1 -
.../scala/org/apache/gluten/GlutenConfig.scala | 7 ++++
5 files changed, 33 insertions(+), 21 deletions(-)
diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h
index 792beda96f..cb70dc6278 100644
--- a/cpp/velox/config/VeloxConfig.h
+++ b/cpp/velox/config/VeloxConfig.h
@@ -76,6 +76,9 @@ const bool kMemoryUseHugePagesDefault = false;
const std::string kVeloxMemInitCapacity =
"spark.gluten.sql.columnar.backend.velox.memInitCapacity";
const uint64_t kVeloxMemInitCapacityDefault = 8 << 20;
+const std::string kVeloxMemReclaimMaxWaitMs =
"spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs";
+const uint64_t kVeloxMemReclaimMaxWaitMsDefault = 3600000; // 60min
+
const std::string kHiveConnectorId = "test-hive";
const std::string kVeloxCacheEnabled =
"spark.gluten.sql.columnar.backend.velox.cacheEnabled";
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index b8d2b0c3c2..6ea60d651a 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -385,7 +385,6 @@ JNIEXPORT void JNICALL
Java_org_apache_gluten_datasource_VeloxDataSourceJniWrapp
jlong dsHandle,
jlong batchHandle) {
JNI_METHOD_START
- auto ctx = gluten::getRuntime(env, wrapper);
auto datasource = ObjectStore::retrieve<VeloxDataSource>(dsHandle);
auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
datasource->write(batch);
diff --git a/cpp/velox/memory/VeloxMemoryManager.cc
b/cpp/velox/memory/VeloxMemoryManager.cc
index 19a5d45804..55420d9380 100644
--- a/cpp/velox/memory/VeloxMemoryManager.cc
+++ b/cpp/velox/memory/VeloxMemoryManager.cc
@@ -22,6 +22,7 @@
#include "velox/common/memory/MallocAllocator.h"
#include "velox/common/memory/MemoryPool.h"
+#include "velox/common/memory/SharedArbitrator.h"
#include "velox/exec/MemoryReclaimer.h"
#include "compute/VeloxBackend.h"
@@ -36,12 +37,6 @@ namespace gluten {
using namespace facebook;
namespace {
-
-static constexpr std::string_view
kMemoryPoolInitialCapacity{"memory-pool-initial-capacity"};
-static constexpr uint64_t kDefaultMemoryPoolInitialCapacity{256 << 20};
-static constexpr std::string_view
kMemoryPoolTransferCapacity{"memory-pool-transfer-capacity"};
-static constexpr uint64_t kDefaultMemoryPoolTransferCapacity{128 << 20};
-
template <typename T>
T getConfig(
const std::unordered_map<std::string, std::string>& configs,
@@ -57,24 +52,28 @@ T getConfig(
return defaultValue;
}
} // namespace
+
/// We assume in a single Spark task. No thread-safety should be guaranteed.
class ListenableArbitrator : public velox::memory::MemoryArbitrator {
public:
ListenableArbitrator(const Config& config, AllocationListener* listener)
: MemoryArbitrator(config),
listener_(listener),
- memoryPoolInitialCapacity_(velox::config::toCapacity(
- getConfig<std::string>(
- config.extraConfigs,
- kMemoryPoolInitialCapacity,
- std::to_string(kDefaultMemoryPoolInitialCapacity)),
- velox::config::CapacityUnit::BYTE)),
+ reclaimMaxWaitMs_(
+
velox::memory::SharedArbitrator::ExtraConfig::memoryReclaimMaxWaitTimeMs(config.extraConfigs)),
+ memoryPoolInitialCapacity_(
+
velox::memory::SharedArbitrator::ExtraConfig::memoryPoolInitialCapacity(config.extraConfigs)),
memoryPoolTransferCapacity_(velox::config::toCapacity(
getConfig<std::string>(
config.extraConfigs,
- kMemoryPoolTransferCapacity,
- std::to_string(kDefaultMemoryPoolTransferCapacity)),
- velox::config::CapacityUnit::BYTE)) {}
+ kMemoryReservationBlockSize,
+ std::to_string(kMemoryReservationBlockSizeDefault)),
+ velox::config::CapacityUnit::BYTE)) {
+ if (reclaimMaxWaitMs_ == 0) {
+ LOG(WARNING) << kVeloxMemReclaimMaxWaitMs
+ << " was set to 0, it may cause dead lock when memory
arbitration has bug.";
+ }
+ }
std::string kind() const override {
return kind_;
}
@@ -121,7 +120,7 @@ class ListenableArbitrator : public
velox::memory::MemoryArbitrator {
VELOX_CHECK_EQ(candidates_.size(), 1, "ListenableArbitrator should only
be used within a single root pool");
pool = candidates_.begin()->first;
}
- pool->reclaim(targetBytes, 0, status); // ignore the output
+ pool->reclaim(targetBytes, reclaimMaxWaitMs_, status); // ignore the output
return shrinkCapacity0(pool, 0);
}
@@ -168,6 +167,7 @@ class ListenableArbitrator : public
velox::memory::MemoryArbitrator {
}
gluten::AllocationListener* listener_;
+ const uint64_t reclaimMaxWaitMs_;
const uint64_t memoryPoolInitialCapacity_; // FIXME: Unused.
const uint64_t memoryPoolTransferCapacity_;
@@ -208,14 +208,18 @@ VeloxMemoryManager::VeloxMemoryManager(const std::string&
kind, std::unique_ptr<
kMemoryReservationBlockSize, kMemoryReservationBlockSizeDefault);
auto memInitCapacity =
VeloxBackend::get()->getBackendConf()->get<uint64_t>(kVeloxMemInitCapacity,
kVeloxMemInitCapacityDefault);
+ auto memReclaimMaxWaitMs =
+
VeloxBackend::get()->getBackendConf()->get<uint64_t>(kVeloxMemReclaimMaxWaitMs,
kVeloxMemReclaimMaxWaitMsDefault);
blockListener_ = std::make_unique<BlockAllocationListener>(listener_.get(),
reservationBlockSize);
listenableAlloc_ =
std::make_unique<ListenableMemoryAllocator>(defaultMemoryAllocator().get(),
blockListener_.get());
arrowPool_ = std::make_unique<ArrowMemoryPool>(listenableAlloc_.get());
std::unordered_map<std::string, std::string> extraArbitratorConfigs;
- extraArbitratorConfigs["memory-pool-initial-capacity"] =
folly::to<std::string>(memInitCapacity) + "B";
- extraArbitratorConfigs["memory-pool-transfer-capacity"] =
folly::to<std::string>(reservationBlockSize) + "B";
- extraArbitratorConfigs["memory-reclaim-max-wait-time"] =
folly::to<std::string>(0) + "ms";
+
extraArbitratorConfigs[std::string(velox::memory::SharedArbitrator::ExtraConfig::kMemoryPoolInitialCapacity)]
=
+ folly::to<std::string>(memInitCapacity) + "B";
+ extraArbitratorConfigs[kMemoryReservationBlockSize] =
folly::to<std::string>(reservationBlockSize) + "B";
+
extraArbitratorConfigs[std::string(velox::memory::SharedArbitrator::ExtraConfig::kMemoryReclaimMaxWaitTime)]
=
+ folly::to<std::string>(memReclaimMaxWaitMs) + "ms";
ArbitratorFactoryRegister afr(listener_.get());
velox::memory::MemoryManagerOptions mmOptions{
diff --git a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
index 34796e378e..e17ad5e2f7 100644
--- a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
@@ -117,7 +117,6 @@ arrow::Status
VeloxRssSortShuffleWriter::write(std::shared_ptr<ColumnarBatch> cb
}
arrow::Status VeloxRssSortShuffleWriter::evictBatch(uint32_t partitionId) {
- int64_t rawSize = batch_->size();
bufferOutputStream_->seekp(0);
batch_->flush(bufferOutputStream_.get());
auto buffer = bufferOutputStream_->getBuffer();
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index a28a7d26b3..13bf72d47e 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -1414,6 +1414,13 @@ object GlutenConfig {
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("8MB")
+ val COLUMNAR_VELOX_MEM_RECLAIM_MAX_WAIT_MS =
+ buildConf("spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs")
+ .internal()
+ .doc("The max time in ms to wait for memory reclaim.")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefault(TimeUnit.MINUTES.toMillis(60))
+
val COLUMNAR_VELOX_SSD_CACHE_PATH =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.ssdCachePath")
.internal()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]