This is an automated email from the ASF dual-hosted git repository. hongze pushed a commit to branch revert-7799-add-maxwaittime-for-memory-arbitration in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
commit 0d1d40d71f74aaee95b4e5969dcfac6f11275b71 Author: Hongze Zhang <[email protected]> AuthorDate: Thu Nov 7 08:44:29 2024 +0800 Revert "[GLUTEN-7800][VL] Add config for max reclaim wait time to avoid dead …" This reverts commit 761f1c58a3adb6d768680523431ccb67b459f6c5. --- cpp/velox/config/VeloxConfig.h | 3 -- cpp/velox/jni/VeloxJniWrapper.cc | 1 + cpp/velox/memory/VeloxMemoryManager.cc | 42 ++++++++++------------ cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc | 1 + .../scala/org/apache/gluten/GlutenConfig.scala | 7 ---- 5 files changed, 21 insertions(+), 33 deletions(-) diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h index cb70dc6278..792beda96f 100644 --- a/cpp/velox/config/VeloxConfig.h +++ b/cpp/velox/config/VeloxConfig.h @@ -76,9 +76,6 @@ const bool kMemoryUseHugePagesDefault = false; const std::string kVeloxMemInitCapacity = "spark.gluten.sql.columnar.backend.velox.memInitCapacity"; const uint64_t kVeloxMemInitCapacityDefault = 8 << 20; -const std::string kVeloxMemReclaimMaxWaitMs = "spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs"; -const uint64_t kVeloxMemReclaimMaxWaitMsDefault = 3600000; // 60min - const std::string kHiveConnectorId = "test-hive"; const std::string kVeloxCacheEnabled = "spark.gluten.sql.columnar.backend.velox.cacheEnabled"; diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc index 6ea60d651a..b8d2b0c3c2 100644 --- a/cpp/velox/jni/VeloxJniWrapper.cc +++ b/cpp/velox/jni/VeloxJniWrapper.cc @@ -385,6 +385,7 @@ JNIEXPORT void JNICALL Java_org_apache_gluten_datasource_VeloxDataSourceJniWrapp jlong dsHandle, jlong batchHandle) { JNI_METHOD_START + auto ctx = gluten::getRuntime(env, wrapper); auto datasource = ObjectStore::retrieve<VeloxDataSource>(dsHandle); auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle); datasource->write(batch); diff --git a/cpp/velox/memory/VeloxMemoryManager.cc b/cpp/velox/memory/VeloxMemoryManager.cc index 55420d9380..19a5d45804 100644 --- a/cpp/velox/memory/VeloxMemoryManager.cc +++ b/cpp/velox/memory/VeloxMemoryManager.cc @@ -22,7 +22,6 @@ #include "velox/common/memory/MallocAllocator.h" #include "velox/common/memory/MemoryPool.h" -#include "velox/common/memory/SharedArbitrator.h" #include "velox/exec/MemoryReclaimer.h" #include "compute/VeloxBackend.h" @@ -37,6 +36,12 @@ namespace gluten { using namespace facebook; namespace { + +static constexpr std::string_view kMemoryPoolInitialCapacity{"memory-pool-initial-capacity"}; +static constexpr uint64_t kDefaultMemoryPoolInitialCapacity{256 << 20}; +static constexpr std::string_view kMemoryPoolTransferCapacity{"memory-pool-transfer-capacity"}; +static constexpr uint64_t kDefaultMemoryPoolTransferCapacity{128 << 20}; + template <typename T> T getConfig( const std::unordered_map<std::string, std::string>& configs, @@ -52,28 +57,24 @@ T getConfig( return defaultValue; } } // namespace - /// We assume in a single Spark task. No thread-safety should be guaranteed. class ListenableArbitrator : public velox::memory::MemoryArbitrator { public: ListenableArbitrator(const Config& config, AllocationListener* listener) : MemoryArbitrator(config), listener_(listener), - reclaimMaxWaitMs_( - velox::memory::SharedArbitrator::ExtraConfig::memoryReclaimMaxWaitTimeMs(config.extraConfigs)), - memoryPoolInitialCapacity_( - velox::memory::SharedArbitrator::ExtraConfig::memoryPoolInitialCapacity(config.extraConfigs)), + memoryPoolInitialCapacity_(velox::config::toCapacity( + getConfig<std::string>( + config.extraConfigs, + kMemoryPoolInitialCapacity, + std::to_string(kDefaultMemoryPoolInitialCapacity)), + velox::config::CapacityUnit::BYTE)), memoryPoolTransferCapacity_(velox::config::toCapacity( getConfig<std::string>( config.extraConfigs, - kMemoryReservationBlockSize, - std::to_string(kMemoryReservationBlockSizeDefault)), - velox::config::CapacityUnit::BYTE)) { - if (reclaimMaxWaitMs_ == 0) { - LOG(WARNING) << kVeloxMemReclaimMaxWaitMs - << " was set to 0, it may cause dead lock when memory arbitration has bug."; - } - } + kMemoryPoolTransferCapacity, + std::to_string(kDefaultMemoryPoolTransferCapacity)), + velox::config::CapacityUnit::BYTE)) {} std::string kind() const override { return kind_; } @@ -120,7 +121,7 @@ class ListenableArbitrator : public velox::memory::MemoryArbitrator { VELOX_CHECK_EQ(candidates_.size(), 1, "ListenableArbitrator should only be used within a single root pool"); pool = candidates_.begin()->first; } - pool->reclaim(targetBytes, reclaimMaxWaitMs_, status); // ignore the output + pool->reclaim(targetBytes, 0, status); // ignore the output return shrinkCapacity0(pool, 0); } @@ -167,7 +168,6 @@ class ListenableArbitrator : public velox::memory::MemoryArbitrator { } gluten::AllocationListener* listener_; - const uint64_t reclaimMaxWaitMs_; const uint64_t memoryPoolInitialCapacity_; // FIXME: Unused. const uint64_t memoryPoolTransferCapacity_; @@ -208,18 +208,14 @@ VeloxMemoryManager::VeloxMemoryManager(const std::string& kind, std::unique_ptr< kMemoryReservationBlockSize, kMemoryReservationBlockSizeDefault); auto memInitCapacity = VeloxBackend::get()->getBackendConf()->get<uint64_t>(kVeloxMemInitCapacity, kVeloxMemInitCapacityDefault); - auto memReclaimMaxWaitMs = - VeloxBackend::get()->getBackendConf()->get<uint64_t>(kVeloxMemReclaimMaxWaitMs, kVeloxMemReclaimMaxWaitMsDefault); blockListener_ = std::make_unique<BlockAllocationListener>(listener_.get(), reservationBlockSize); listenableAlloc_ = std::make_unique<ListenableMemoryAllocator>(defaultMemoryAllocator().get(), blockListener_.get()); arrowPool_ = std::make_unique<ArrowMemoryPool>(listenableAlloc_.get()); std::unordered_map<std::string, std::string> extraArbitratorConfigs; - extraArbitratorConfigs[std::string(velox::memory::SharedArbitrator::ExtraConfig::kMemoryPoolInitialCapacity)] = - folly::to<std::string>(memInitCapacity) + "B"; - extraArbitratorConfigs[kMemoryReservationBlockSize] = folly::to<std::string>(reservationBlockSize) + "B"; - extraArbitratorConfigs[std::string(velox::memory::SharedArbitrator::ExtraConfig::kMemoryReclaimMaxWaitTime)] = - folly::to<std::string>(memReclaimMaxWaitMs) + "ms"; + extraArbitratorConfigs["memory-pool-initial-capacity"] = folly::to<std::string>(memInitCapacity) + "B"; + extraArbitratorConfigs["memory-pool-transfer-capacity"] = folly::to<std::string>(reservationBlockSize) + "B"; + extraArbitratorConfigs["memory-reclaim-max-wait-time"] = folly::to<std::string>(0) + "ms"; ArbitratorFactoryRegister afr(listener_.get()); velox::memory::MemoryManagerOptions mmOptions{ diff --git a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc index e17ad5e2f7..34796e378e 100644 --- a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc @@ -117,6 +117,7 @@ arrow::Status VeloxRssSortShuffleWriter::write(std::shared_ptr<ColumnarBatch> cb } arrow::Status VeloxRssSortShuffleWriter::evictBatch(uint32_t partitionId) { + int64_t rawSize = batch_->size(); bufferOutputStream_->seekp(0); batch_->flush(bufferOutputStream_.get()); auto buffer = bufferOutputStream_->getBuffer(); diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index 13bf72d47e..a28a7d26b3 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala @@ -1414,13 +1414,6 @@ object GlutenConfig { .bytesConf(ByteUnit.BYTE) .createWithDefaultString("8MB") - val COLUMNAR_VELOX_MEM_RECLAIM_MAX_WAIT_MS = - buildConf("spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs") - .internal() - .doc("The max time in ms to wait for memory reclaim.") - .timeConf(TimeUnit.MILLISECONDS) - .createWithDefault(TimeUnit.MINUTES.toMillis(60)) - val COLUMNAR_VELOX_SSD_CACHE_PATH = buildStaticConf("spark.gluten.sql.columnar.backend.velox.ssdCachePath") .internal() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
