This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new a94f51e01f Revert "[GLUTEN-7800][VL] Add config for max reclaim wait
time to avoid dead …" (#7836)
a94f51e01f is described below
commit a94f51e01f90b8e26d954c271caabca51ff921a5
Author: Hongze Zhang <[email protected]>
AuthorDate: Thu Nov 7 09:53:02 2024 +0800
Revert "[GLUTEN-7800][VL] Add config for max reclaim wait time to avoid
dead …" (#7836)
This reverts commit 761f1c58a3adb6d768680523431ccb67b459f6c5.
---
cpp/velox/config/VeloxConfig.h | 3 --
cpp/velox/jni/VeloxJniWrapper.cc | 1 +
cpp/velox/memory/VeloxMemoryManager.cc | 42 ++++++++++------------
cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc | 1 +
.../scala/org/apache/gluten/GlutenConfig.scala | 7 ----
5 files changed, 21 insertions(+), 33 deletions(-)
diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h
index cb70dc6278..792beda96f 100644
--- a/cpp/velox/config/VeloxConfig.h
+++ b/cpp/velox/config/VeloxConfig.h
@@ -76,9 +76,6 @@ const bool kMemoryUseHugePagesDefault = false;
const std::string kVeloxMemInitCapacity =
"spark.gluten.sql.columnar.backend.velox.memInitCapacity";
const uint64_t kVeloxMemInitCapacityDefault = 8 << 20;
-const std::string kVeloxMemReclaimMaxWaitMs = "spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs";
-const uint64_t kVeloxMemReclaimMaxWaitMsDefault = 3600000; // 60min
-
const std::string kHiveConnectorId = "test-hive";
const std::string kVeloxCacheEnabled =
"spark.gluten.sql.columnar.backend.velox.cacheEnabled";
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index 6ea60d651a..b8d2b0c3c2 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -385,6 +385,7 @@ JNIEXPORT void JNICALL
Java_org_apache_gluten_datasource_VeloxDataSourceJniWrapp
jlong dsHandle,
jlong batchHandle) {
JNI_METHOD_START
+ auto ctx = gluten::getRuntime(env, wrapper);
auto datasource = ObjectStore::retrieve<VeloxDataSource>(dsHandle);
auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
datasource->write(batch);
diff --git a/cpp/velox/memory/VeloxMemoryManager.cc b/cpp/velox/memory/VeloxMemoryManager.cc
index 55420d9380..19a5d45804 100644
--- a/cpp/velox/memory/VeloxMemoryManager.cc
+++ b/cpp/velox/memory/VeloxMemoryManager.cc
@@ -22,7 +22,6 @@
#include "velox/common/memory/MallocAllocator.h"
#include "velox/common/memory/MemoryPool.h"
-#include "velox/common/memory/SharedArbitrator.h"
#include "velox/exec/MemoryReclaimer.h"
#include "compute/VeloxBackend.h"
@@ -37,6 +36,12 @@ namespace gluten {
using namespace facebook;
namespace {
+
+static constexpr std::string_view kMemoryPoolInitialCapacity{"memory-pool-initial-capacity"};
+static constexpr uint64_t kDefaultMemoryPoolInitialCapacity{256 << 20};
+static constexpr std::string_view kMemoryPoolTransferCapacity{"memory-pool-transfer-capacity"};
+static constexpr uint64_t kDefaultMemoryPoolTransferCapacity{128 << 20};
+
template <typename T>
T getConfig(
const std::unordered_map<std::string, std::string>& configs,
@@ -52,28 +57,24 @@ T getConfig(
return defaultValue;
}
} // namespace
-
/// We assume in a single Spark task. No thread-safety should be guaranteed.
class ListenableArbitrator : public velox::memory::MemoryArbitrator {
public:
ListenableArbitrator(const Config& config, AllocationListener* listener)
: MemoryArbitrator(config),
listener_(listener),
- reclaimMaxWaitMs_(
- velox::memory::SharedArbitrator::ExtraConfig::memoryReclaimMaxWaitTimeMs(config.extraConfigs)),
- memoryPoolInitialCapacity_(
- velox::memory::SharedArbitrator::ExtraConfig::memoryPoolInitialCapacity(config.extraConfigs)),
+ memoryPoolInitialCapacity_(velox::config::toCapacity(
+ getConfig<std::string>(
+ config.extraConfigs,
+ kMemoryPoolInitialCapacity,
+ std::to_string(kDefaultMemoryPoolInitialCapacity)),
+ velox::config::CapacityUnit::BYTE)),
memoryPoolTransferCapacity_(velox::config::toCapacity(
getConfig<std::string>(
config.extraConfigs,
- kMemoryReservationBlockSize,
- std::to_string(kMemoryReservationBlockSizeDefault)),
- velox::config::CapacityUnit::BYTE)) {
- if (reclaimMaxWaitMs_ == 0) {
- LOG(WARNING) << kVeloxMemReclaimMaxWaitMs
- << " was set to 0, it may cause dead lock when memory arbitration has bug.";
- }
- }
+ kMemoryPoolTransferCapacity,
+ std::to_string(kDefaultMemoryPoolTransferCapacity)),
+ velox::config::CapacityUnit::BYTE)) {}
std::string kind() const override {
return kind_;
}
@@ -120,7 +121,7 @@ class ListenableArbitrator : public
velox::memory::MemoryArbitrator {
VELOX_CHECK_EQ(candidates_.size(), 1, "ListenableArbitrator should only
be used within a single root pool");
pool = candidates_.begin()->first;
}
- pool->reclaim(targetBytes, reclaimMaxWaitMs_, status); // ignore the output
+ pool->reclaim(targetBytes, 0, status); // ignore the output
return shrinkCapacity0(pool, 0);
}
@@ -167,7 +168,6 @@ class ListenableArbitrator : public
velox::memory::MemoryArbitrator {
}
gluten::AllocationListener* listener_;
- const uint64_t reclaimMaxWaitMs_;
const uint64_t memoryPoolInitialCapacity_; // FIXME: Unused.
const uint64_t memoryPoolTransferCapacity_;
@@ -208,18 +208,14 @@ VeloxMemoryManager::VeloxMemoryManager(const std::string&
kind, std::unique_ptr<
kMemoryReservationBlockSize, kMemoryReservationBlockSizeDefault);
auto memInitCapacity =
VeloxBackend::get()->getBackendConf()->get<uint64_t>(kVeloxMemInitCapacity,
kVeloxMemInitCapacityDefault);
- auto memReclaimMaxWaitMs =
- VeloxBackend::get()->getBackendConf()->get<uint64_t>(kVeloxMemReclaimMaxWaitMs, kVeloxMemReclaimMaxWaitMsDefault);
blockListener_ = std::make_unique<BlockAllocationListener>(listener_.get(),
reservationBlockSize);
listenableAlloc_ =
std::make_unique<ListenableMemoryAllocator>(defaultMemoryAllocator().get(),
blockListener_.get());
arrowPool_ = std::make_unique<ArrowMemoryPool>(listenableAlloc_.get());
std::unordered_map<std::string, std::string> extraArbitratorConfigs;
- extraArbitratorConfigs[std::string(velox::memory::SharedArbitrator::ExtraConfig::kMemoryPoolInitialCapacity)] =
- folly::to<std::string>(memInitCapacity) + "B";
- extraArbitratorConfigs[kMemoryReservationBlockSize] = folly::to<std::string>(reservationBlockSize) + "B";
- extraArbitratorConfigs[std::string(velox::memory::SharedArbitrator::ExtraConfig::kMemoryReclaimMaxWaitTime)] =
- folly::to<std::string>(memReclaimMaxWaitMs) + "ms";
+ extraArbitratorConfigs["memory-pool-initial-capacity"] = folly::to<std::string>(memInitCapacity) + "B";
+ extraArbitratorConfigs["memory-pool-transfer-capacity"] = folly::to<std::string>(reservationBlockSize) + "B";
+ extraArbitratorConfigs["memory-reclaim-max-wait-time"] = folly::to<std::string>(0) + "ms";
ArbitratorFactoryRegister afr(listener_.get());
velox::memory::MemoryManagerOptions mmOptions{
diff --git a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
index e17ad5e2f7..34796e378e 100644
--- a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
@@ -117,6 +117,7 @@ arrow::Status
VeloxRssSortShuffleWriter::write(std::shared_ptr<ColumnarBatch> cb
}
arrow::Status VeloxRssSortShuffleWriter::evictBatch(uint32_t partitionId) {
+ int64_t rawSize = batch_->size();
bufferOutputStream_->seekp(0);
batch_->flush(bufferOutputStream_.get());
auto buffer = bufferOutputStream_->getBuffer();
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 13bf72d47e..a28a7d26b3 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -1414,13 +1414,6 @@ object GlutenConfig {
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("8MB")
- val COLUMNAR_VELOX_MEM_RECLAIM_MAX_WAIT_MS =
- buildConf("spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs")
- .internal()
- .doc("The max time in ms to wait for memory reclaim.")
- .timeConf(TimeUnit.MILLISECONDS)
- .createWithDefault(TimeUnit.MINUTES.toMillis(60))
-
val COLUMNAR_VELOX_SSD_CACHE_PATH =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.ssdCachePath")
.internal()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]