This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new a94f51e01f Revert "[GLUTEN-7800][VL] Add config for max reclaim wait time to avoid dead …" (#7836)
a94f51e01f is described below

commit a94f51e01f90b8e26d954c271caabca51ff921a5
Author: Hongze Zhang <[email protected]>
AuthorDate: Thu Nov 7 09:53:02 2024 +0800

    Revert "[GLUTEN-7800][VL] Add config for max reclaim wait time to avoid dead …" (#7836)
    
    This reverts commit 761f1c58a3adb6d768680523431ccb67b459f6c5.
---
 cpp/velox/config/VeloxConfig.h                     |  3 --
 cpp/velox/jni/VeloxJniWrapper.cc                   |  1 +
 cpp/velox/memory/VeloxMemoryManager.cc             | 42 ++++++++++------------
 cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc     |  1 +
 .../scala/org/apache/gluten/GlutenConfig.scala     |  7 ----
 5 files changed, 21 insertions(+), 33 deletions(-)

diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h
index cb70dc6278..792beda96f 100644
--- a/cpp/velox/config/VeloxConfig.h
+++ b/cpp/velox/config/VeloxConfig.h
@@ -76,9 +76,6 @@ const bool kMemoryUseHugePagesDefault = false;
 const std::string kVeloxMemInitCapacity = 
"spark.gluten.sql.columnar.backend.velox.memInitCapacity";
 const uint64_t kVeloxMemInitCapacityDefault = 8 << 20;
 
-const std::string kVeloxMemReclaimMaxWaitMs = 
"spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs";
-const uint64_t kVeloxMemReclaimMaxWaitMsDefault = 3600000; // 60min
-
 const std::string kHiveConnectorId = "test-hive";
 const std::string kVeloxCacheEnabled = 
"spark.gluten.sql.columnar.backend.velox.cacheEnabled";
 
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index 6ea60d651a..b8d2b0c3c2 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -385,6 +385,7 @@ JNIEXPORT void JNICALL 
Java_org_apache_gluten_datasource_VeloxDataSourceJniWrapp
     jlong dsHandle,
     jlong batchHandle) {
   JNI_METHOD_START
+  auto ctx = gluten::getRuntime(env, wrapper);
   auto datasource = ObjectStore::retrieve<VeloxDataSource>(dsHandle);
   auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
   datasource->write(batch);
diff --git a/cpp/velox/memory/VeloxMemoryManager.cc 
b/cpp/velox/memory/VeloxMemoryManager.cc
index 55420d9380..19a5d45804 100644
--- a/cpp/velox/memory/VeloxMemoryManager.cc
+++ b/cpp/velox/memory/VeloxMemoryManager.cc
@@ -22,7 +22,6 @@
 
 #include "velox/common/memory/MallocAllocator.h"
 #include "velox/common/memory/MemoryPool.h"
-#include "velox/common/memory/SharedArbitrator.h"
 #include "velox/exec/MemoryReclaimer.h"
 
 #include "compute/VeloxBackend.h"
@@ -37,6 +36,12 @@ namespace gluten {
 using namespace facebook;
 
 namespace {
+
+static constexpr std::string_view 
kMemoryPoolInitialCapacity{"memory-pool-initial-capacity"};
+static constexpr uint64_t kDefaultMemoryPoolInitialCapacity{256 << 20};
+static constexpr std::string_view 
kMemoryPoolTransferCapacity{"memory-pool-transfer-capacity"};
+static constexpr uint64_t kDefaultMemoryPoolTransferCapacity{128 << 20};
+
 template <typename T>
 T getConfig(
     const std::unordered_map<std::string, std::string>& configs,
@@ -52,28 +57,24 @@ T getConfig(
   return defaultValue;
 }
 } // namespace
-
 /// We assume in a single Spark task. No thread-safety should be guaranteed.
 class ListenableArbitrator : public velox::memory::MemoryArbitrator {
  public:
   ListenableArbitrator(const Config& config, AllocationListener* listener)
       : MemoryArbitrator(config),
         listener_(listener),
-        reclaimMaxWaitMs_(
-            
velox::memory::SharedArbitrator::ExtraConfig::memoryReclaimMaxWaitTimeMs(config.extraConfigs)),
-        memoryPoolInitialCapacity_(
-            
velox::memory::SharedArbitrator::ExtraConfig::memoryPoolInitialCapacity(config.extraConfigs)),
+        memoryPoolInitialCapacity_(velox::config::toCapacity(
+            getConfig<std::string>(
+                config.extraConfigs,
+                kMemoryPoolInitialCapacity,
+                std::to_string(kDefaultMemoryPoolInitialCapacity)),
+            velox::config::CapacityUnit::BYTE)),
         memoryPoolTransferCapacity_(velox::config::toCapacity(
             getConfig<std::string>(
                 config.extraConfigs,
-                kMemoryReservationBlockSize,
-                std::to_string(kMemoryReservationBlockSizeDefault)),
-            velox::config::CapacityUnit::BYTE)) {
-    if (reclaimMaxWaitMs_ == 0) {
-      LOG(WARNING) << kVeloxMemReclaimMaxWaitMs
-                   << " was set to 0, it may cause dead lock when memory 
arbitration has bug.";
-    }
-  }
+                kMemoryPoolTransferCapacity,
+                std::to_string(kDefaultMemoryPoolTransferCapacity)),
+            velox::config::CapacityUnit::BYTE)) {}
   std::string kind() const override {
     return kind_;
   }
@@ -120,7 +121,7 @@ class ListenableArbitrator : public 
velox::memory::MemoryArbitrator {
       VELOX_CHECK_EQ(candidates_.size(), 1, "ListenableArbitrator should only 
be used within a single root pool");
       pool = candidates_.begin()->first;
     }
-    pool->reclaim(targetBytes, reclaimMaxWaitMs_, status); // ignore the output
+    pool->reclaim(targetBytes, 0, status); // ignore the output
     return shrinkCapacity0(pool, 0);
   }
 
@@ -167,7 +168,6 @@ class ListenableArbitrator : public 
velox::memory::MemoryArbitrator {
   }
 
   gluten::AllocationListener* listener_;
-  const uint64_t reclaimMaxWaitMs_;
   const uint64_t memoryPoolInitialCapacity_; // FIXME: Unused.
   const uint64_t memoryPoolTransferCapacity_;
 
@@ -208,18 +208,14 @@ VeloxMemoryManager::VeloxMemoryManager(const std::string& 
kind, std::unique_ptr<
       kMemoryReservationBlockSize, kMemoryReservationBlockSizeDefault);
   auto memInitCapacity =
       
VeloxBackend::get()->getBackendConf()->get<uint64_t>(kVeloxMemInitCapacity, 
kVeloxMemInitCapacityDefault);
-  auto memReclaimMaxWaitMs =
-      
VeloxBackend::get()->getBackendConf()->get<uint64_t>(kVeloxMemReclaimMaxWaitMs, 
kVeloxMemReclaimMaxWaitMsDefault);
   blockListener_ = std::make_unique<BlockAllocationListener>(listener_.get(), 
reservationBlockSize);
   listenableAlloc_ = 
std::make_unique<ListenableMemoryAllocator>(defaultMemoryAllocator().get(), 
blockListener_.get());
   arrowPool_ = std::make_unique<ArrowMemoryPool>(listenableAlloc_.get());
 
   std::unordered_map<std::string, std::string> extraArbitratorConfigs;
-  
extraArbitratorConfigs[std::string(velox::memory::SharedArbitrator::ExtraConfig::kMemoryPoolInitialCapacity)]
 =
-      folly::to<std::string>(memInitCapacity) + "B";
-  extraArbitratorConfigs[kMemoryReservationBlockSize] = 
folly::to<std::string>(reservationBlockSize) + "B";
-  
extraArbitratorConfigs[std::string(velox::memory::SharedArbitrator::ExtraConfig::kMemoryReclaimMaxWaitTime)]
 =
-      folly::to<std::string>(memReclaimMaxWaitMs) + "ms";
+  extraArbitratorConfigs["memory-pool-initial-capacity"] = 
folly::to<std::string>(memInitCapacity) + "B";
+  extraArbitratorConfigs["memory-pool-transfer-capacity"] = 
folly::to<std::string>(reservationBlockSize) + "B";
+  extraArbitratorConfigs["memory-reclaim-max-wait-time"] = 
folly::to<std::string>(0) + "ms";
 
   ArbitratorFactoryRegister afr(listener_.get());
   velox::memory::MemoryManagerOptions mmOptions{
diff --git a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc 
b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
index e17ad5e2f7..34796e378e 100644
--- a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
@@ -117,6 +117,7 @@ arrow::Status 
VeloxRssSortShuffleWriter::write(std::shared_ptr<ColumnarBatch> cb
 }
 
 arrow::Status VeloxRssSortShuffleWriter::evictBatch(uint32_t partitionId) {
+  int64_t rawSize = batch_->size();
   bufferOutputStream_->seekp(0);
   batch_->flush(bufferOutputStream_.get());
   auto buffer = bufferOutputStream_->getBuffer();
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala 
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 13bf72d47e..a28a7d26b3 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -1414,13 +1414,6 @@ object GlutenConfig {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("8MB")
 
-  val COLUMNAR_VELOX_MEM_RECLAIM_MAX_WAIT_MS =
-    buildConf("spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs")
-      .internal()
-      .doc("The max time in ms to wait for memory reclaim.")
-      .timeConf(TimeUnit.MILLISECONDS)
-      .createWithDefault(TimeUnit.MINUTES.toMillis(60))
-
   val COLUMNAR_VELOX_SSD_CACHE_PATH =
     buildStaticConf("spark.gluten.sql.columnar.backend.velox.ssdCachePath")
       .internal()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to