This is an automated email from the ASF dual-hosted git repository.

lgbo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 0745303e49 [GLUTEN-8705][CH] Enable MemorySpillScheduler (#8706)
0745303e49 is described below

commit 0745303e49147dcbf07d343c58a53ccea38b4db6
Author: lgbo <[email protected]>
AuthorDate: Mon Feb 17 15:32:26 2025 +0800

    [GLUTEN-8705][CH] Enable MemorySpillScheduler (#8706)
    
    * enable spill manager
    
    * update
---
 .../backendsapi/clickhouse/CHListenerApi.scala     |  7 ++++
 .../backendsapi/clickhouse/RuntimeSettings.scala   |  6 +++
 .../Operator/GraceAggregatingTransform.cpp         | 49 ++++++++++++++++++++++
 .../Operator/GraceAggregatingTransform.h           |  7 ++++
 4 files changed, 69 insertions(+)

diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
index 5334c3a9b6..77d571aa19 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
@@ -93,6 +93,13 @@ class CHListenerApi extends ListenerApi with Logging {
       "timezone" -> conf.get("spark.sql.session.timeZone", 
TimeZone.getDefault.getID),
       "local_engine.settings.log_processors_profiles" -> "true")
     conf.setCHSettings("spark_version", SPARK_VERSION)
+    if (!conf.contains(RuntimeSettings.ENABLE_MEMORY_SPILL_SCHEDULER.key)) {
+      // Enable adaptive memory spill scheduler for native by default
+      conf.set(
+        RuntimeSettings.ENABLE_MEMORY_SPILL_SCHEDULER.key,
+        RuntimeSettings.ENABLE_MEMORY_SPILL_SCHEDULER.defaultValueString)
+    }
+
     // add memory limit for external sort
     if (conf.getLong(RuntimeSettings.MAX_BYTES_BEFORE_EXTERNAL_SORT.key, -1) < 
0) {
       if (conf.getBoolean("spark.memory.offHeap.enabled", defaultValue = 
false)) {
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeSettings.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeSettings.scala
index 2daeffe979..bcd245eafe 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeSettings.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeSettings.scala
@@ -96,4 +96,10 @@ object RuntimeSettings {
       .doc("The bucket directory for writing data")
       .stringConf
       .createWithDefault("")
+
+  val ENABLE_MEMORY_SPILL_SCHEDULER =
+    buildConf(runtimeSettings("enable_adaptive_memory_spill_scheduler"))
+      .doc("Enable memory spill scheduler")
+      .booleanConf
+      .createWithDefault(true) // CHListenerApi copies this default into the conf when the key is unset
 }
diff --git a/cpp-ch/local-engine/Operator/GraceAggregatingTransform.cpp b/cpp-ch/local-engine/Operator/GraceAggregatingTransform.cpp
index ff15a7e73d..01c712cb10 100644
--- a/cpp-ch/local-engine/Operator/GraceAggregatingTransform.cpp
+++ b/cpp-ch/local-engine/Operator/GraceAggregatingTransform.cpp
@@ -61,6 +61,9 @@ GraceAggregatingTransform::GraceAggregatingTransform(
     if (enable_spill_test)
         buckets.emplace(1, BufferFileStream());
     current_data_variants = std::make_shared<DB::AggregatedDataVariants>();
+
+    // IProcessor::spillable, MemorySpillScheduler will trigger the spill by 
enable this flag.
+    spillable = true;
 }
 
 GraceAggregatingTransform::~GraceAggregatingTransform()
@@ -477,6 +480,8 @@ void GraceAggregatingTransform::mergeOneBlock(const DB::Block & block, bool is_o
     {
         rehashDataVariants();
     }
+    // reset the flag
+    force_spill = false;
 
     LOG_DEBUG(
         logger,
@@ -535,6 +540,13 @@ void GraceAggregatingTransform::mergeOneBlock(const DB::Block & block, bool is_o
 
 bool GraceAggregatingTransform::isMemoryOverflow()
 {
+    if (force_spill)
+    {
+        auto stats = getMemoryStats();
+        if (stats.spillable_memory_bytes > force_spill_on_bytes * 0.8)
+            return true;
+    }
+
     /// More greedy memory usage strategy.
     if (!current_data_variants)
         return false;
@@ -581,4 +593,41 @@ bool GraceAggregatingTransform::isMemoryOverflow()
     return false;
 }
 
+DB::ProcessorMemoryStats GraceAggregatingTransform::getMemoryStats()
+{
+    DB::ProcessorMemoryStats stats;
+    if (!current_data_variants)
+        return stats;
+    stats.need_reserved_memory_bytes = current_data_variants->aggregates_pool->allocatedBytes();
+    // Pending bytes of the buckets after the current one are spillable. Use find(), not operator[], so a stats query never inserts into `buckets`.
+    for (size_t i = current_bucket_index + 1; i < getBucketsNum(); ++i)
+    {
+        if (auto it = buckets.find(i); it != buckets.end())
+            stats.spillable_memory_bytes += it->second.pending_bytes;
+    }
+    // The current aggregation state is spillable too; estimate it per key when that figure is known.
+    if (per_key_memory_usage > 0)
+    {
+        auto current_result_rows = current_data_variants->size();
+        stats.need_reserved_memory_bytes += current_result_rows * per_key_memory_usage;
+        stats.spillable_memory_bytes += current_result_rows * per_key_memory_usage;
+    }
+    else
+    {
+        // Rough estimate: the exact memory usage per key is unknown, so use the whole arena size.
+        stats.spillable_memory_bytes += current_data_variants->aggregates_pool->allocatedBytes();
+    }
+    return stats;
+}
+
+bool GraceAggregatingTransform::spillOnSize(size_t bytes)
+{
+    // Accept only if >= 80% of `bytes` is spillable right now; isMemoryOverflow() acts on the flags later.
+    if (getMemoryStats().spillable_memory_bytes < bytes * 0.8)
+        return false;
+    force_spill_on_bytes = bytes;
+    force_spill = true;
+    return true;
+}
+
 }
diff --git a/cpp-ch/local-engine/Operator/GraceAggregatingTransform.h b/cpp-ch/local-engine/Operator/GraceAggregatingTransform.h
index c34d7714cf..fa02a6f77a 100644
--- a/cpp-ch/local-engine/Operator/GraceAggregatingTransform.h
+++ b/cpp-ch/local-engine/Operator/GraceAggregatingTransform.h
@@ -23,6 +23,7 @@
 #include <Processors/IProcessor.h>
 #include <Processors/Transforms/AggregatingTransform.h>
 #include <Poco/Logger.h>
+#include <Common/MemorySpillScheduler.h>
 #include <Common/AggregateUtil.h>
 
 
@@ -106,7 +107,13 @@ private:
     void checkAndSetupCurrentDataVariants();
     /// Merge one block into current_data_variants.
     void mergeOneBlock(const DB::Block & block, bool is_original_block);
+
+    // spill control (IProcessor hooks used by MemorySpillScheduler)
     bool isMemoryOverflow();
+    DB::ProcessorMemoryStats getMemoryStats() override;
+    bool spillOnSize(size_t bytes) override;
+    bool force_spill = false; // set by spillOnSize(), checked by isMemoryOverflow(), reset in mergeOneBlock()
+    size_t force_spill_on_bytes = 0; // byte target of the last accepted spillOnSize(); a bool here would truncate it to 0/1
 
     bool input_finished = false;
     bool has_input = false;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to