This is an automated email from the ASF dual-hosted git repository.
liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new c5c064a8ba [GLUTEN-7962][CH] A friendly API to build aggregator params
#7963
c5c064a8ba is described below
commit c5c064a8bae9454c6f009a1087efece9b4631f36
Author: lgbo <[email protected]>
AuthorDate: Mon Nov 18 17:02:24 2024 +0800
[GLUTEN-7962][CH] A friendly API to build aggregator params #7963
What changes were proposed in this pull request?
(Please fill in changes proposed in this fix)
Fixes: #7962
How was this patch tested?
(Please explain how this patch was tested. E.g. unit tests, integration
tests, manual tests)
unit tests
(If this patch involves UI changes, please attach a screenshot; otherwise,
remove this)
---
cpp-ch/local-engine/Common/AggregateUtil.cpp | 151 ++++++++++++++++++---
cpp-ch/local-engine/Common/AggregateUtil.h | 31 ++++-
.../local-engine/Operator/AdvancedExpandStep.cpp | 48 +------
.../Parser/RelParsers/AggregateRelParser.cpp | 138 +++----------------
4 files changed, 183 insertions(+), 185 deletions(-)
diff --git a/cpp-ch/local-engine/Common/AggregateUtil.cpp
b/cpp-ch/local-engine/Common/AggregateUtil.cpp
index 851dd2e7fe..2290747fa1 100644
--- a/cpp-ch/local-engine/Common/AggregateUtil.cpp
+++ b/cpp-ch/local-engine/Common/AggregateUtil.cpp
@@ -15,8 +15,11 @@
* limitations under the License.
*/
+#include "AggregateUtil.h"
+#include <Core/Settings.h>
#include <Poco/Logger.h>
#include <Common/AggregateUtil.h>
+#include <Common/CHUtil.h>
#include <Common/Exception.h>
#include <Common/Stopwatch.h>
#include <Common/formatReadable.h>
@@ -26,8 +29,26 @@ namespace DB
{
namespace ErrorCodes
{
- extern const int LOGICAL_ERROR;
- extern const int UNKNOWN_AGGREGATED_DATA_VARIANT;
+extern const int LOGICAL_ERROR;
+extern const int UNKNOWN_AGGREGATED_DATA_VARIANT;
+}
+
+namespace Setting
+{
+extern const SettingsUInt64 max_bytes_before_external_group_by;
+extern const SettingsBool optimize_group_by_constant_keys;
+extern const SettingsUInt64 min_free_disk_space_for_temporary_data;
+extern const SettingsMaxThreads max_threads;
+extern const SettingsBool empty_result_for_aggregation_by_empty_set;
+extern const SettingsUInt64 group_by_two_level_threshold_bytes;
+extern const SettingsOverflowModeGroupBy group_by_overflow_mode;
+extern const SettingsUInt64 max_rows_to_group_by;
+extern const SettingsBool enable_memory_bound_merging_of_aggregation_results;
+extern const SettingsUInt64 aggregation_in_order_max_block_bytes;
+extern const SettingsUInt64 group_by_two_level_threshold;
+extern const SettingsFloat min_hit_rate_to_use_consecutive_keys_optimization;
+extern const SettingsMaxThreads max_threads;
+extern const SettingsUInt64 max_block_size;
}
template <typename Method>
@@ -39,24 +60,23 @@ static Int32 extractMethodBucketsNum(Method & /*method*/)
Int32 GlutenAggregatorUtil::getBucketsNum(AggregatedDataVariants &
data_variants)
{
if (!data_variants.isTwoLevel())
- {
return 0;
- }
-
+
Int32 buckets_num = 0;
#define M(NAME) \
- else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
- buckets_num = extractMethodBucketsNum(*data_variants.NAME);
+ else if (data_variants.type == AggregatedDataVariants::Type::NAME)
buckets_num = extractMethodBucketsNum(*data_variants.NAME);
- if (false) {} // NOLINT
+ if (false)
+ {
+ } // NOLINT
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M
- else
- throw Exception(ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT, "Unknown
aggregated data variant");
+ else throw Exception(ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT, "Unknown
aggregated data variant");
return buckets_num;
}
-std::optional<Block>
GlutenAggregatorUtil::safeConvertOneBucketToBlock(Aggregator & aggregator,
AggregatedDataVariants & variants, Arena * arena, bool final, Int32 bucket)
+std::optional<Block> GlutenAggregatorUtil::safeConvertOneBucketToBlock(
+ Aggregator & aggregator, AggregatedDataVariants & variants, Arena * arena,
bool final, Int32 bucket)
{
if (!variants.isTwoLevel())
return {};
@@ -65,7 +85,7 @@ std::optional<Block>
GlutenAggregatorUtil::safeConvertOneBucketToBlock(Aggregato
return aggregator.convertOneBucketToBlock(variants, arena, final, bucket);
}
-template<typename Method>
+template <typename Method>
static void releaseOneBucket(Method & method, Int32 bucket)
{
method.data.impls[bucket].clearAndShrink();
@@ -77,29 +97,26 @@ void
GlutenAggregatorUtil::safeReleaseOneBucket(AggregatedDataVariants & variant
return;
if (bucket >= getBucketsNum(variants))
return;
-#define M(NAME) \
- else if (variants.type == AggregatedDataVariants::Type::NAME) \
- releaseOneBucket(*variants.NAME, bucket);
+#define M(NAME) else if (variants.type == AggregatedDataVariants::Type::NAME)
releaseOneBucket(*variants.NAME, bucket);
- if (false) {} // NOLINT
+ if (false)
+ {
+ } // NOLINT
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M
- else
- throw Exception(ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT, "Unknown
aggregated data variant");
-
+ else throw Exception(ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT, "Unknown
aggregated data variant");
}
}
namespace local_engine
{
-AggregateDataBlockConverter::AggregateDataBlockConverter(DB::Aggregator &
aggregator_, DB::AggregatedDataVariantsPtr data_variants_, bool final_)
+AggregateDataBlockConverter::AggregateDataBlockConverter(
+ DB::Aggregator & aggregator_, DB::AggregatedDataVariantsPtr
data_variants_, bool final_)
: aggregator(aggregator_), data_variants(std::move(data_variants_)),
final(final_)
{
if (data_variants->isTwoLevel())
- {
buckets_num = DB::GlutenAggregatorUtil::getBucketsNum(*data_variants);
- }
else if (data_variants->size())
buckets_num = 1;
else
@@ -168,4 +185,94 @@ DB::Block AggregateDataBlockConverter::next()
output_blocks.pop_front();
return block;
}
+
+DB::Aggregator::Params AggregatorParamsHelper::buildParams(
+ DB::ContextPtr context,
+ const DB::Names & grouping_keys,
+ const DB::AggregateDescriptions & agg_descriptions,
+ Mode mode,
+ Algorithm algorithm)
+{
+ const auto & settings = context->getSettingsRef();
+ size_t max_rows_to_group_by = mode == Mode::PARTIAL_TO_FINISHED ? 0 :
static_cast<size_t>(settings[DB::Setting::max_rows_to_group_by]);
+ DB::OverflowMode group_by_overflow_mode =
settings[DB::Setting::group_by_overflow_mode];
+ size_t group_by_two_level_threshold
+ = algorithm == Algorithm::GlutenGraceAggregate ?
static_cast<size_t>(settings[DB::Setting::group_by_two_level_threshold]) : 0;
+ size_t group_by_two_level_threshold_bytes = algorithm ==
Algorithm::GlutenGraceAggregate
+ ? 0
+ : (mode == Mode::PARTIAL_TO_FINISHED ? 0 :
static_cast<size_t>(settings[DB::Setting::group_by_two_level_threshold_bytes]));
+ size_t max_bytes_before_external_group_by = algorithm ==
Algorithm::GlutenGraceAggregate
+ ? 0
+ : (mode == Mode::PARTIAL_TO_FINISHED ? 0 :
static_cast<size_t>(settings[DB::Setting::max_bytes_before_external_group_by]));
+ bool empty_result_for_aggregation_by_empty_set = algorithm ==
Algorithm::GlutenGraceAggregate
+ ? false
+ : (mode == Mode::PARTIAL_TO_FINISHED ? false :
static_cast<bool>(settings[DB::Setting::empty_result_for_aggregation_by_empty_set]));
+ DB::TemporaryDataOnDiskScopePtr tmp_data_scope = algorithm ==
Algorithm::GlutenGraceAggregate ? nullptr : context->getTempDataOnDisk();
+ size_t max_threads = settings[DB::Setting::max_threads];
+ size_t min_free_disk_space = algorithm == Algorithm::GlutenGraceAggregate
+ ? 0
+ :
static_cast<size_t>(settings[DB::Setting::min_free_disk_space_for_temporary_data]);
+ bool compile_aggregate_expressions = mode == Mode::PARTIAL_TO_FINISHED ?
false : true;
+ size_t min_count_to_compile_aggregate_expression = mode ==
Mode::PARTIAL_TO_FINISHED ? 0 : 3;
+ size_t max_block_size =
PODArrayUtil::adjustMemoryEfficientSize(settings[DB::Setting::max_block_size]);
+ bool enable_prefetch = mode == Mode::PARTIAL_TO_FINISHED ? false : true;
+ bool only_merge = mode == Mode::PARTIAL_TO_FINISHED;
+ bool optimize_group_by_constant_keys
+ = mode == Mode::PARTIAL_TO_FINISHED ? false :
settings[DB::Setting::optimize_group_by_constant_keys];
+ double min_hit_rate_to_use_consecutive_keys_optimization =
settings[DB::Setting::min_hit_rate_to_use_consecutive_keys_optimization];
+ DB::Aggregator::Params params(
+ grouping_keys,
+ agg_descriptions,
+ false,
+ max_rows_to_group_by,
+ group_by_overflow_mode,
+ group_by_two_level_threshold,
+ group_by_two_level_threshold_bytes,
+ max_bytes_before_external_group_by,
+ empty_result_for_aggregation_by_empty_set,
+ tmp_data_scope,
+ max_threads,
+ min_free_disk_space,
+ compile_aggregate_expressions,
+ min_count_to_compile_aggregate_expression,
+ max_block_size,
+ enable_prefetch,
+ only_merge,
+ optimize_group_by_constant_keys,
+ min_hit_rate_to_use_consecutive_keys_optimization,
+ {});
+ return params;
+}
+
+
+#define COMPARE_FIELD(field) \
+ if (lhs.field != rhs.field) \
+ { \
+ LOG_ERROR(getLogger("AggregatorParamsHelper"), "Aggregator::Params
field " #field " is not equal. {}/{}", lhs.field, rhs.field); \
+ return false; \
+ }
+bool AggregatorParamsHelper::compare(const DB::Aggregator::Params & lhs, const
DB::Aggregator::Params & rhs)
+{
+ COMPARE_FIELD(overflow_row);
+ COMPARE_FIELD(max_rows_to_group_by);
+ COMPARE_FIELD(group_by_overflow_mode);
+ COMPARE_FIELD(group_by_two_level_threshold);
+ COMPARE_FIELD(group_by_two_level_threshold_bytes);
+ COMPARE_FIELD(max_bytes_before_external_group_by);
+ COMPARE_FIELD(empty_result_for_aggregation_by_empty_set);
+ COMPARE_FIELD(max_threads);
+ COMPARE_FIELD(min_free_disk_space);
+ COMPARE_FIELD(compile_aggregate_expressions);
+ if ((lhs.tmp_data_scope == nullptr) != (rhs.tmp_data_scope == nullptr))
+ {
+ LOG_ERROR(getLogger("AggregatorParamsHelper"), "Aggregator::Params
field tmp_data_scope is not equal.");
+ return false;
+ }
+ COMPARE_FIELD(min_count_to_compile_aggregate_expression);
+ COMPARE_FIELD(enable_prefetch);
+ COMPARE_FIELD(only_merge);
+ COMPARE_FIELD(optimize_group_by_constant_keys);
+ COMPARE_FIELD(min_hit_rate_to_use_consecutive_keys_optimization);
+ return true;
+}
}
diff --git a/cpp-ch/local-engine/Common/AggregateUtil.h
b/cpp-ch/local-engine/Common/AggregateUtil.h
index b14cd59c54..380e1ea355 100644
--- a/cpp-ch/local-engine/Common/AggregateUtil.h
+++ b/cpp-ch/local-engine/Common/AggregateUtil.h
@@ -25,7 +25,8 @@ class GlutenAggregatorUtil
{
public:
static Int32 getBucketsNum(AggregatedDataVariants & data_variants);
- static std::optional<Block> safeConvertOneBucketToBlock(Aggregator &
aggregator, AggregatedDataVariants & variants, Arena * arena, bool final, Int32
bucket);
+ static std::optional<Block>
+ safeConvertOneBucketToBlock(Aggregator & aggregator,
AggregatedDataVariants & variants, Arena * arena, bool final, Int32 bucket);
static void safeReleaseOneBucket(AggregatedDataVariants & variants, Int32
bucket);
};
}
@@ -41,6 +42,7 @@ public:
~AggregateDataBlockConverter() = default;
bool hasNext();
DB::Block next();
+
private:
DB::Aggregator & aggregator;
DB::AggregatedDataVariantsPtr data_variants;
@@ -50,4 +52,31 @@ private:
Int32 current_bucket = 0;
DB::BlocksList output_blocks;
};
+
+class AggregatorParamsHelper
+{
+public:
+ enum class Algorithm
+ {
+ GlutenGraceAggregate,
+ CHTwoStageAggregate
+ };
+ enum class Mode
+ {
+ INIT_TO_PARTIAL,
+ INIT_TO_COMPLETED,
+ PARTIAL_TO_PARTIAL,
+ PARTIAL_TO_FINISHED,
+ };
+
+    // When using grace aggregation, never enable CH spill; otherwise data
will be lost.
+ static DB::Aggregator::Params buildParams(
+ DB::ContextPtr context,
+ const DB::Names & grouping_keys,
+ const DB::AggregateDescriptions & agg_descriptions,
+ Mode mode,
+ Algorithm algorithm = Algorithm::GlutenGraceAggregate);
+ static bool compare(const DB::Aggregator::Params & lhs, const
DB::Aggregator::Params & rhs);
+};
+
}
diff --git a/cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp
b/cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp
index b777731a91..ee767b31bd 100644
--- a/cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp
+++ b/cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp
@@ -16,9 +16,6 @@
*/
#include "AdvancedExpandStep.h"
-#include <iterator>
-#include <system_error>
-#include <unordered_set>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/IColumn.h>
@@ -33,33 +30,13 @@
#include <Processors/Transforms/ExpressionTransform.h>
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
+#include <Common/AggregateUtil.h>
#include <Common/CHUtil.h>
#include <Common/WeakHash.h>
#include <Poco/Logger.h>
#include <Common/logger_useful.h>
-namespace DB
-{
-namespace Setting
-{
-extern const SettingsUInt64 max_bytes_before_external_group_by;
-extern const SettingsBool optimize_group_by_constant_keys;
-extern const SettingsUInt64 min_free_disk_space_for_temporary_data;
-extern const SettingsMaxThreads max_threads;
-extern const SettingsBool empty_result_for_aggregation_by_empty_set;
-extern const SettingsUInt64 group_by_two_level_threshold_bytes;
-extern const SettingsOverflowModeGroupBy group_by_overflow_mode;
-extern const SettingsUInt64 max_rows_to_group_by;
-extern const SettingsBool enable_memory_bound_merging_of_aggregation_results;
-extern const SettingsUInt64 aggregation_in_order_max_block_bytes;
-extern const SettingsUInt64 group_by_two_level_threshold;
-extern const SettingsFloat min_hit_rate_to_use_consecutive_keys_optimization;
-extern const SettingsMaxThreads max_threads;
-extern const SettingsUInt64 max_block_size;
-}
-}
-
namespace local_engine
{
@@ -116,27 +93,8 @@ void
AdvancedExpandStep::transformPipeline(DB::QueryPipelineBuilder & pipeline,
aggregate_grouping_keys.push_back(col.name);
}
// partial to partial aggregate
- DB::Aggregator::Params params(
- aggregate_grouping_keys,
- aggregate_descriptions,
- false,
- settings[DB::Setting::max_rows_to_group_by],
- settings[DB::Setting::group_by_overflow_mode],
- settings[DB::Setting::group_by_two_level_threshold],
- 0,
- 0,
- settings[DB::Setting::empty_result_for_aggregation_by_empty_set],
- nullptr,
- settings[DB::Setting::max_threads],
- settings[DB::Setting::min_free_disk_space_for_temporary_data],
- true,
- 3,
-
PODArrayUtil::adjustMemoryEfficientSize(settings[DB::Setting::max_block_size]),
- /*enable_prefetch*/ true,
- /*only_merge*/ false,
- settings[DB::Setting::optimize_group_by_constant_keys],
-
settings[DB::Setting::min_hit_rate_to_use_consecutive_keys_optimization],
- /*StatsCollectingParams*/ {});
+ auto params = AggregatorParamsHelper::buildParams(
+ context, aggregate_grouping_keys, aggregate_descriptions,
AggregatorParamsHelper::Mode::PARTIAL_TO_PARTIAL);
auto input_header = input_headers.front();
auto build_transform = [&](DB::OutputPortRawPtrs outputs)
diff --git a/cpp-ch/local-engine/Parser/RelParsers/AggregateRelParser.cpp
b/cpp-ch/local-engine/Parser/RelParsers/AggregateRelParser.cpp
index 6bc8c7e6e1..f5498b01ab 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/AggregateRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/AggregateRelParser.cpp
@@ -39,19 +39,8 @@ namespace DB
{
namespace Setting
{
-extern const SettingsUInt64 max_bytes_before_external_group_by;
-extern const SettingsBool optimize_group_by_constant_keys;
-extern const SettingsUInt64 min_free_disk_space_for_temporary_data;
-extern const SettingsMaxThreads max_threads;
-extern const SettingsBool empty_result_for_aggregation_by_empty_set;
-extern const SettingsUInt64 group_by_two_level_threshold_bytes;
-extern const SettingsOverflowModeGroupBy group_by_overflow_mode;
-extern const SettingsUInt64 max_rows_to_group_by;
extern const SettingsBool enable_memory_bound_merging_of_aggregation_results;
extern const SettingsUInt64 aggregation_in_order_max_block_bytes;
-extern const SettingsUInt64 group_by_two_level_threshold;
-extern const SettingsFloat min_hit_rate_to_use_consecutive_keys_optimization;
-extern const SettingsMaxThreads max_threads;
extern const SettingsUInt64 max_block_size;
}
namespace ErrorCodes
@@ -184,9 +173,7 @@ void AggregateRelParser::setup(DB::QueryPlanPtr query_plan,
const substrait::Rel
agg_info.signature_function_name =
*parseSignatureFunctionName(measure.measure().function_reference());
auto function_parser =
AggregateFunctionParserFactory::instance().get(agg_info.signature_function_name,
parser_context);
if (!function_parser)
- {
throw Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unsupported
aggregate function: {}", agg_info.signature_function_name);
- }
/// Put function_parser, parser_func_info and function_name into
agg_info for reducing repeated builds.
agg_info.function_parser = function_parser;
agg_info.parser_func_info =
AggregateFunctionParser::CommonFunctionInfo(measure);
@@ -198,16 +185,10 @@ void AggregateRelParser::setup(DB::QueryPlanPtr
query_plan, const substrait::Rel
if (aggregate_rel->groupings_size() == 1)
{
for (const auto & expr :
aggregate_rel->groupings(0).grouping_expressions())
- {
if (expr.has_selection() &&
expr.selection().has_direct_reference())
- {
grouping_keys.push_back(input_header.getByPosition(expr.selection().direct_reference().struct_field().field()).name);
- }
else
- {
throw Exception(ErrorCodes::BAD_ARGUMENTS, "unsupported group
expression: {}", expr.DebugString());
- }
- }
}
else if (aggregate_rel->groupings_size() != 0)
{
@@ -345,23 +326,23 @@ void AggregateRelParser::addMergingAggregatedStep()
AggregateDescriptions aggregate_descriptions;
buildAggregateDescriptions(aggregate_descriptions);
const auto & settings = getContext()->getSettingsRef();
- Aggregator::Params params(
- grouping_keys,
- aggregate_descriptions,
- false,
- settings[Setting::max_threads],
-
PODArrayUtil::adjustMemoryEfficientSize(settings[Setting::max_block_size]),
- settings[Setting::min_hit_rate_to_use_consecutive_keys_optimization]);
auto config = StreamingAggregateConfig::loadFromContext(getContext());
if (config.enable_streaming_aggregating)
{
- params.group_by_two_level_threshold =
settings[Setting::group_by_two_level_threshold];
+ auto params = AggregatorParamsHelper::buildParams(
+ getContext(), grouping_keys, aggregate_descriptions,
AggregatorParamsHelper::Mode::PARTIAL_TO_FINISHED);
auto merging_step =
std::make_unique<GraceMergingAggregatedStep>(getContext(),
plan->getCurrentHeader(), params, false);
steps.emplace_back(merging_step.get());
plan->addStep(std::move(merging_step));
}
else
{
+ auto params = AggregatorParamsHelper::buildParams(
+ getContext(),
+ grouping_keys,
+ aggregate_descriptions,
+ AggregatorParamsHelper::Mode::PARTIAL_TO_FINISHED,
+ AggregatorParamsHelper::Algorithm::CHTwoStageAggregate);
/// We don't use the grouping set feature in CH, so
grouping_sets_params_list should always be empty.
DB::GroupingSetsParamsList grouping_sets_params_list;
auto merging_step = std::make_unique<DB::MergingAggregatedStep>(
@@ -389,54 +370,20 @@ void AggregateRelParser::addCompleteModeAggregatedStep()
auto config = StreamingAggregateConfig::loadFromContext(getContext());
if (config.enable_streaming_aggregating)
{
- Aggregator::Params params(
- grouping_keys,
- aggregate_descriptions,
- false,
- settings[Setting::max_rows_to_group_by],
- settings[Setting::group_by_overflow_mode],
- settings[Setting::group_by_two_level_threshold],
- settings[Setting::group_by_two_level_threshold_bytes],
- 0, /*settings[Setting::max_bytes_before_external_group_by]*/
- settings[Setting::empty_result_for_aggregation_by_empty_set],
- getContext()->getTempDataOnDisk(),
- settings[Setting::max_threads],
- settings[Setting::min_free_disk_space_for_temporary_data],
- true,
- 3,
-
PODArrayUtil::adjustMemoryEfficientSize(settings[Setting::max_block_size]),
- /*enable_prefetch*/ true,
- /*only_merge*/ false,
- settings[Setting::optimize_group_by_constant_keys],
-
settings[Setting::min_hit_rate_to_use_consecutive_keys_optimization],
- /*StatsCollectingParams*/ {});
+ auto params = AggregatorParamsHelper::buildParams(
+ getContext(), grouping_keys, aggregate_descriptions,
AggregatorParamsHelper::Mode::INIT_TO_COMPLETED);
auto merging_step =
std::make_unique<GraceMergingAggregatedStep>(getContext(),
plan->getCurrentHeader(), params, true);
steps.emplace_back(merging_step.get());
plan->addStep(std::move(merging_step));
}
else
{
- Aggregator::Params params(
+ auto params = AggregatorParamsHelper::buildParams(
+ getContext(),
grouping_keys,
aggregate_descriptions,
- false,
- settings[Setting::max_rows_to_group_by],
- settings[Setting::group_by_overflow_mode],
- settings[Setting::group_by_two_level_threshold],
- settings[Setting::group_by_two_level_threshold_bytes],
- settings[Setting::max_bytes_before_external_group_by],
- settings[Setting::empty_result_for_aggregation_by_empty_set],
- getContext()->getTempDataOnDisk(),
- settings[Setting::max_threads],
- settings[Setting::min_free_disk_space_for_temporary_data],
- true,
- 3,
-
PODArrayUtil::adjustMemoryEfficientSize(settings[Setting::max_block_size]),
- /*enable_prefetch*/ true,
- /*only_merge*/ false,
- settings[Setting::optimize_group_by_constant_keys],
-
settings[Setting::min_hit_rate_to_use_consecutive_keys_optimization],
- /*StatsCollectingParams*/ {});
+ AggregatorParamsHelper::Mode::INIT_TO_COMPLETED,
+ AggregatorParamsHelper::Algorithm::CHTwoStageAggregate);
auto aggregating_step = std::make_unique<AggregatingStep>(
plan->getCurrentHeader(),
@@ -471,9 +418,7 @@ void AggregateRelParser::addAggregatingStep()
{
const auto & next_rel = *(rel_stack->back());
if (next_rel.rel_type_case() ==
substrait::Rel::RelTypeCase::kAggregate)
- {
is_distinct_aggreate = true;
- }
}
if (config.enable_streaming_aggregating)
@@ -484,27 +429,9 @@ void AggregateRelParser::addAggregatingStep()
// unreliable. It will appear that a small hash table is converted
into a two level structure, resulting in a
lot of small blocks. So we disable this condition and keep
`group_by_two_level_threshold` as the condition to
// convert a single level hash table into a two level one.
- Aggregator::Params params(
- grouping_keys,
- aggregate_descriptions,
- false,
- settings[Setting::max_rows_to_group_by],
- settings[Setting::group_by_overflow_mode],
- settings[Setting::group_by_two_level_threshold],
- 0, // group_by_two_level_threshold_bytes
- 0,
- settings[Setting::empty_result_for_aggregation_by_empty_set],
- nullptr,
- settings[Setting::max_threads],
- settings[Setting::min_free_disk_space_for_temporary_data],
- true,
- 3,
-
PODArrayUtil::adjustMemoryEfficientSize(settings[Setting::max_block_size]),
- /*enable_prefetch*/ true,
- /*only_merge*/ false,
- settings[Setting::optimize_group_by_constant_keys],
-
settings[Setting::min_hit_rate_to_use_consecutive_keys_optimization],
- /*StatsCollectingParams*/ {});
+ auto params = AggregatorParamsHelper::buildParams(
+ getContext(), grouping_keys, aggregate_descriptions,
AggregatorParamsHelper::Mode::INIT_TO_PARTIAL);
+
if (!is_distinct_aggreate)
{
auto aggregating_step =
std::make_unique<StreamingAggregatingStep>(getContext(),
plan->getCurrentHeader(), params);
@@ -532,27 +459,12 @@ void AggregateRelParser::addAggregatingStep()
}
else
{
- Aggregator::Params params(
+ auto params = AggregatorParamsHelper::buildParams(
+ getContext(),
grouping_keys,
aggregate_descriptions,
- false,
- settings[Setting::max_rows_to_group_by],
- settings[Setting::group_by_overflow_mode],
- settings[Setting::group_by_two_level_threshold],
- settings[Setting::group_by_two_level_threshold_bytes],
- settings[Setting::max_bytes_before_external_group_by],
- settings[Setting::empty_result_for_aggregation_by_empty_set],
- getContext()->getTempDataOnDisk(),
- settings[Setting::max_threads],
- settings[Setting::min_free_disk_space_for_temporary_data],
- true,
- 3,
-
PODArrayUtil::adjustMemoryEfficientSize(settings[Setting::max_block_size]),
- /*enable_prefetch*/ true,
- /*only_merge*/ false,
- settings[Setting::optimize_group_by_constant_keys],
-
settings[Setting::min_hit_rate_to_use_consecutive_keys_optimization],
- /*StatsCollectingParams*/ {});
+ AggregatorParamsHelper::Mode::INIT_TO_PARTIAL,
+ AggregatorParamsHelper::Algorithm::CHTwoStageAggregate);
auto aggregating_step = std::make_unique<AggregatingStep>(
plan->getCurrentHeader(),
@@ -587,12 +499,8 @@ void AggregateRelParser::addPostProjection()
for (const auto & agg_info : aggregates)
{
for (const auto * input_node : project_actions_dag.getInputs())
- {
if (input_node->result_name == agg_info.measure_column_name)
- {
agg_info.function_parser->convertNodeTypeIfNeeded(agg_info.parser_func_info,
input_node, project_actions_dag, false);
- }
- }
}
}
else if (has_complete_stage)
@@ -601,12 +509,8 @@ void AggregateRelParser::addPostProjection()
for (const auto & agg_info : aggregates)
{
for (const auto * output_node : project_actions_dag.getOutputs())
- {
if (output_node->result_name == agg_info.measure_column_name)
- {
agg_info.function_parser->convertNodeTypeIfNeeded(agg_info.parser_func_info,
output_node, project_actions_dag, true);
- }
- }
}
}
if (project_actions_dag.dumpDAG() != dag_footprint)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]