This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 5055e8e5b4 [GLUTEN-8096][CH] Invalid header for disk tmp file (#8100)
5055e8e5b4 is described below

commit 5055e8e5b4c7a05ae71067337926a4a33413a025
Author: lgbo <[email protected]>
AuthorDate: Fri Nov 29 17:18:11 2024 +0800

    [GLUTEN-8096][CH] Invalid header for disk tmp file (#8100)
    
    [CH] Invalid header for disk tmp file
---
 .../apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala | 3 +++
 .../execution/tpcds/GlutenClickHouseTPCDSParquetSuite.scala       | 3 +++
 .../execution/tpch/GlutenClickHouseTPCHParquetBucketSuite.scala   | 3 +++
 cpp-ch/local-engine/Common/GlutenConfig.cpp                       | 1 +
 cpp-ch/local-engine/Common/GlutenConfig.h                         | 2 ++
 cpp-ch/local-engine/Operator/GraceAggregatingTransform.cpp        | 8 +++++---
 cpp-ch/local-engine/Operator/GraceAggregatingTransform.h          | 1 +
 7 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala
index 6a09bf4942..7bccb6dfb5 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala
@@ -47,6 +47,9 @@ class GlutenClickHouseTPCHBucketSuite
       .set("spark.sql.autoBroadcastJoinThreshold", "-1") // for test bucket join
       .set("spark.sql.adaptive.enabled", "true")
       .set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", "sparkMurmurHash3_32")
+      .set(
+        "spark.gluten.sql.columnar.backend.ch.runtime_config.enable_grace_aggregate_spill_test",
+        "true")
   }
 
   override protected val createNullableTables = true
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetSuite.scala
index aa7c6b0f56..0ba7de90c6 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetSuite.scala
@@ -38,6 +38,9 @@ class GlutenClickHouseTPCDSParquetSuite extends GlutenClickHouseTPCDSAbstractSui
       .set("spark.memory.offHeap.size", "4g")
       .set("spark.gluten.sql.validation.logLevel", "ERROR")
       .set("spark.gluten.sql.validation.printStackOnFailure", "true")
+      .set(
+        "spark.gluten.sql.columnar.backend.ch.runtime_config.enable_grace_aggregate_spill_test",
+        "true")
   }
 
   executeTPCDSTest(false)
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetBucketSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetBucketSuite.scala
index 59b5834e64..a257e2ed50 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetBucketSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetBucketSuite.scala
@@ -53,6 +53,9 @@ class GlutenClickHouseTPCHParquetBucketSuite
       .set("spark.sql.autoBroadcastJoinThreshold", "-1") // for test bucket join
       .set("spark.sql.adaptive.enabled", "true")
       .set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", "sparkMurmurHash3_32")
+      .set(
+        "spark.gluten.sql.columnar.backend.ch.runtime_config.enable_grace_aggregate_spill_test",
+        "true")
   }
 
   override protected val createNullableTables = true
diff --git a/cpp-ch/local-engine/Common/GlutenConfig.cpp b/cpp-ch/local-engine/Common/GlutenConfig.cpp
index ce15a12f92..75bb41f10d 100644
--- a/cpp-ch/local-engine/Common/GlutenConfig.cpp
+++ b/cpp-ch/local-engine/Common/GlutenConfig.cpp
@@ -73,6 +73,7 @@ GraceMergingAggregateConfig GraceMergingAggregateConfig::loadFromContext(const D
         = context->getConfigRef().getUInt64(MAX_PENDING_FLUSH_BLOCKS_PER_GRACE_AGGREGATE_MERGING_BUCKET, 1_MiB);
     config.max_allowed_memory_usage_ratio_for_aggregate_merging
         = context->getConfigRef().getDouble(MAX_ALLOWED_MEMORY_USAGE_RATIO_FOR_AGGREGATE_MERGING, 0.9);
+    config.enable_spill_test = context->getConfigRef().getBool(ENABLE_SPILL_TEST, false);
     return config;
 }
 
diff --git a/cpp-ch/local-engine/Common/GlutenConfig.h b/cpp-ch/local-engine/Common/GlutenConfig.h
index 85839b70ec..07952383fa 100644
--- a/cpp-ch/local-engine/Common/GlutenConfig.h
+++ b/cpp-ch/local-engine/Common/GlutenConfig.h
@@ -62,12 +62,14 @@ struct GraceMergingAggregateConfig
         = "max_pending_flush_blocks_per_grace_aggregate_merging_bucket";
     inline static const String MAX_ALLOWED_MEMORY_USAGE_RATIO_FOR_AGGREGATE_MERGING
         = "max_allowed_memory_usage_ratio_for_aggregate_merging";
+    inline static const String ENABLE_SPILL_TEST = "enable_grace_aggregate_spill_test";
 
     size_t max_grace_aggregate_merging_buckets = 32;
     bool throw_on_overflow_grace_aggregate_merging_buckets = false;
     size_t aggregated_keys_before_extend_grace_aggregate_merging_buckets = 8192;
     size_t max_pending_flush_blocks_per_grace_aggregate_merging_bucket = 1_MiB;
     double max_allowed_memory_usage_ratio_for_aggregate_merging = 0.9;
+    bool enable_spill_test = false;
 
     static GraceMergingAggregateConfig loadFromContext(const DB::ContextPtr & context);
 };
diff --git a/cpp-ch/local-engine/Operator/GraceAggregatingTransform.cpp b/cpp-ch/local-engine/Operator/GraceAggregatingTransform.cpp
index 63dc3c3457..0dd1ac3584 100644
--- a/cpp-ch/local-engine/Operator/GraceAggregatingTransform.cpp
+++ b/cpp-ch/local-engine/Operator/GraceAggregatingTransform.cpp
@@ -56,7 +56,9 @@ GraceAggregatingTransform::GraceAggregatingTransform(
     max_allowed_memory_usage_ratio = config.max_allowed_memory_usage_ratio_for_aggregate_merging;
     // bucket 0 is for in-memory data, it's just a placeholder.
     buckets.emplace(0, BufferFileStream());
-
+    enable_spill_test = config.enable_spill_test;
+    if (enable_spill_test)
+        buckets.emplace(1, BufferFileStream());
     current_data_variants = std::make_shared<DB::AggregatedDataVariants>();
 }
 
@@ -289,7 +291,7 @@ void GraceAggregatingTransform::addBlockIntoFileBucket(size_t bucket_index, cons
         file_stream.original_blocks.push_back(block);
     else
         file_stream.intermediate_blocks.push_back(block);
-    if (file_stream.pending_bytes > max_pending_flush_blocks_per_bucket)
+    if (file_stream.pending_bytes > max_pending_flush_blocks_per_bucket || (file_stream.pending_bytes && enable_spill_test))
     {
         flushBucket(bucket_index);
         file_stream.pending_bytes = 0;
@@ -349,7 +351,7 @@ size_t GraceAggregatingTransform::flushBucket(size_t bucket_index)
         if (!file_stream.intermediate_file_stream)
         {
             auto intermediate_header = params->aggregator.getHeader(false);
-            file_stream.intermediate_file_stream = DB::TemporaryBlockStreamHolder(header, tmp_data_disk.get());
+            file_stream.intermediate_file_stream = DB::TemporaryBlockStreamHolder(intermediate_header, tmp_data_disk.get());
         }
         flush_bytes += flushBlocksInfoDisk(file_stream.intermediate_file_stream, file_stream.intermediate_blocks);
     }
diff --git a/cpp-ch/local-engine/Operator/GraceAggregatingTransform.h b/cpp-ch/local-engine/Operator/GraceAggregatingTransform.h
index 612a58b3c9..c34d7714cf 100644
--- a/cpp-ch/local-engine/Operator/GraceAggregatingTransform.h
+++ b/cpp-ch/local-engine/Operator/GraceAggregatingTransform.h
@@ -116,6 +116,7 @@ private:
     DB::BlocksList current_final_blocks;
     std::unique_ptr<AggregateDataBlockConverter> block_converter = nullptr;
     bool no_more_keys = false;
+    bool enable_spill_test = false;
 
     double per_key_memory_usage = 0;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to