This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 63c166d62 [VL] Support simulate task spilling in GenericBenchmark
(#5795)
63c166d62 is described below
commit 63c166d62b69956603a4a0bd8991dc4fdf8b01b1
Author: Yang Zhang <[email protected]>
AuthorDate: Fri May 17 22:27:51 2024 +0800
[VL] Support simulate task spilling in GenericBenchmark (#5795)
---
cpp/velox/benchmarks/GenericBenchmark.cc | 10 +++++++++-
cpp/velox/benchmarks/common/BenchmarkUtils.cc | 15 +++++++++++++++
cpp/velox/benchmarks/common/BenchmarkUtils.h | 16 ++++++++++++++++
cpp/velox/memory/VeloxMemoryManager.h | 4 ++++
docs/developers/MicroBenchmarks.md | 4 ++++
5 files changed, 48 insertions(+), 1 deletion(-)
diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc
b/cpp/velox/benchmarks/GenericBenchmark.cc
index 71d3d96b5..b7a50800e 100644
--- a/cpp/velox/benchmarks/GenericBenchmark.cc
+++ b/cpp/velox/benchmarks/GenericBenchmark.cc
@@ -64,6 +64,7 @@ DEFINE_string(
DEFINE_string(data, "", "Path to input data files in parquet format, used for
shuffle read.");
DEFINE_string(conf, "", "Path to the configuration file.");
DEFINE_string(write_path, "/tmp", "Path to save the output from write tasks.");
+// Hard memory limit, in bytes, handed to BenchmarkAllocationListener. The
+// default (int64 max) is effectively "no limit", i.e. spilling is not simulated.
+DEFINE_int64(
+    memory_limit,
+    std::numeric_limits<int64_t>::max(),
+    "Memory limit used to trigger spill.");
struct WriterMetrics {
int64_t splitTime;
@@ -149,7 +150,11 @@ auto BM_Generic = [](::benchmark::State& state,
setCpu(state.thread_index());
}
memory::MemoryManager::testingSetInstance({});
- auto memoryManager = getDefaultMemoryManager();
+
+ auto memoryManager = std::make_unique<gluten::VeloxMemoryManager>(
+ "generic_benchmark",
+ gluten::defaultMemoryAllocator(),
+ std::make_unique<BenchmarkAllocationListener>(FLAGS_memory_limit));
auto runtime = Runtime::create(kVeloxRuntimeKind, conf);
auto plan = getPlanFromFile("Plan", planFile);
std::vector<std::string> splits{};
@@ -182,6 +187,9 @@ auto BM_Generic = [](::benchmark::State& state,
}
auto resultIter =
runtime->createResultIterator(memoryManager.get(), "/tmp/test-spill",
std::move(inputIters), conf);
+ if (auto listener =
dynamic_cast<BenchmarkAllocationListener*>(memoryManager->getListener())) {
+ listener->setIterator(resultIter.get());
+ }
auto veloxPlan =
dynamic_cast<gluten::VeloxRuntime*>(runtime)->getVeloxPlan();
if (FLAGS_with_shuffle) {
int64_t shuffleWriteTime;
diff --git a/cpp/velox/benchmarks/common/BenchmarkUtils.cc
b/cpp/velox/benchmarks/common/BenchmarkUtils.cc
index efe1fc60a..ccec6f3c4 100644
--- a/cpp/velox/benchmarks/common/BenchmarkUtils.cc
+++ b/cpp/velox/benchmarks/common/BenchmarkUtils.cc
@@ -180,3 +180,18 @@ void cleanupShuffleOutput(const std::string& dataFile,
const std::vector<std::st
}
}
}
+
+// Receives the byte delta of each allocation change reported to this listener
+// (positive for growth, negative for release). When a growth would bring the
+// tracked usage to or past limit_, the bound result iterator is asked to spill
+// the excess, simulating a task that hits a hard memory limit.
+void BenchmarkAllocationListener::allocationChanged(int64_t diff) {
+  // Only growth can push usage over the limit; releases are pure bookkeeping
+  // and must never trigger a spill.
+  if (diff > 0 && usedBytes_ + static_cast<uint64_t>(diff) >= limit_) {
+    LOG(INFO) << fmt::format(
+        "reach hard limit {} when need {}, current used {}.",
+        velox::succinctBytes(limit_),
+        velox::succinctBytes(diff),
+        velox::succinctBytes(usedBytes_));
+    const auto neededBytes = usedBytes_ + static_cast<uint64_t>(diff) - limit_;
+    // Changes reported before setIterator() is called have nothing to spill from.
+    if (iterator_ != nullptr) {
+      const auto spilledBytes = iterator_->spillFixedSize(neededBytes);
+      LOG(INFO) << fmt::format("spill finish, got {}.", velox::succinctBytes(spilledBytes));
+    } else {
+      LOG(WARNING) << "no result iterator bound yet, skip spilling.";
+    }
+  }
+  // This listener never rejects a change (it does not throw), so the delta is
+  // always accounted. Dropping it on the spill path would make usedBytes_
+  // underflow once the same bytes are released later. Bytes freed by the spill
+  // itself are presumably reported back here as negative diffs — confirm.
+  usedBytes_ += diff;
+}
diff --git a/cpp/velox/benchmarks/common/BenchmarkUtils.h
b/cpp/velox/benchmarks/common/BenchmarkUtils.h
index 79f4a53cb..ff5e675f7 100644
--- a/cpp/velox/benchmarks/common/BenchmarkUtils.h
+++ b/cpp/velox/benchmarks/common/BenchmarkUtils.h
@@ -102,3 +102,19 @@ arrow::Status
setLocalDirsAndDataFileFromEnv(std::string& dataFile,
std::vector<std::string>& localDirs, bool& isFromEnv);
void cleanupShuffleOutput(const std::string& dataFile, const
std::vector<std::string>& localDirs, bool isFromEnv);
+
+// AllocationListener used by the benchmarks to emulate a task-level hard memory
+// limit: once the tracked usage would reach `limit` bytes, the bound result
+// iterator is asked to spill (see allocationChanged()).
+class BenchmarkAllocationListener final : public gluten::AllocationListener {
+ public:
+  explicit BenchmarkAllocationListener(uint64_t limit) : limit_(limit) {}
+
+  // Binds the iterator asked to spill when the limit is reached. The pointer is
+  // not owned: the iterator must outlive any call that may spill through it.
+  void setIterator(gluten::ResultIterator* iterator) {
+    iterator_ = iterator;
+  }
+
+  void allocationChanged(int64_t diff) override;
+
+ private:
+  uint64_t usedBytes_{0L};
+  uint64_t limit_{0L};
+  // Null until setIterator() is called; allocation changes may be reported
+  // before that, so it must never be read uninitialized.
+  gluten::ResultIterator* iterator_{nullptr};
+};
diff --git a/cpp/velox/memory/VeloxMemoryManager.h
b/cpp/velox/memory/VeloxMemoryManager.h
index fda153ada..1e8bcd8c8 100644
--- a/cpp/velox/memory/VeloxMemoryManager.h
+++ b/cpp/velox/memory/VeloxMemoryManager.h
@@ -60,6 +60,10 @@ class VeloxMemoryManager final : public MemoryManager {
void hold() override;
+  // Non-owning view of the listener held by this manager; callers may downcast
+  // it to reach listener-specific hooks.
+  AllocationListener* getListener() const { return listener_.get(); }
+
private:
bool tryDestructSafe();
diff --git a/docs/developers/MicroBenchmarks.md
b/docs/developers/MicroBenchmarks.md
index dc0c3b2a0..7fc2a535d 100644
--- a/docs/developers/MicroBenchmarks.md
+++ b/docs/developers/MicroBenchmarks.md
@@ -280,6 +280,10 @@ check [Intel® QuickAssist Technology (QAT)
support](../get-started/Velox.md#int
For IAA support, please
check [Intel® In-memory Analytics Accelerator (IAA/IAX)
support](../get-started/Velox.md#intel-in-memory-analytics-accelerator-iaaiax-support)
+## Simulate task spilling
+
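+You can simulate task spilling by specifying a hard memory limit, in bytes, with `--memory_limit`.
+Once the memory tracked by the benchmark would reach this limit, the running task is asked to spill.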
+
## Simulate Spark with multiple processes and threads
You can use below command to launch several processes and threads to simulate
parallel execution on
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]