This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 5055e8e5b4 [GLUTEN-8096][CH] Invalid header for disk tmp file (#8100)
5055e8e5b4 is described below
commit 5055e8e5b4c7a05ae71067337926a4a33413a025
Author: lgbo <[email protected]>
AuthorDate: Fri Nov 29 17:18:11 2024 +0800
[GLUTEN-8096][CH] Invalid header for disk tmp file (#8100)
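[CH] Invalid header for disk tmp file. The intermediate disk tmp file stream in GraceAggregatingTransform::flushBucket was created with the input `header` instead of the aggregator's intermediate header (`params->aggregator.getHeader(false)`); it now uses the intermediate header. This change also adds the `enable_grace_aggregate_spill_test` runtime config. When it is enabled, file bucket 1 is created up front and any pending bytes in a file bucket are flushed to disk immediately, so tests exercise the spill path.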
---
.../apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala | 3 +++
.../execution/tpcds/GlutenClickHouseTPCDSParquetSuite.scala | 3 +++
.../execution/tpch/GlutenClickHouseTPCHParquetBucketSuite.scala | 3 +++
cpp-ch/local-engine/Common/GlutenConfig.cpp | 1 +
cpp-ch/local-engine/Common/GlutenConfig.h | 2 ++
cpp-ch/local-engine/Operator/GraceAggregatingTransform.cpp | 8 +++++---
cpp-ch/local-engine/Operator/GraceAggregatingTransform.h | 1 +
7 files changed, 18 insertions(+), 3 deletions(-)
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala
index 6a09bf4942..7bccb6dfb5 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala
@@ -47,6 +47,9 @@ class GlutenClickHouseTPCHBucketSuite
.set("spark.sql.autoBroadcastJoinThreshold", "-1") // for test bucket
join
.set("spark.sql.adaptive.enabled", "true")
.set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm",
"sparkMurmurHash3_32")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.enable_grace_aggregate_spill_test",
+ "true")
}
override protected val createNullableTables = true
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetSuite.scala
index aa7c6b0f56..0ba7de90c6 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetSuite.scala
@@ -38,6 +38,9 @@ class GlutenClickHouseTPCDSParquetSuite extends
GlutenClickHouseTPCDSAbstractSui
.set("spark.memory.offHeap.size", "4g")
.set("spark.gluten.sql.validation.logLevel", "ERROR")
.set("spark.gluten.sql.validation.printStackOnFailure", "true")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.enable_grace_aggregate_spill_test",
+ "true")
}
executeTPCDSTest(false)
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetBucketSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetBucketSuite.scala
index 59b5834e64..a257e2ed50 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetBucketSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetBucketSuite.scala
@@ -53,6 +53,9 @@ class GlutenClickHouseTPCHParquetBucketSuite
.set("spark.sql.autoBroadcastJoinThreshold", "-1") // for test bucket
join
.set("spark.sql.adaptive.enabled", "true")
.set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm",
"sparkMurmurHash3_32")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.enable_grace_aggregate_spill_test",
+ "true")
}
override protected val createNullableTables = true
diff --git a/cpp-ch/local-engine/Common/GlutenConfig.cpp
b/cpp-ch/local-engine/Common/GlutenConfig.cpp
index ce15a12f92..75bb41f10d 100644
--- a/cpp-ch/local-engine/Common/GlutenConfig.cpp
+++ b/cpp-ch/local-engine/Common/GlutenConfig.cpp
@@ -73,6 +73,7 @@ GraceMergingAggregateConfig
GraceMergingAggregateConfig::loadFromContext(const D
=
context->getConfigRef().getUInt64(MAX_PENDING_FLUSH_BLOCKS_PER_GRACE_AGGREGATE_MERGING_BUCKET,
1_MiB);
config.max_allowed_memory_usage_ratio_for_aggregate_merging
=
context->getConfigRef().getDouble(MAX_ALLOWED_MEMORY_USAGE_RATIO_FOR_AGGREGATE_MERGING,
0.9);
+ config.enable_spill_test =
context->getConfigRef().getBool(ENABLE_SPILL_TEST, false);
return config;
}
diff --git a/cpp-ch/local-engine/Common/GlutenConfig.h
b/cpp-ch/local-engine/Common/GlutenConfig.h
index 85839b70ec..07952383fa 100644
--- a/cpp-ch/local-engine/Common/GlutenConfig.h
+++ b/cpp-ch/local-engine/Common/GlutenConfig.h
@@ -62,12 +62,14 @@ struct GraceMergingAggregateConfig
= "max_pending_flush_blocks_per_grace_aggregate_merging_bucket";
inline static const String
MAX_ALLOWED_MEMORY_USAGE_RATIO_FOR_AGGREGATE_MERGING
= "max_allowed_memory_usage_ratio_for_aggregate_merging";
+ inline static const String ENABLE_SPILL_TEST =
"enable_grace_aggregate_spill_test";
size_t max_grace_aggregate_merging_buckets = 32;
bool throw_on_overflow_grace_aggregate_merging_buckets = false;
size_t aggregated_keys_before_extend_grace_aggregate_merging_buckets =
8192;
size_t max_pending_flush_blocks_per_grace_aggregate_merging_bucket = 1_MiB;
double max_allowed_memory_usage_ratio_for_aggregate_merging = 0.9;
+ bool enable_spill_test = false;
static GraceMergingAggregateConfig loadFromContext(const DB::ContextPtr &
context);
};
diff --git a/cpp-ch/local-engine/Operator/GraceAggregatingTransform.cpp
b/cpp-ch/local-engine/Operator/GraceAggregatingTransform.cpp
index 63dc3c3457..0dd1ac3584 100644
--- a/cpp-ch/local-engine/Operator/GraceAggregatingTransform.cpp
+++ b/cpp-ch/local-engine/Operator/GraceAggregatingTransform.cpp
@@ -56,7 +56,9 @@ GraceAggregatingTransform::GraceAggregatingTransform(
max_allowed_memory_usage_ratio =
config.max_allowed_memory_usage_ratio_for_aggregate_merging;
// bucket 0 is for in-memory data, it's just a placeholder.
buckets.emplace(0, BufferFileStream());
-
+ enable_spill_test = config.enable_spill_test;
+ if (enable_spill_test)
+ buckets.emplace(1, BufferFileStream());
current_data_variants = std::make_shared<DB::AggregatedDataVariants>();
}
@@ -289,7 +291,7 @@ void
GraceAggregatingTransform::addBlockIntoFileBucket(size_t bucket_index, cons
file_stream.original_blocks.push_back(block);
else
file_stream.intermediate_blocks.push_back(block);
- if (file_stream.pending_bytes > max_pending_flush_blocks_per_bucket)
+ if (file_stream.pending_bytes > max_pending_flush_blocks_per_bucket ||
(file_stream.pending_bytes && enable_spill_test))
{
flushBucket(bucket_index);
file_stream.pending_bytes = 0;
@@ -349,7 +351,7 @@ size_t GraceAggregatingTransform::flushBucket(size_t
bucket_index)
if (!file_stream.intermediate_file_stream)
{
auto intermediate_header = params->aggregator.getHeader(false);
- file_stream.intermediate_file_stream =
DB::TemporaryBlockStreamHolder(header, tmp_data_disk.get());
+ file_stream.intermediate_file_stream =
DB::TemporaryBlockStreamHolder(intermediate_header, tmp_data_disk.get());
}
flush_bytes +=
flushBlocksInfoDisk(file_stream.intermediate_file_stream,
file_stream.intermediate_blocks);
}
diff --git a/cpp-ch/local-engine/Operator/GraceAggregatingTransform.h
b/cpp-ch/local-engine/Operator/GraceAggregatingTransform.h
index 612a58b3c9..c34d7714cf 100644
--- a/cpp-ch/local-engine/Operator/GraceAggregatingTransform.h
+++ b/cpp-ch/local-engine/Operator/GraceAggregatingTransform.h
@@ -116,6 +116,7 @@ private:
DB::BlocksList current_final_blocks;
std::unique_ptr<AggregateDataBlockConverter> block_converter = nullptr;
bool no_more_keys = false;
+ bool enable_spill_test = false;
double per_key_memory_usage = 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]