This is an automated email from the ASF dual-hosted git repository. yangzy pushed a commit to branch add-maxwaittime-for-memory-arbitration in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
commit 595192e4199e00d6250fc0ac8cf8e0b463f708b5 Author: Yang Zhang <[email protected]> AuthorDate: Mon Nov 4 16:10:04 2024 +0800 fix --- cpp/velox/config/VeloxConfig.h | 3 ++ cpp/velox/jni/VeloxJniWrapper.cc | 1 - cpp/velox/memory/VeloxMemoryManager.cc | 42 ++++++++++++---------- cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc | 1 - .../scala/org/apache/gluten/GlutenConfig.scala | 7 ++++ 5 files changed, 33 insertions(+), 21 deletions(-) diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h index 792beda96f..cb70dc6278 100644 --- a/cpp/velox/config/VeloxConfig.h +++ b/cpp/velox/config/VeloxConfig.h @@ -76,6 +76,9 @@ const bool kMemoryUseHugePagesDefault = false; const std::string kVeloxMemInitCapacity = "spark.gluten.sql.columnar.backend.velox.memInitCapacity"; const uint64_t kVeloxMemInitCapacityDefault = 8 << 20; +const std::string kVeloxMemReclaimMaxWaitMs = "spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs"; +const uint64_t kVeloxMemReclaimMaxWaitMsDefault = 3600000; // 60min + const std::string kHiveConnectorId = "test-hive"; const std::string kVeloxCacheEnabled = "spark.gluten.sql.columnar.backend.velox.cacheEnabled"; diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc index b8d2b0c3c2..6ea60d651a 100644 --- a/cpp/velox/jni/VeloxJniWrapper.cc +++ b/cpp/velox/jni/VeloxJniWrapper.cc @@ -385,7 +385,6 @@ JNIEXPORT void JNICALL Java_org_apache_gluten_datasource_VeloxDataSourceJniWrapp jlong dsHandle, jlong batchHandle) { JNI_METHOD_START - auto ctx = gluten::getRuntime(env, wrapper); auto datasource = ObjectStore::retrieve<VeloxDataSource>(dsHandle); auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle); datasource->write(batch); diff --git a/cpp/velox/memory/VeloxMemoryManager.cc b/cpp/velox/memory/VeloxMemoryManager.cc index 19a5d45804..55420d9380 100644 --- a/cpp/velox/memory/VeloxMemoryManager.cc +++ b/cpp/velox/memory/VeloxMemoryManager.cc @@ -22,6 +22,7 @@ #include "velox/common/memory/MallocAllocator.h" #include "velox/common/memory/MemoryPool.h" +#include "velox/common/memory/SharedArbitrator.h" #include "velox/exec/MemoryReclaimer.h" #include "compute/VeloxBackend.h" @@ -36,12 +37,6 @@ namespace gluten { using namespace facebook; namespace { - -static constexpr std::string_view kMemoryPoolInitialCapacity{"memory-pool-initial-capacity"}; -static constexpr uint64_t kDefaultMemoryPoolInitialCapacity{256 << 20}; -static constexpr std::string_view kMemoryPoolTransferCapacity{"memory-pool-transfer-capacity"}; -static constexpr uint64_t kDefaultMemoryPoolTransferCapacity{128 << 20}; - template <typename T> T getConfig( const std::unordered_map<std::string, std::string>& configs, @@ -57,24 +52,28 @@ T getConfig( return defaultValue; } } // namespace + /// We assume in a single Spark task. No thread-safety should be guaranteed. class ListenableArbitrator : public velox::memory::MemoryArbitrator { public: ListenableArbitrator(const Config& config, AllocationListener* listener) : MemoryArbitrator(config), listener_(listener), - memoryPoolInitialCapacity_(velox::config::toCapacity( - getConfig<std::string>( - config.extraConfigs, - kMemoryPoolInitialCapacity, - std::to_string(kDefaultMemoryPoolInitialCapacity)), - velox::config::CapacityUnit::BYTE)), + reclaimMaxWaitMs_( + velox::memory::SharedArbitrator::ExtraConfig::memoryReclaimMaxWaitTimeMs(config.extraConfigs)), + memoryPoolInitialCapacity_( + velox::memory::SharedArbitrator::ExtraConfig::memoryPoolInitialCapacity(config.extraConfigs)), memoryPoolTransferCapacity_(velox::config::toCapacity( getConfig<std::string>( config.extraConfigs, - kMemoryPoolTransferCapacity, - std::to_string(kDefaultMemoryPoolTransferCapacity)), - velox::config::CapacityUnit::BYTE)) {} + kMemoryReservationBlockSize, + std::to_string(kMemoryReservationBlockSizeDefault)), + velox::config::CapacityUnit::BYTE)) { + if (reclaimMaxWaitMs_ == 0) { + LOG(WARNING) << kVeloxMemReclaimMaxWaitMs + << " was set to 0, it may cause dead lock when memory arbitration has bug."; + } + } std::string kind() const override { return kind_; } @@ -121,7 +120,7 @@ class ListenableArbitrator : public velox::memory::MemoryArbitrator { VELOX_CHECK_EQ(candidates_.size(), 1, "ListenableArbitrator should only be used within a single root pool"); pool = candidates_.begin()->first; } - pool->reclaim(targetBytes, 0, status); // ignore the output + pool->reclaim(targetBytes, reclaimMaxWaitMs_, status); // ignore the output return shrinkCapacity0(pool, 0); } @@ -168,6 +167,7 @@ class ListenableArbitrator : public velox::memory::MemoryArbitrator { } gluten::AllocationListener* listener_; + const uint64_t reclaimMaxWaitMs_; const uint64_t memoryPoolInitialCapacity_; // FIXME: Unused. const uint64_t memoryPoolTransferCapacity_; @@ -208,14 +208,18 @@ VeloxMemoryManager::VeloxMemoryManager(const std::string& kind, std::unique_ptr< kMemoryReservationBlockSize, kMemoryReservationBlockSizeDefault); auto memInitCapacity = VeloxBackend::get()->getBackendConf()->get<uint64_t>(kVeloxMemInitCapacity, kVeloxMemInitCapacityDefault); + auto memReclaimMaxWaitMs = + VeloxBackend::get()->getBackendConf()->get<uint64_t>(kVeloxMemReclaimMaxWaitMs, kVeloxMemReclaimMaxWaitMsDefault); blockListener_ = std::make_unique<BlockAllocationListener>(listener_.get(), reservationBlockSize); listenableAlloc_ = std::make_unique<ListenableMemoryAllocator>(defaultMemoryAllocator().get(), blockListener_.get()); arrowPool_ = std::make_unique<ArrowMemoryPool>(listenableAlloc_.get()); std::unordered_map<std::string, std::string> extraArbitratorConfigs; - extraArbitratorConfigs["memory-pool-initial-capacity"] = folly::to<std::string>(memInitCapacity) + "B"; - extraArbitratorConfigs["memory-pool-transfer-capacity"] = folly::to<std::string>(reservationBlockSize) + "B"; - extraArbitratorConfigs["memory-reclaim-max-wait-time"] = folly::to<std::string>(0) + "ms"; + extraArbitratorConfigs[std::string(velox::memory::SharedArbitrator::ExtraConfig::kMemoryPoolInitialCapacity)] = + folly::to<std::string>(memInitCapacity) + "B"; + extraArbitratorConfigs[kMemoryReservationBlockSize] = folly::to<std::string>(reservationBlockSize) + "B"; + extraArbitratorConfigs[std::string(velox::memory::SharedArbitrator::ExtraConfig::kMemoryReclaimMaxWaitTime)] = + folly::to<std::string>(memReclaimMaxWaitMs) + "ms"; ArbitratorFactoryRegister afr(listener_.get()); velox::memory::MemoryManagerOptions mmOptions{ diff --git a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc index 34796e378e..e17ad5e2f7 100644 --- a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc @@ -117,7 +117,6 @@ arrow::Status VeloxRssSortShuffleWriter::write(std::shared_ptr<ColumnarBatch> cb } arrow::Status VeloxRssSortShuffleWriter::evictBatch(uint32_t partitionId) { - int64_t rawSize = batch_->size(); bufferOutputStream_->seekp(0); batch_->flush(bufferOutputStream_.get()); auto buffer = bufferOutputStream_->getBuffer(); diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index a28a7d26b3..13bf72d47e 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala @@ -1414,6 +1414,13 @@ object GlutenConfig { .bytesConf(ByteUnit.BYTE) .createWithDefaultString("8MB") + val COLUMNAR_VELOX_MEM_RECLAIM_MAX_WAIT_MS = + buildConf("spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs") + .internal() + .doc("The max time in ms to wait for memory reclaim.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefault(TimeUnit.MINUTES.toMillis(60)) + val COLUMNAR_VELOX_SSD_CACHE_PATH = buildStaticConf("spark.gluten.sql.columnar.backend.velox.ssdCachePath") .internal() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
