This is an automated email from the ASF dual-hosted git repository.

liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new c5c064a8ba [GLUTEN-7962][CH] A friendly API to build aggregator params #7963
c5c064a8ba is described below

commit c5c064a8bae9454c6f009a1087efece9b4631f36
Author: lgbo <[email protected]>
AuthorDate: Mon Nov 18 17:02:24 2024 +0800

    [GLUTEN-7962][CH] A friendly API to build aggregator params #7963
    
    What changes were proposed in this pull request?
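    Adds AggregatorParamsHelper to Common/AggregateUtil.{h,cpp}. buildParams()
    derives DB::Aggregator::Params from the context settings for a given Mode
    and Algorithm; compare() checks the scalar fields of two Params objects and
    logs the first mismatch. AdvancedExpandStep and AggregateRelParser now call
    buildParams() instead of repeating the Aggregator::Params constructor
    arguments at every call site.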
    
    Fixes: #7962
    
    How was this patch tested?
    (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
    
    unit tests
    
    (If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
---
 cpp-ch/local-engine/Common/AggregateUtil.cpp       | 151 ++++++++++++++++++---
 cpp-ch/local-engine/Common/AggregateUtil.h         |  31 ++++-
 .../local-engine/Operator/AdvancedExpandStep.cpp   |  48 +------
 .../Parser/RelParsers/AggregateRelParser.cpp       | 138 +++----------------
 4 files changed, 183 insertions(+), 185 deletions(-)

diff --git a/cpp-ch/local-engine/Common/AggregateUtil.cpp 
b/cpp-ch/local-engine/Common/AggregateUtil.cpp
index 851dd2e7fe..2290747fa1 100644
--- a/cpp-ch/local-engine/Common/AggregateUtil.cpp
+++ b/cpp-ch/local-engine/Common/AggregateUtil.cpp
@@ -15,8 +15,11 @@
  * limitations under the License.
  */
 
+#include "AggregateUtil.h"
+#include <Core/Settings.h>
 #include <Poco/Logger.h>
 #include <Common/AggregateUtil.h>
+#include <Common/CHUtil.h>
 #include <Common/Exception.h>
 #include <Common/Stopwatch.h>
 #include <Common/formatReadable.h>
@@ -26,8 +29,26 @@ namespace DB
 {
 namespace ErrorCodes
 {
-    extern const int LOGICAL_ERROR;
-    extern const int UNKNOWN_AGGREGATED_DATA_VARIANT;
+extern const int LOGICAL_ERROR;
+extern const int UNKNOWN_AGGREGATED_DATA_VARIANT;
+}
+
+namespace Setting
+{
+extern const SettingsUInt64 max_bytes_before_external_group_by;
+extern const SettingsBool optimize_group_by_constant_keys;
+extern const SettingsUInt64 min_free_disk_space_for_temporary_data;
+extern const SettingsMaxThreads max_threads;
+extern const SettingsBool empty_result_for_aggregation_by_empty_set;
+extern const SettingsUInt64 group_by_two_level_threshold_bytes;
+extern const SettingsOverflowModeGroupBy group_by_overflow_mode;
+extern const SettingsUInt64 max_rows_to_group_by;
+extern const SettingsBool enable_memory_bound_merging_of_aggregation_results;
+extern const SettingsUInt64 aggregation_in_order_max_block_bytes;
+extern const SettingsUInt64 group_by_two_level_threshold;
+extern const SettingsFloat min_hit_rate_to_use_consecutive_keys_optimization;
+extern const SettingsMaxThreads max_threads;
+extern const SettingsUInt64 max_block_size;
 }
 
 template <typename Method>
@@ -39,24 +60,23 @@ static Int32 extractMethodBucketsNum(Method & /*method*/)
 Int32 GlutenAggregatorUtil::getBucketsNum(AggregatedDataVariants & 
data_variants)
 {
     if (!data_variants.isTwoLevel())
-    {
         return 0;
-    }
-    
+
     Int32 buckets_num = 0;
 #define M(NAME) \
-    else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
-        buckets_num = extractMethodBucketsNum(*data_variants.NAME);
+    else if (data_variants.type == AggregatedDataVariants::Type::NAME) 
buckets_num = extractMethodBucketsNum(*data_variants.NAME);
 
-    if (false) {} // NOLINT
+    if (false)
+    {
+    } // NOLINT
     APPLY_FOR_VARIANTS_TWO_LEVEL(M)
 #undef M
-    else
-        throw Exception(ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT, "Unknown 
aggregated data variant");
+    else throw Exception(ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT, "Unknown 
aggregated data variant");
     return buckets_num;
 }
 
-std::optional<Block> 
GlutenAggregatorUtil::safeConvertOneBucketToBlock(Aggregator & aggregator, 
AggregatedDataVariants & variants, Arena * arena, bool final, Int32 bucket)
+std::optional<Block> GlutenAggregatorUtil::safeConvertOneBucketToBlock(
+    Aggregator & aggregator, AggregatedDataVariants & variants, Arena * arena, 
bool final, Int32 bucket)
 {
     if (!variants.isTwoLevel())
         return {};
@@ -65,7 +85,7 @@ std::optional<Block> 
GlutenAggregatorUtil::safeConvertOneBucketToBlock(Aggregato
     return aggregator.convertOneBucketToBlock(variants, arena, final, bucket);
 }
 
-template<typename Method>
+template <typename Method>
 static void releaseOneBucket(Method & method, Int32 bucket)
 {
     method.data.impls[bucket].clearAndShrink();
@@ -77,29 +97,26 @@ void 
GlutenAggregatorUtil::safeReleaseOneBucket(AggregatedDataVariants & variant
         return;
     if (bucket >= getBucketsNum(variants))
         return;
-#define M(NAME) \
-    else if (variants.type == AggregatedDataVariants::Type::NAME) \
-        releaseOneBucket(*variants.NAME, bucket);
+#define M(NAME) else if (variants.type == AggregatedDataVariants::Type::NAME) 
releaseOneBucket(*variants.NAME, bucket);
 
-    if (false) {} // NOLINT
+    if (false)
+    {
+    } // NOLINT
     APPLY_FOR_VARIANTS_TWO_LEVEL(M)
 #undef M
-    else
-        throw Exception(ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT, "Unknown 
aggregated data variant");
-
+    else throw Exception(ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT, "Unknown 
aggregated data variant");
 }
 
 }
 
 namespace local_engine
 {
-AggregateDataBlockConverter::AggregateDataBlockConverter(DB::Aggregator & 
aggregator_, DB::AggregatedDataVariantsPtr data_variants_, bool final_)
+AggregateDataBlockConverter::AggregateDataBlockConverter(
+    DB::Aggregator & aggregator_, DB::AggregatedDataVariantsPtr 
data_variants_, bool final_)
     : aggregator(aggregator_), data_variants(std::move(data_variants_)), 
final(final_)
 {
     if (data_variants->isTwoLevel())
-    {
         buckets_num = DB::GlutenAggregatorUtil::getBucketsNum(*data_variants);
-    }
     else if (data_variants->size())
         buckets_num = 1;
     else
@@ -168,4 +185,94 @@ DB::Block AggregateDataBlockConverter::next()
     output_blocks.pop_front();
     return block;
 }
+
+DB::Aggregator::Params AggregatorParamsHelper::buildParams(
+    DB::ContextPtr context,
+    const DB::Names & grouping_keys,
+    const DB::AggregateDescriptions & agg_descriptions,
+    Mode mode,
+    Algorithm algorithm)
+{
+    const auto & settings = context->getSettingsRef();
+    size_t max_rows_to_group_by = mode == Mode::PARTIAL_TO_FINISHED ? 0 : 
static_cast<size_t>(settings[DB::Setting::max_rows_to_group_by]);
+    DB::OverflowMode group_by_overflow_mode = 
settings[DB::Setting::group_by_overflow_mode];
+    size_t group_by_two_level_threshold
+        = algorithm == Algorithm::GlutenGraceAggregate ? 
static_cast<size_t>(settings[DB::Setting::group_by_two_level_threshold]) : 0;
+    size_t group_by_two_level_threshold_bytes = algorithm == 
Algorithm::GlutenGraceAggregate
+        ? 0
+        : (mode == Mode::PARTIAL_TO_FINISHED ? 0 : 
static_cast<size_t>(settings[DB::Setting::group_by_two_level_threshold_bytes]));
+    size_t max_bytes_before_external_group_by = algorithm == 
Algorithm::GlutenGraceAggregate
+        ? 0
+        : (mode == Mode::PARTIAL_TO_FINISHED ? 0 : 
static_cast<size_t>(settings[DB::Setting::max_bytes_before_external_group_by]));
+    bool empty_result_for_aggregation_by_empty_set = algorithm == 
Algorithm::GlutenGraceAggregate
+        ? false
+        : (mode == Mode::PARTIAL_TO_FINISHED ? false : 
static_cast<bool>(settings[DB::Setting::empty_result_for_aggregation_by_empty_set]));
+    DB::TemporaryDataOnDiskScopePtr tmp_data_scope = algorithm == 
Algorithm::GlutenGraceAggregate ? nullptr : context->getTempDataOnDisk();
+    size_t max_threads = settings[DB::Setting::max_threads];
+    size_t min_free_disk_space = algorithm == Algorithm::GlutenGraceAggregate
+        ? 0
+        : 
static_cast<size_t>(settings[DB::Setting::min_free_disk_space_for_temporary_data]);
+    bool compile_aggregate_expressions = mode == Mode::PARTIAL_TO_FINISHED ? 
false : true;
+    size_t min_count_to_compile_aggregate_expression = mode == 
Mode::PARTIAL_TO_FINISHED ? 0 : 3;
+    size_t max_block_size = 
PODArrayUtil::adjustMemoryEfficientSize(settings[DB::Setting::max_block_size]);
+    bool enable_prefetch = mode == Mode::PARTIAL_TO_FINISHED ? false : true;
+    bool only_merge = mode == Mode::PARTIAL_TO_FINISHED;
+    bool optimize_group_by_constant_keys
+        = mode == Mode::PARTIAL_TO_FINISHED ? false : 
settings[DB::Setting::optimize_group_by_constant_keys];
+    double min_hit_rate_to_use_consecutive_keys_optimization = 
settings[DB::Setting::min_hit_rate_to_use_consecutive_keys_optimization];
+    DB::Aggregator::Params params(
+        grouping_keys,
+        agg_descriptions,
+        false,
+        max_rows_to_group_by,
+        group_by_overflow_mode,
+        group_by_two_level_threshold,
+        group_by_two_level_threshold_bytes,
+        max_bytes_before_external_group_by,
+        empty_result_for_aggregation_by_empty_set,
+        tmp_data_scope,
+        max_threads,
+        min_free_disk_space,
+        compile_aggregate_expressions,
+        min_count_to_compile_aggregate_expression,
+        max_block_size,
+        enable_prefetch,
+        only_merge,
+        optimize_group_by_constant_keys,
+        min_hit_rate_to_use_consecutive_keys_optimization,
+        {});
+    return params;
+}
+
+
+#define COMPARE_FIELD(field) \
+    if (lhs.field != rhs.field) \
+    { \
+        LOG_ERROR(getLogger("AggregatorParamsHelper"), "Aggregator::Params 
field " #field " is not equal. {}/{}", lhs.field, rhs.field); \
+        return false; \
+    }
+bool AggregatorParamsHelper::compare(const DB::Aggregator::Params & lhs, const 
DB::Aggregator::Params & rhs)
+{
+    COMPARE_FIELD(overflow_row);
+    COMPARE_FIELD(max_rows_to_group_by);
+    COMPARE_FIELD(group_by_overflow_mode);
+    COMPARE_FIELD(group_by_two_level_threshold);
+    COMPARE_FIELD(group_by_two_level_threshold_bytes);
+    COMPARE_FIELD(max_bytes_before_external_group_by);
+    COMPARE_FIELD(empty_result_for_aggregation_by_empty_set);
+    COMPARE_FIELD(max_threads);
+    COMPARE_FIELD(min_free_disk_space);
+    COMPARE_FIELD(compile_aggregate_expressions);
+    if ((lhs.tmp_data_scope == nullptr) != (rhs.tmp_data_scope == nullptr))
+    {
+        LOG_ERROR(getLogger("AggregatorParamsHelper"), "Aggregator::Params 
field tmp_data_scope is not equal.");
+        return false;
+    }
+    COMPARE_FIELD(min_count_to_compile_aggregate_expression);
+    COMPARE_FIELD(enable_prefetch);
+    COMPARE_FIELD(only_merge);
+    COMPARE_FIELD(optimize_group_by_constant_keys);
+    COMPARE_FIELD(min_hit_rate_to_use_consecutive_keys_optimization);
+    return true;
+}
 }
diff --git a/cpp-ch/local-engine/Common/AggregateUtil.h 
b/cpp-ch/local-engine/Common/AggregateUtil.h
index b14cd59c54..380e1ea355 100644
--- a/cpp-ch/local-engine/Common/AggregateUtil.h
+++ b/cpp-ch/local-engine/Common/AggregateUtil.h
@@ -25,7 +25,8 @@ class GlutenAggregatorUtil
 {
 public:
     static Int32 getBucketsNum(AggregatedDataVariants & data_variants);
-    static std::optional<Block> safeConvertOneBucketToBlock(Aggregator & 
aggregator, AggregatedDataVariants & variants, Arena * arena, bool final, Int32 
bucket);
+    static std::optional<Block>
+    safeConvertOneBucketToBlock(Aggregator & aggregator, 
AggregatedDataVariants & variants, Arena * arena, bool final, Int32 bucket);
     static void safeReleaseOneBucket(AggregatedDataVariants & variants, Int32 
bucket);
 };
 }
@@ -41,6 +42,7 @@ public:
     ~AggregateDataBlockConverter() = default;
     bool hasNext();
     DB::Block next();
+
 private:
     DB::Aggregator & aggregator;
     DB::AggregatedDataVariantsPtr data_variants;
@@ -50,4 +52,31 @@ private:
     Int32 current_bucket = 0;
     DB::BlocksList output_blocks;
 };
+
+class AggregatorParamsHelper
+{
+public:
+    enum class Algorithm
+    {
+        GlutenGraceAggregate,
+        CHTwoStageAggregate
+    };
+    enum class Mode
+    {
+        INIT_TO_PARTIAL,
+        INIT_TO_COMPLETED,
+        PARTIAL_TO_PARTIAL,
+        PARTIAL_TO_FINISHED,
+    };
+
+    // When using grace aggregation, never enable CH spill; otherwise data will be lost.
+    static DB::Aggregator::Params buildParams(
+        DB::ContextPtr context,
+        const DB::Names & grouping_keys,
+        const DB::AggregateDescriptions & agg_descriptions,
+        Mode mode,
+        Algorithm algorithm = Algorithm::GlutenGraceAggregate);
+    static bool compare(const DB::Aggregator::Params & lhs, const 
DB::Aggregator::Params & rhs);
+};
+
 }
diff --git a/cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp 
b/cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp
index b777731a91..ee767b31bd 100644
--- a/cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp
+++ b/cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp
@@ -16,9 +16,6 @@
  */
 
 #include "AdvancedExpandStep.h"
-#include <iterator>
-#include <system_error>
-#include <unordered_set>
 #include <Columns/ColumnNullable.h>
 #include <Columns/ColumnsNumber.h>
 #include <Columns/IColumn.h>
@@ -33,33 +30,13 @@
 #include <Processors/Transforms/ExpressionTransform.h>
 #include <QueryPipeline/Pipe.h>
 #include <QueryPipeline/QueryPipelineBuilder.h>
+#include <Common/AggregateUtil.h>
 #include <Common/CHUtil.h>
 #include <Common/WeakHash.h>
 
 #include <Poco/Logger.h>
 #include <Common/logger_useful.h>
 
-namespace DB
-{
-namespace Setting
-{
-extern const SettingsUInt64 max_bytes_before_external_group_by;
-extern const SettingsBool optimize_group_by_constant_keys;
-extern const SettingsUInt64 min_free_disk_space_for_temporary_data;
-extern const SettingsMaxThreads max_threads;
-extern const SettingsBool empty_result_for_aggregation_by_empty_set;
-extern const SettingsUInt64 group_by_two_level_threshold_bytes;
-extern const SettingsOverflowModeGroupBy group_by_overflow_mode;
-extern const SettingsUInt64 max_rows_to_group_by;
-extern const SettingsBool enable_memory_bound_merging_of_aggregation_results;
-extern const SettingsUInt64 aggregation_in_order_max_block_bytes;
-extern const SettingsUInt64 group_by_two_level_threshold;
-extern const SettingsFloat min_hit_rate_to_use_consecutive_keys_optimization;
-extern const SettingsMaxThreads max_threads;
-extern const SettingsUInt64 max_block_size;
-}
-}
-
 namespace local_engine
 {
 
@@ -116,27 +93,8 @@ void 
AdvancedExpandStep::transformPipeline(DB::QueryPipelineBuilder & pipeline,
         aggregate_grouping_keys.push_back(col.name);
     }
     // partial to partial aggregate
-    DB::Aggregator::Params params(
-        aggregate_grouping_keys,
-        aggregate_descriptions,
-        false,
-        settings[DB::Setting::max_rows_to_group_by],
-        settings[DB::Setting::group_by_overflow_mode],
-        settings[DB::Setting::group_by_two_level_threshold],
-        0,
-        0,
-        settings[DB::Setting::empty_result_for_aggregation_by_empty_set],
-        nullptr,
-        settings[DB::Setting::max_threads],
-        settings[DB::Setting::min_free_disk_space_for_temporary_data],
-        true,
-        3,
-        
PODArrayUtil::adjustMemoryEfficientSize(settings[DB::Setting::max_block_size]),
-        /*enable_prefetch*/ true,
-        /*only_merge*/ false,
-        settings[DB::Setting::optimize_group_by_constant_keys],
-        
settings[DB::Setting::min_hit_rate_to_use_consecutive_keys_optimization],
-        /*StatsCollectingParams*/ {});
+    auto params = AggregatorParamsHelper::buildParams(
+        context, aggregate_grouping_keys, aggregate_descriptions, 
AggregatorParamsHelper::Mode::PARTIAL_TO_PARTIAL);
 
     auto input_header = input_headers.front();
     auto build_transform = [&](DB::OutputPortRawPtrs outputs)
diff --git a/cpp-ch/local-engine/Parser/RelParsers/AggregateRelParser.cpp 
b/cpp-ch/local-engine/Parser/RelParsers/AggregateRelParser.cpp
index 6bc8c7e6e1..f5498b01ab 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/AggregateRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/AggregateRelParser.cpp
@@ -39,19 +39,8 @@ namespace DB
 {
 namespace Setting
 {
-extern const SettingsUInt64 max_bytes_before_external_group_by;
-extern const SettingsBool optimize_group_by_constant_keys;
-extern const SettingsUInt64 min_free_disk_space_for_temporary_data;
-extern const SettingsMaxThreads max_threads;
-extern const SettingsBool empty_result_for_aggregation_by_empty_set;
-extern const SettingsUInt64 group_by_two_level_threshold_bytes;
-extern const SettingsOverflowModeGroupBy group_by_overflow_mode;
-extern const SettingsUInt64 max_rows_to_group_by;
 extern const SettingsBool enable_memory_bound_merging_of_aggregation_results;
 extern const SettingsUInt64 aggregation_in_order_max_block_bytes;
-extern const SettingsUInt64 group_by_two_level_threshold;
-extern const SettingsFloat min_hit_rate_to_use_consecutive_keys_optimization;
-extern const SettingsMaxThreads max_threads;
 extern const SettingsUInt64 max_block_size;
 }
 namespace ErrorCodes
@@ -184,9 +173,7 @@ void AggregateRelParser::setup(DB::QueryPlanPtr query_plan, 
const substrait::Rel
         agg_info.signature_function_name = 
*parseSignatureFunctionName(measure.measure().function_reference());
         auto function_parser = 
AggregateFunctionParserFactory::instance().get(agg_info.signature_function_name,
 parser_context);
         if (!function_parser)
-        {
             throw Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unsupported 
aggregate function: {}", agg_info.signature_function_name);
-        }
         /// Put function_parser, parser_func_info and function_name into 
agg_info for reducing repeated builds.
         agg_info.function_parser = function_parser;
         agg_info.parser_func_info = 
AggregateFunctionParser::CommonFunctionInfo(measure);
@@ -198,16 +185,10 @@ void AggregateRelParser::setup(DB::QueryPlanPtr 
query_plan, const substrait::Rel
     if (aggregate_rel->groupings_size() == 1)
     {
         for (const auto & expr : 
aggregate_rel->groupings(0).grouping_expressions())
-        {
             if (expr.has_selection() && 
expr.selection().has_direct_reference())
-            {
                 
grouping_keys.push_back(input_header.getByPosition(expr.selection().direct_reference().struct_field().field()).name);
-            }
             else
-            {
                 throw Exception(ErrorCodes::BAD_ARGUMENTS, "unsupported group 
expression: {}", expr.DebugString());
-            }
-        }
     }
     else if (aggregate_rel->groupings_size() != 0)
     {
@@ -345,23 +326,23 @@ void AggregateRelParser::addMergingAggregatedStep()
     AggregateDescriptions aggregate_descriptions;
     buildAggregateDescriptions(aggregate_descriptions);
     const auto & settings = getContext()->getSettingsRef();
-    Aggregator::Params params(
-        grouping_keys,
-        aggregate_descriptions,
-        false,
-        settings[Setting::max_threads],
-        
PODArrayUtil::adjustMemoryEfficientSize(settings[Setting::max_block_size]),
-        settings[Setting::min_hit_rate_to_use_consecutive_keys_optimization]);
     auto config = StreamingAggregateConfig::loadFromContext(getContext());
     if (config.enable_streaming_aggregating)
     {
-        params.group_by_two_level_threshold = 
settings[Setting::group_by_two_level_threshold];
+        auto params = AggregatorParamsHelper::buildParams(
+            getContext(), grouping_keys, aggregate_descriptions, 
AggregatorParamsHelper::Mode::PARTIAL_TO_FINISHED);
         auto merging_step = 
std::make_unique<GraceMergingAggregatedStep>(getContext(), 
plan->getCurrentHeader(), params, false);
         steps.emplace_back(merging_step.get());
         plan->addStep(std::move(merging_step));
     }
     else
     {
+        auto params = AggregatorParamsHelper::buildParams(
+            getContext(),
+            grouping_keys,
+            aggregate_descriptions,
+            AggregatorParamsHelper::Mode::PARTIAL_TO_FINISHED,
+            AggregatorParamsHelper::Algorithm::CHTwoStageAggregate);
         /// We don't use the grouping set feature in CH, so 
grouping_sets_params_list should always be empty.
         DB::GroupingSetsParamsList grouping_sets_params_list;
         auto merging_step = std::make_unique<DB::MergingAggregatedStep>(
@@ -389,54 +370,20 @@ void AggregateRelParser::addCompleteModeAggregatedStep()
     auto config = StreamingAggregateConfig::loadFromContext(getContext());
     if (config.enable_streaming_aggregating)
     {
-        Aggregator::Params params(
-            grouping_keys,
-            aggregate_descriptions,
-            false,
-            settings[Setting::max_rows_to_group_by],
-            settings[Setting::group_by_overflow_mode],
-            settings[Setting::group_by_two_level_threshold],
-            settings[Setting::group_by_two_level_threshold_bytes],
-            0, /*settings[Setting::max_bytes_before_external_group_by]*/
-            settings[Setting::empty_result_for_aggregation_by_empty_set],
-            getContext()->getTempDataOnDisk(),
-            settings[Setting::max_threads],
-            settings[Setting::min_free_disk_space_for_temporary_data],
-            true,
-            3,
-            
PODArrayUtil::adjustMemoryEfficientSize(settings[Setting::max_block_size]),
-            /*enable_prefetch*/ true,
-            /*only_merge*/ false,
-            settings[Setting::optimize_group_by_constant_keys],
-            
settings[Setting::min_hit_rate_to_use_consecutive_keys_optimization],
-            /*StatsCollectingParams*/ {});
+        auto params = AggregatorParamsHelper::buildParams(
+            getContext(), grouping_keys, aggregate_descriptions, 
AggregatorParamsHelper::Mode::INIT_TO_COMPLETED);
         auto merging_step = 
std::make_unique<GraceMergingAggregatedStep>(getContext(), 
plan->getCurrentHeader(), params, true);
         steps.emplace_back(merging_step.get());
         plan->addStep(std::move(merging_step));
     }
     else
     {
-        Aggregator::Params params(
+        auto params = AggregatorParamsHelper::buildParams(
+            getContext(),
             grouping_keys,
             aggregate_descriptions,
-            false,
-            settings[Setting::max_rows_to_group_by],
-            settings[Setting::group_by_overflow_mode],
-            settings[Setting::group_by_two_level_threshold],
-            settings[Setting::group_by_two_level_threshold_bytes],
-            settings[Setting::max_bytes_before_external_group_by],
-            settings[Setting::empty_result_for_aggregation_by_empty_set],
-            getContext()->getTempDataOnDisk(),
-            settings[Setting::max_threads],
-            settings[Setting::min_free_disk_space_for_temporary_data],
-            true,
-            3,
-            
PODArrayUtil::adjustMemoryEfficientSize(settings[Setting::max_block_size]),
-            /*enable_prefetch*/ true,
-            /*only_merge*/ false,
-            settings[Setting::optimize_group_by_constant_keys],
-            
settings[Setting::min_hit_rate_to_use_consecutive_keys_optimization],
-            /*StatsCollectingParams*/ {});
+            AggregatorParamsHelper::Mode::INIT_TO_COMPLETED,
+            AggregatorParamsHelper::Algorithm::CHTwoStageAggregate);
 
         auto aggregating_step = std::make_unique<AggregatingStep>(
             plan->getCurrentHeader(),
@@ -471,9 +418,7 @@ void AggregateRelParser::addAggregatingStep()
     {
         const auto & next_rel = *(rel_stack->back());
         if (next_rel.rel_type_case() == 
substrait::Rel::RelTypeCase::kAggregate)
-        {
             is_distinct_aggreate = true;
-        }
     }
 
     if (config.enable_streaming_aggregating)
@@ -484,27 +429,9 @@ void AggregateRelParser::addAggregatingStep()
         // unreliable. It will appear that a small hash table is converted 
into a two level structure, resulting in a
         // lot of small blocks. So we disable this condition, and keep `group_by_two_level_threshold` as the condition to
         // convert a single level hash table into a two level one.
-        Aggregator::Params params(
-            grouping_keys,
-            aggregate_descriptions,
-            false,
-            settings[Setting::max_rows_to_group_by],
-            settings[Setting::group_by_overflow_mode],
-            settings[Setting::group_by_two_level_threshold],
-            0, // group_by_two_level_threshold_bytes
-            0,
-            settings[Setting::empty_result_for_aggregation_by_empty_set],
-            nullptr,
-            settings[Setting::max_threads],
-            settings[Setting::min_free_disk_space_for_temporary_data],
-            true,
-            3,
-            
PODArrayUtil::adjustMemoryEfficientSize(settings[Setting::max_block_size]),
-            /*enable_prefetch*/ true,
-            /*only_merge*/ false,
-            settings[Setting::optimize_group_by_constant_keys],
-            
settings[Setting::min_hit_rate_to_use_consecutive_keys_optimization],
-            /*StatsCollectingParams*/ {});
+        auto params = AggregatorParamsHelper::buildParams(
+            getContext(), grouping_keys, aggregate_descriptions, 
AggregatorParamsHelper::Mode::INIT_TO_PARTIAL);
+
         if (!is_distinct_aggreate)
         {
             auto aggregating_step = 
std::make_unique<StreamingAggregatingStep>(getContext(), 
plan->getCurrentHeader(), params);
@@ -532,27 +459,12 @@ void AggregateRelParser::addAggregatingStep()
     }
     else
     {
-        Aggregator::Params params(
+        auto params = AggregatorParamsHelper::buildParams(
+            getContext(),
             grouping_keys,
             aggregate_descriptions,
-            false,
-            settings[Setting::max_rows_to_group_by],
-            settings[Setting::group_by_overflow_mode],
-            settings[Setting::group_by_two_level_threshold],
-            settings[Setting::group_by_two_level_threshold_bytes],
-            settings[Setting::max_bytes_before_external_group_by],
-            settings[Setting::empty_result_for_aggregation_by_empty_set],
-            getContext()->getTempDataOnDisk(),
-            settings[Setting::max_threads],
-            settings[Setting::min_free_disk_space_for_temporary_data],
-            true,
-            3,
-            
PODArrayUtil::adjustMemoryEfficientSize(settings[Setting::max_block_size]),
-            /*enable_prefetch*/ true,
-            /*only_merge*/ false,
-            settings[Setting::optimize_group_by_constant_keys],
-            
settings[Setting::min_hit_rate_to_use_consecutive_keys_optimization],
-            /*StatsCollectingParams*/ {});
+            AggregatorParamsHelper::Mode::INIT_TO_PARTIAL,
+            AggregatorParamsHelper::Algorithm::CHTwoStageAggregate);
 
         auto aggregating_step = std::make_unique<AggregatingStep>(
             plan->getCurrentHeader(),
@@ -587,12 +499,8 @@ void AggregateRelParser::addPostProjection()
         for (const auto & agg_info : aggregates)
         {
             for (const auto * input_node : project_actions_dag.getInputs())
-            {
                 if (input_node->result_name == agg_info.measure_column_name)
-                {
                     
agg_info.function_parser->convertNodeTypeIfNeeded(agg_info.parser_func_info, 
input_node, project_actions_dag, false);
-                }
-            }
         }
     }
     else if (has_complete_stage)
@@ -601,12 +509,8 @@ void AggregateRelParser::addPostProjection()
         for (const auto & agg_info : aggregates)
         {
             for (const auto * output_node : project_actions_dag.getOutputs())
-            {
                 if (output_node->result_name == agg_info.measure_column_name)
-                {
                     
agg_info.function_parser->convertNodeTypeIfNeeded(agg_info.parser_func_info, 
output_node, project_actions_dag, true);
-                }
-            }
         }
     }
     if (project_actions_dag.dumpDAG() != dag_footprint)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to