This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 63c166d62 [VL] Support simulate task spilling in GenericBenchmark 
(#5795)
63c166d62 is described below

commit 63c166d62b69956603a4a0bd8991dc4fdf8b01b1
Author: Yang Zhang <[email protected]>
AuthorDate: Fri May 17 22:27:51 2024 +0800

    [VL] Support simulate task spilling in GenericBenchmark (#5795)
---
 cpp/velox/benchmarks/GenericBenchmark.cc      | 10 +++++++++-
 cpp/velox/benchmarks/common/BenchmarkUtils.cc | 15 +++++++++++++++
 cpp/velox/benchmarks/common/BenchmarkUtils.h  | 16 ++++++++++++++++
 cpp/velox/memory/VeloxMemoryManager.h         |  4 ++++
 docs/developers/MicroBenchmarks.md            |  4 ++++
 5 files changed, 48 insertions(+), 1 deletion(-)

diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc 
b/cpp/velox/benchmarks/GenericBenchmark.cc
index 71d3d96b5..b7a50800e 100644
--- a/cpp/velox/benchmarks/GenericBenchmark.cc
+++ b/cpp/velox/benchmarks/GenericBenchmark.cc
@@ -64,6 +64,7 @@ DEFINE_string(
 DEFINE_string(data, "", "Path to input data files in parquet format, used for 
shuffle read.");
 DEFINE_string(conf, "", "Path to the configuration file.");
 DEFINE_string(write_path, "/tmp", "Path to save the output from write tasks.");
+// Threshold passed to BenchmarkAllocationListener in BM_Generic; the default is
+// the largest int64_t value.
+DEFINE_int64(memory_limit, std::numeric_limits<int64_t>::max(), "Memory limit 
used to trigger spill.");
 
 struct WriterMetrics {
   int64_t splitTime;
@@ -149,7 +150,11 @@ auto BM_Generic = [](::benchmark::State& state,
     setCpu(state.thread_index());
   }
   memory::MemoryManager::testingSetInstance({});
-  auto memoryManager = getDefaultMemoryManager();
+
+  auto memoryManager = std::make_unique<gluten::VeloxMemoryManager>(
+      "generic_benchmark",
+      gluten::defaultMemoryAllocator(),
+      std::make_unique<BenchmarkAllocationListener>(FLAGS_memory_limit));
   auto runtime = Runtime::create(kVeloxRuntimeKind, conf);
   auto plan = getPlanFromFile("Plan", planFile);
   std::vector<std::string> splits{};
@@ -182,6 +187,9 @@ auto BM_Generic = [](::benchmark::State& state,
     }
     auto resultIter =
         runtime->createResultIterator(memoryManager.get(), "/tmp/test-spill", 
std::move(inputIters), conf);
+    if (auto listener = 
dynamic_cast<BenchmarkAllocationListener*>(memoryManager->getListener())) {
+      listener->setIterator(resultIter.get());
+    }
     auto veloxPlan = 
dynamic_cast<gluten::VeloxRuntime*>(runtime)->getVeloxPlan();
     if (FLAGS_with_shuffle) {
       int64_t shuffleWriteTime;
diff --git a/cpp/velox/benchmarks/common/BenchmarkUtils.cc 
b/cpp/velox/benchmarks/common/BenchmarkUtils.cc
index efe1fc60a..ccec6f3c4 100644
--- a/cpp/velox/benchmarks/common/BenchmarkUtils.cc
+++ b/cpp/velox/benchmarks/common/BenchmarkUtils.cc
@@ -180,3 +180,18 @@ void cleanupShuffleOutput(const std::string& dataFile, 
const std::vector<std::st
     }
   }
 }
+
+/// Records a change in allocated bytes and, when a growth request would reach
+/// the configured limit, asks the attached result iterator to spill.
+///
+/// @param diff Signed change in allocated bytes: positive when memory is
+///             requested, negative when it is released.
+void BenchmarkAllocationListener::allocationChanged(int64_t diff) {
+  // Only growth can require a spill. Guarding on the sign also keeps a
+  // negative diff out of the unsigned comparison, where it could wrap around.
+  if (diff > 0 && usedBytes_ + diff >= limit_) {
+    LOG(INFO) << fmt::format(
+        "reach hard limit {} when need {}, current used {}.",
+        velox::succinctBytes(limit_),
+        velox::succinctBytes(diff),
+        velox::succinctBytes(usedBytes_));
+    auto neededBytes = usedBytes_ + diff - limit_;
+    // The iterator is attached via setIterator() after construction, so there
+    // may be nothing to spill from yet.
+    int64_t spilledBytes = 0;
+    if (iterator_ != nullptr) {
+      spilledBytes = iterator_->spillFixedSize(neededBytes);
+    }
+    LOG(INFO) << fmt::format(
+        "spill finish, got {}.", velox::succinctBytes(spilledBytes));
+  }
+  // This function returns void and does not throw here, so it cannot refuse
+  // the request. Record the change on every path; otherwise the matching
+  // release would later be subtracted from a total that never included it.
+  usedBytes_ += diff;
+}
diff --git a/cpp/velox/benchmarks/common/BenchmarkUtils.h 
b/cpp/velox/benchmarks/common/BenchmarkUtils.h
index 79f4a53cb..ff5e675f7 100644
--- a/cpp/velox/benchmarks/common/BenchmarkUtils.h
+++ b/cpp/velox/benchmarks/common/BenchmarkUtils.h
@@ -102,3 +102,19 @@ arrow::Status
 setLocalDirsAndDataFileFromEnv(std::string& dataFile, 
std::vector<std::string>& localDirs, bool& isFromEnv);
 
 void cleanupShuffleOutput(const std::string& dataFile, const 
std::vector<std::string>& localDirs, bool isFromEnv);
+
+/// Allocation listener for benchmarks that simulates task spilling: it keeps
+/// a running total of allocated bytes and compares it with a fixed limit in
+/// allocationChanged().
+class BenchmarkAllocationListener final : public gluten::AllocationListener {
+ public:
+  /// @param limit Threshold that the tracked usage is compared against.
+  explicit BenchmarkAllocationListener(uint64_t limit) : limit_(limit) {}
+
+  /// Stores the iterator that allocationChanged() calls spillFixedSize() on.
+  /// The pointer is kept as-is; this class never deletes it.
+  void setIterator(gluten::ResultIterator* iterator) {
+    iterator_ = iterator;
+  }
+
+  void allocationChanged(int64_t diff) override;
+
+ private:
+  uint64_t usedBytes_{0L};
+  uint64_t limit_{0L};
+  // Starts out null so the pointer has a defined value before setIterator()
+  // is called.
+  gluten::ResultIterator* iterator_{nullptr};
+};
diff --git a/cpp/velox/memory/VeloxMemoryManager.h 
b/cpp/velox/memory/VeloxMemoryManager.h
index fda153ada..1e8bcd8c8 100644
--- a/cpp/velox/memory/VeloxMemoryManager.h
+++ b/cpp/velox/memory/VeloxMemoryManager.h
@@ -60,6 +60,10 @@ class VeloxMemoryManager final : public MemoryManager {
 
   void hold() override;
 
+  // Returns the pointer obtained from listener_.get().
+  // NOTE(review): listener_ is declared outside this hunk; presumably it is a
+  // smart pointer that keeps ownership. Confirm against the member declaration.
+  AllocationListener* getListener() const {
+    AllocationListener* const listener = listener_.get();
+    return listener;
+  }
+
  private:
   bool tryDestructSafe();
 
diff --git a/docs/developers/MicroBenchmarks.md 
b/docs/developers/MicroBenchmarks.md
index dc0c3b2a0..7fc2a535d 100644
--- a/docs/developers/MicroBenchmarks.md
+++ b/docs/developers/MicroBenchmarks.md
@@ -280,6 +280,10 @@ check [Intel® QuickAssist Technology (QAT) 
support](../get-started/Velox.md#int
 For IAA support, please
 check [Intel® In-memory Analytics Accelerator (IAA/IAX) 
support](../get-started/Velox.md#intel-in-memory-analytics-accelerator-iaaiax-support)
 
+## Simulate task spilling
+
+You can simulate task spilling by specifying a hard memory limit with 
`--memory_limit`.
+
 ## Simulate Spark with multiple processes and threads
 
 You can use below command to launch several processes and threads to simulate 
parallel execution on


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to