This is an automated email from the ASF dual-hosted git repository.
lgbo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 0745303e49 [GLUTEN-8705][CH] Enable MemorySpillScheduler (#8706)
0745303e49 is described below
commit 0745303e49147dcbf07d343c58a53ccea38b4db6
Author: lgbo <[email protected]>
AuthorDate: Mon Feb 17 15:32:26 2025 +0800
[GLUTEN-8705][CH] Enable MemorySpillScheduler (#8706)
* enable spill manager
* update
---
.../backendsapi/clickhouse/CHListenerApi.scala | 7 ++++
.../backendsapi/clickhouse/RuntimeSettings.scala | 6 +++
.../Operator/GraceAggregatingTransform.cpp | 49 ++++++++++++++++++++++
.../Operator/GraceAggregatingTransform.h | 7 ++++
4 files changed, 69 insertions(+)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
index 5334c3a9b6..77d571aa19 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
@@ -93,6 +93,13 @@ class CHListenerApi extends ListenerApi with Logging {
"timezone" -> conf.get("spark.sql.session.timeZone",
TimeZone.getDefault.getID),
"local_engine.settings.log_processors_profiles" -> "true")
conf.setCHSettings("spark_version", SPARK_VERSION)
+ // Turn the native adaptive memory spill scheduler on by default; setIfMissing keeps
+ // any value the user configured explicitly.
+ conf.setIfMissing(
+ RuntimeSettings.ENABLE_MEMORY_SPILL_SCHEDULER.key,
+ RuntimeSettings.ENABLE_MEMORY_SPILL_SCHEDULER.defaultValueString
+ )
+
// add memory limit for external sort
if (conf.getLong(RuntimeSettings.MAX_BYTES_BEFORE_EXTERNAL_SORT.key, -1) <
0) {
if (conf.getBoolean("spark.memory.offHeap.enabled", defaultValue =
false)) {
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeSettings.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeSettings.scala
index 2daeffe979..bcd245eafe 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeSettings.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeSettings.scala
@@ -96,4 +96,10 @@ object RuntimeSettings {
.doc("The bucket directory for writing data")
.stringConf
.createWithDefault("")
+
+ val ENABLE_MEMORY_SPILL_SCHEDULER = // CHListenerApi sets this to its default when unset
+ buildConf(runtimeSettings("enable_adaptive_memory_spill_scheduler"))
+ .doc("Enable the native adaptive memory spill scheduler, which can ask spillable native operators (e.g. grace aggregation) to spill")
+ .booleanConf
+ .createWithDefault(true)
}
diff --git a/cpp-ch/local-engine/Operator/GraceAggregatingTransform.cpp
b/cpp-ch/local-engine/Operator/GraceAggregatingTransform.cpp
index ff15a7e73d..01c712cb10 100644
--- a/cpp-ch/local-engine/Operator/GraceAggregatingTransform.cpp
+++ b/cpp-ch/local-engine/Operator/GraceAggregatingTransform.cpp
@@ -61,6 +61,9 @@ GraceAggregatingTransform::GraceAggregatingTransform(
if (enable_spill_test)
buckets.emplace(1, BufferFileStream());
current_data_variants = std::make_shared<DB::AggregatedDataVariants>();
+
+ // IProcessor::spillable, MemorySpillScheduler will trigger the spill by
enable this flag.
+ spillable = true;
}
GraceAggregatingTransform::~GraceAggregatingTransform()
@@ -477,6 +480,8 @@ void GraceAggregatingTransform::mergeOneBlock(const
DB::Block & block, bool is_o
{
rehashDataVariants();
}
+ // reset the flag
+ force_spill = false;
LOG_DEBUG(
logger,
@@ -535,6 +540,13 @@ void GraceAggregatingTransform::mergeOneBlock(const
DB::Block & block, bool is_o
bool GraceAggregatingTransform::isMemoryOverflow()
{
+ if (force_spill)
+ {
+ auto stats = getMemoryStats();
+ if (stats.spillable_memory_bytes > force_spill_on_bytes * 0.8)
+ return true;
+ }
+
/// More greedy memory usage strategy.
if (!current_data_variants)
return false;
@@ -581,4 +593,41 @@ bool GraceAggregatingTransform::isMemoryOverflow()
return false;
}
+/// IProcessor override: estimates the bytes this transform must keep reserved and the bytes it reports as spillable.
+DB::ProcessorMemoryStats GraceAggregatingTransform::getMemoryStats()
+{
+ DB::ProcessorMemoryStats stats;
+ if (!current_data_variants)
+ return stats;
+
+ stats.need_reserved_memory_bytes = current_data_variants->aggregates_pool->allocatedBytes();
+ // Pending bytes of buckets after the current one; find() keeps this stats query from inserting buckets.
+ for (size_t i = current_bucket_index + 1; i < getBucketsNum(); ++i)
+ if (auto it = buckets.find(i); it != buckets.end())
+ stats.spillable_memory_bytes += it->second.pending_bytes;
+
+ if (per_key_memory_usage > 0)
+ {
+ auto current_result_rows = current_data_variants->size();
+ stats.need_reserved_memory_bytes += current_result_rows * per_key_memory_usage;
+ stats.spillable_memory_bytes += current_result_rows * per_key_memory_usage;
+ }
+ else
+ {
+ // Rough estimate: without a per-key measurement, count the whole arena as spillable.
+ stats.spillable_memory_bytes += current_data_variants->aggregates_pool->allocatedBytes();
+ }
+ return stats;
+}
+
+bool GraceAggregatingTransform::spillOnSize(size_t bytes)
+{
+ // Decline unless getMemoryStats() reports at least ~80% of the requested bytes as spillable.
+ if (const auto stats = getMemoryStats(); stats.spillable_memory_bytes < bytes * 0.8)
+ return false;
+ force_spill_on_bytes = bytes; // isMemoryOverflow() compares against this while force_spill is set
+ force_spill = true;
+ return true;
+}
+
}
diff --git a/cpp-ch/local-engine/Operator/GraceAggregatingTransform.h
b/cpp-ch/local-engine/Operator/GraceAggregatingTransform.h
index c34d7714cf..fa02a6f77a 100644
--- a/cpp-ch/local-engine/Operator/GraceAggregatingTransform.h
+++ b/cpp-ch/local-engine/Operator/GraceAggregatingTransform.h
@@ -23,6 +23,7 @@
#include <Processors/IProcessor.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Poco/Logger.h>
+#include <Common/MemorySpillScheduler.h>
#include <Common/AggregateUtil.h>
@@ -106,7 +107,13 @@ private:
void checkAndSetupCurrentDataVariants();
/// Merge one block into current_data_variants.
void mergeOneBlock(const DB::Block & block, bool is_original_block);
+
+ // spill control (IProcessor spill hooks used together with MemorySpillScheduler)
bool isMemoryOverflow();
+ DB::ProcessorMemoryStats getMemoryStats() override; // reserved / spillable byte estimates
+ bool spillOnSize(size_t bytes) override; // returns false when too little memory is spillable
+ bool force_spill = false; // set by an accepted spillOnSize(), cleared in mergeOneBlock()
+ size_t force_spill_on_bytes = 0; // byte count from the last accepted spillOnSize(); a bool here would truncate it to 0/1
bool input_finished = false;
bool has_input = false;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]