This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1c6246f7ee [improve](agg) support distinct agg node (#22169)
1c6246f7ee is described below
commit 1c6246f7ee922c884d32d5908664b8bb9a0e029e
Author: zhangstar333 <[email protected]>
AuthorDate: Fri Jul 28 13:54:10 2023 +0800
[improve](agg) support distinct agg node (#22169)
select c_name from customer union select c_name from customer
This SQL uses an agg node to get the distinct rows of c_name,
so there is no need to wait until all data has been inserted into the hash map;
each row can be output as soon as it has been successfully inserted into the hash map.
---
be/src/exec/exec_node.cpp | 7 +-
...istinct_streaming_aggregation_sink_operator.cpp | 97 +++++++++++++++
.../distinct_streaming_aggregation_sink_operator.h | 76 ++++++++++++
...tinct_streaming_aggregation_source_operator.cpp | 92 ++++++++++++++
...istinct_streaming_aggregation_source_operator.h | 67 ++++++++++
be/src/pipeline/pipeline_fragment_context.cpp | 17 ++-
be/src/vec/exec/distinct_vaggregation_node.cpp | 136 +++++++++++++++++++++
be/src/vec/exec/distinct_vaggregation_node.h | 55 +++++++++
be/src/vec/exec/vaggregation_node.cpp | 25 ++--
be/src/vec/exec/vaggregation_node.h | 49 ++++----
.../expressions/functions/agg/WindowFunnel.java | 8 +-
11 files changed, 588 insertions(+), 41 deletions(-)
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index c6b7826deb..bcb0771eda 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -44,6 +44,7 @@
#include "util/uid_util.h"
#include "vec/columns/column_nullable.h"
#include "vec/core/block.h"
+#include "vec/exec/distinct_vaggregation_node.h"
#include "vec/exec/join/vhash_join_node.h"
#include "vec/exec/join/vnested_loop_join_node.h"
#include "vec/exec/scan/new_es_scan_node.h"
@@ -371,7 +372,11 @@ Status ExecNode::create_node(RuntimeState* state,
ObjectPool* pool, const TPlanN
return Status::OK();
case TPlanNodeType::AGGREGATION_NODE:
- *node = pool->add(new vectorized::AggregationNode(pool, tnode, descs));
+ if (tnode.agg_node.aggregate_functions.empty() &&
state->enable_pipeline_exec()) {
+ *node = pool->add(new vectorized::DistinctAggregationNode(pool,
tnode, descs));
+ } else {
+ *node = pool->add(new vectorized::AggregationNode(pool, tnode,
descs));
+ }
return Status::OK();
case TPlanNodeType::HASH_JOIN_NODE:
diff --git
a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp
b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp
new file mode 100644
index 0000000000..48695ed56f
--- /dev/null
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "distinct_streaming_aggregation_sink_operator.h"
+
+#include <gen_cpp/Metrics_types.h>
+
+#include <utility>
+
+#include "common/compiler_util.h" // IWYU pragma: keep
+#include "pipeline/exec/data_queue.h"
+#include "pipeline/exec/operator.h"
+#include "vec/exec/distinct_vaggregation_node.h"
+#include "vec/exec/vaggregation_node.h"
+
+namespace doris {
+class ExecNode;
+class RuntimeState;
+} // namespace doris
+
+namespace doris::pipeline {
+
+DistinctStreamingAggSinkOperator::DistinctStreamingAggSinkOperator(
+ OperatorBuilderBase* operator_builder, ExecNode* agg_node,
std::shared_ptr<DataQueue> queue)
+ : StreamingOperator(operator_builder, agg_node),
_data_queue(std::move(queue)) {}
+
+bool DistinctStreamingAggSinkOperator::can_write() {
+ // sink and source in diff threads
+ return _data_queue->has_enough_space_to_push();
+}
+
+// Consumes one input block: inserts its group-by keys into the hash table,
+// collects the rows whose key was newly inserted into _output_block, and
+// pushes _output_block to the data queue once it holds a full batch, the
+// node's limit is hit, or the source is finished.
+//
+// Returns END_OF_FILE (after marking the queue finished) when the source is
+// finished or the limit has been reached; Status::OK() otherwise.
+Status DistinctStreamingAggSinkOperator::sink(RuntimeState* state, vectorized::Block* in_block,
+                                              SourceState source_state) {
+    // Set when this call emits the rows that complete the limit, so the stream
+    // is finished even when the emitted count equals the limit exactly.
+    bool limit_reached = false;
+    if (in_block && in_block->rows() > 0) {
+        if (_output_block == nullptr) {
+            _output_block = _data_queue->get_free_block();
+        }
+        RETURN_IF_ERROR(
+                _node->_distinct_pre_agg_with_serialized_key(in_block, _output_block.get()));
+
+        // get enough data or reached limit rows, need push block to queue
+        if (_node->limit() != -1 &&
+            (_output_block->rows() + _output_distinct_rows) >= _node->limit()) {
+            // Keep only the rows still missing from the limit: the limit minus
+            // what has already been pushed, not minus the pending block's size.
+            auto limit_rows = _node->limit() - _output_distinct_rows;
+            _output_block->set_num_rows(limit_rows);
+            _output_distinct_rows += limit_rows;
+            _data_queue->push_block(std::move(_output_block));
+            limit_reached = true;
+        } else if (_output_block->rows() >= state->batch_size()) {
+            _output_distinct_rows += _output_block->rows();
+            _data_queue->push_block(std::move(_output_block));
+        }
+    }
+
+    // reach limit or source finish
+    if ((UNLIKELY(source_state == SourceState::FINISHED)) || limit_reached ||
+        reached_limited_rows()) {
+        if (_output_block != nullptr) { //maybe the last block with eos
+            _output_distinct_rows += _output_block->rows();
+            _data_queue->push_block(std::move(_output_block));
+        }
+        _data_queue->set_finish();
+        return Status::Error<ErrorCode::END_OF_FILE>("");
+    }
+    return Status::OK();
+}
+
+Status DistinctStreamingAggSinkOperator::close(RuntimeState* state) {
+ if (_data_queue && !_data_queue->is_finish()) {
+ // finish should be set, if not set here means error.
+ _data_queue->set_canceled();
+ }
+ return StreamingOperator::close(state);
+}
+
+DistinctStreamingAggSinkOperatorBuilder::DistinctStreamingAggSinkOperatorBuilder(
+ int32_t id, ExecNode* exec_node, std::shared_ptr<DataQueue> queue)
+ : OperatorBuilder(id, "DistinctStreamingAggSinkOperator", exec_node),
+ _data_queue(std::move(queue)) {}
+
+OperatorPtr DistinctStreamingAggSinkOperatorBuilder::build_operator() {
+ return std::make_shared<DistinctStreamingAggSinkOperator>(this, _node,
_data_queue);
+}
+
+} // namespace doris::pipeline
diff --git
a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h
b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h
new file mode 100644
index 0000000000..ae7106178e
--- /dev/null
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include <cstdint>
+#include <memory>
+
+#include "common/status.h"
+#include "operator.h"
+#include "util/runtime_profile.h"
+#include "vec/core/block.h"
+#include "vec/exec/distinct_vaggregation_node.h"
+#include "vec/exec/vaggregation_node.h"
+
+namespace doris {
+class ExecNode;
+class RuntimeState;
+
+namespace pipeline {
+class DataQueue;
+
+class DistinctStreamingAggSinkOperatorBuilder final
+ : public OperatorBuilder<vectorized::DistinctAggregationNode> {
+public:
+ DistinctStreamingAggSinkOperatorBuilder(int32_t, ExecNode*,
std::shared_ptr<DataQueue>);
+
+ OperatorPtr build_operator() override;
+
+ bool is_sink() const override { return true; }
+ bool is_source() const override { return false; }
+
+private:
+ std::shared_ptr<DataQueue> _data_queue;
+};
+
+// Sink operator of the distinct streaming aggregation. It shares a DataQueue
+// with the matching source operator and pushes blocks of distinct rows into it.
+class DistinctStreamingAggSinkOperator final
+        : public StreamingOperator<DistinctStreamingAggSinkOperatorBuilder> {
+public:
+    DistinctStreamingAggSinkOperator(OperatorBuilderBase* operator_builder, ExecNode*,
+                                     std::shared_ptr<DataQueue>);
+
+    Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) override;
+
+    bool can_write() override;
+
+    Status close(RuntimeState* state) override;
+
+    // True when the node has a limit (limit() != -1) and the number of distinct
+    // rows counted so far has reached it. Equality counts as "reached".
+    bool reached_limited_rows() {
+        return _node->limit() != -1 && _output_distinct_rows >= _node->limit();
+    }
+
+private:
+    // Running count of distinct rows handed to the data queue.
+    int64_t _output_distinct_rows = 0;
+    std::shared_ptr<DataQueue> _data_queue;
+    // Block that accumulates distinct rows until it is pushed to the queue.
+    std::unique_ptr<vectorized::Block> _output_block = vectorized::Block::create_unique();
+};
+
+} // namespace pipeline
+} // namespace doris
\ No newline at end of file
diff --git
a/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp
b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp
new file mode 100644
index 0000000000..f91fd3fbe3
--- /dev/null
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "distinct_streaming_aggregation_source_operator.h"
+
+#include <utility>
+
+#include "pipeline/exec/data_queue.h"
+#include "pipeline/exec/operator.h"
+#include "runtime/descriptors.h"
+#include "util/runtime_profile.h"
+#include "vec/core/block.h"
+#include "vec/exec/distinct_vaggregation_node.h"
+#include "vec/exec/vaggregation_node.h"
+
+namespace doris {
+class ExecNode;
+class RuntimeState;
+
+namespace pipeline {
+DistinctStreamingAggSourceOperator::DistinctStreamingAggSourceOperator(
+ OperatorBuilderBase* templ, ExecNode* node, std::shared_ptr<DataQueue>
queue)
+ : SourceOperator(templ, node), _data_queue(std::move(queue)) {}
+
+bool DistinctStreamingAggSourceOperator::can_read() {
+ return _data_queue->has_data_or_finished();
+}
+
+Status DistinctStreamingAggSourceOperator::pull_data(RuntimeState* state,
vectorized::Block* block,
+ bool* eos) {
+ std::unique_ptr<vectorized::Block> agg_block;
+ RETURN_IF_ERROR(_data_queue->get_block_from_queue(&agg_block));
+ if (agg_block != nullptr) {
+ block->swap(*agg_block);
+
agg_block->clear_column_data(_node->row_desc().num_materialized_slots());
+ _data_queue->push_free_block(std::move(agg_block));
+ }
+ if (_data_queue->data_exhausted()) { //the sink is eos or reached limit
+ *eos = true;
+ }
+ _node->_make_nullable_output_key(block);
+ if (_node->is_streaming_preagg() == false) {
+ // dispose the having clause, should not be execute in prestreaming agg
+
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_node->get_conjuncts(),
block,
+
block->columns()));
+ }
+
+ rows_have_returned += block->rows();
+ return Status::OK();
+}
+
+Status DistinctStreamingAggSourceOperator::get_block(RuntimeState* state,
vectorized::Block* block,
+ SourceState&
source_state) {
+ bool eos = false;
+ RETURN_IF_ERROR(_node->get_next_after_projects(
+ state, block, &eos,
+ std::bind(&DistinctStreamingAggSourceOperator::pull_data, this,
std::placeholders::_1,
+ std::placeholders::_2, std::placeholders::_3)));
+ if (UNLIKELY(eos)) {
+ _node->set_num_rows_returned(rows_have_returned);
+ source_state = SourceState::FINISHED;
+ } else {
+ source_state = SourceState::DEPEND_ON_SOURCE;
+ }
+ return Status::OK();
+}
+
+DistinctStreamingAggSourceOperatorBuilder::DistinctStreamingAggSourceOperatorBuilder(
+ int32_t id, ExecNode* exec_node, std::shared_ptr<DataQueue> queue)
+ : OperatorBuilder(id, "DistinctStreamingAggSourceOperator", exec_node),
+ _data_queue(std::move(queue)) {}
+
+OperatorPtr DistinctStreamingAggSourceOperatorBuilder::build_operator() {
+ return std::make_shared<DistinctStreamingAggSourceOperator>(this, _node,
_data_queue);
+}
+
+} // namespace pipeline
+} // namespace doris
diff --git
a/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.h
b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.h
new file mode 100644
index 0000000000..3534193bf8
--- /dev/null
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.h
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <stdint.h>
+
+#include <cstdint>
+#include <memory>
+
+#include "common/status.h"
+#include "operator.h"
+#include "vec/exec/distinct_vaggregation_node.h"
+#include "vec/exec/vaggregation_node.h"
+
+namespace doris {
+class ExecNode;
+class RuntimeState;
+
+namespace vectorized {
+class Block;
+} // namespace vectorized
+namespace pipeline {
+class DataQueue;
+
+class DistinctStreamingAggSourceOperatorBuilder final
+ : public OperatorBuilder<vectorized::DistinctAggregationNode> {
+public:
+ DistinctStreamingAggSourceOperatorBuilder(int32_t, ExecNode*,
std::shared_ptr<DataQueue>);
+
+ bool is_source() const override { return true; }
+
+ OperatorPtr build_operator() override;
+
+private:
+ std::shared_ptr<DataQueue> _data_queue;
+};
+
+class DistinctStreamingAggSourceOperator final
+ : public SourceOperator<DistinctStreamingAggSourceOperatorBuilder> {
+public:
+ DistinctStreamingAggSourceOperator(OperatorBuilderBase*, ExecNode*,
std::shared_ptr<DataQueue>);
+ bool can_read() override;
+ Status get_block(RuntimeState*, vectorized::Block*, SourceState&
source_state) override;
+ Status open(RuntimeState*) override { return Status::OK(); }
+ Status pull_data(RuntimeState* state, vectorized::Block* output_block,
bool* eos);
+
+private:
+ int64_t rows_have_returned = 0;
+ std::shared_ptr<DataQueue> _data_queue;
+};
+
+} // namespace pipeline
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 3c4957ad40..fe5e98dde8 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -48,6 +48,8 @@
#include "pipeline/exec/const_value_operator.h"
#include "pipeline/exec/data_queue.h"
#include "pipeline/exec/datagen_operator.h"
+#include "pipeline/exec/distinct_streaming_aggregation_sink_operator.h"
+#include "pipeline/exec/distinct_streaming_aggregation_source_operator.h"
#include "pipeline/exec/empty_set_operator.h"
#include "pipeline/exec/empty_source_operator.h"
#include "pipeline/exec/exchange_sink_operator.h"
@@ -503,10 +505,21 @@ Status
PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
break;
}
case TPlanNodeType::AGGREGATION_NODE: {
- auto* agg_node = assert_cast<vectorized::AggregationNode*>(node);
+ auto* agg_node = dynamic_cast<vectorized::AggregationNode*>(node);
auto new_pipe = add_pipeline();
RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipe));
- if (agg_node->is_streaming_preagg()) {
+ if (agg_node->is_aggregate_evaluators_empty()) {
+ auto data_queue = std::make_shared<DataQueue>(1);
+ OperatorBuilderPtr pre_agg_sink =
+
std::make_shared<DistinctStreamingAggSinkOperatorBuilder>(node->id(), agg_node,
+
data_queue);
+ RETURN_IF_ERROR(new_pipe->set_sink(pre_agg_sink));
+
+ OperatorBuilderPtr pre_agg_source =
+
std::make_shared<DistinctStreamingAggSourceOperatorBuilder>(
+ node->id(), agg_node, data_queue);
+ RETURN_IF_ERROR(cur_pipe->add_operator(pre_agg_source));
+ } else if (agg_node->is_streaming_preagg()) {
auto data_queue = std::make_shared<DataQueue>(1);
OperatorBuilderPtr pre_agg_sink =
std::make_shared<StreamingAggSinkOperatorBuilder>(
node->id(), agg_node, data_queue);
diff --git a/be/src/vec/exec/distinct_vaggregation_node.cpp
b/be/src/vec/exec/distinct_vaggregation_node.cpp
new file mode 100644
index 0000000000..bbbd196411
--- /dev/null
+++ b/be/src/vec/exec/distinct_vaggregation_node.cpp
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/distinct_vaggregation_node.h"
+
+#include "runtime/runtime_state.h"
+#include "vec/aggregate_functions/aggregate_function_uniq.h"
+#include "vec/exec/vaggregation_node.h"
+
+namespace doris {
+class ObjectPool;
+} // namespace doris
+
+namespace doris::vectorized {
+
+DistinctAggregationNode::DistinctAggregationNode(ObjectPool* pool, const
TPlanNode& tnode,
+ const DescriptorTbl& descs)
+ : AggregationNode(pool, tnode, descs) {
+ dummy_mapped_data = pool->add(new char('A'));
+}
+
+// Evaluates the group-by (probe) expressions on in_block, inserts every row's
+// key into the hash table, and writes the key columns of the rows that were
+// newly inserted to out_block.
+//
+// in_block:  input rows; evaluated key columns are materialized in place
+//            (const columns are converted to full columns).
+// out_block: receives the distinct rows, one column per probe expression.
+// Returns a non-OK Status if expression evaluation or the insertion fails.
+Status DistinctAggregationNode::_distinct_pre_agg_with_serialized_key(
+        doris::vectorized::Block* in_block, doris::vectorized::Block* out_block) {
+    SCOPED_TIMER(_build_timer);
+    DCHECK(!_probe_expr_ctxs.empty());
+
+    const size_t key_size = _probe_expr_ctxs.size();
+    ColumnRawPtrs key_columns(key_size);
+    {
+        SCOPED_TIMER(_expr_timer);
+        for (size_t i = 0; i < key_size; ++i) {
+            int result_column_id = -1;
+            RETURN_IF_ERROR(_probe_expr_ctxs[i]->execute(in_block, &result_column_id));
+            in_block->get_by_position(result_column_id).column =
+                    in_block->get_by_position(result_column_id)
+                            .column->convert_to_full_column_if_const();
+            key_columns[i] = in_block->get_by_position(result_column_id).column.get();
+        }
+    }
+
+    // Row count kept as size_t: it is used as a reserve size and passed as a
+    // size_t row count below.
+    const size_t rows = in_block->rows();
+    // Indices of the input rows whose key was inserted for the first time.
+    IColumn::Selector distinct_row;
+    distinct_row.reserve(rows);
+
+    RETURN_IF_CATCH_EXCEPTION(
+            _emplace_into_hash_table_to_distinct(distinct_row, key_columns, rows));
+
+    bool mem_reuse = _make_nullable_keys.empty() && out_block->mem_reuse();
+    if (mem_reuse) {
+        // Append the distinct rows to the columns already present in out_block.
+        for (size_t i = 0; i < key_size; ++i) {
+            auto dst = out_block->get_by_position(i).column->assume_mutable();
+            key_columns[i]->append_data_by_selector(dst, distinct_row);
+        }
+    } else {
+        // Build fresh columns and replace out_block's contents with them.
+        // NOTE(review): this branch swaps out_block wholesale; if out_block can
+        // already hold rows from an earlier call (e.g. when _make_nullable_keys
+        // is non-empty), those rows would be dropped here -- confirm with the caller.
+        ColumnsWithTypeAndName columns_with_schema;
+        for (size_t i = 0; i < key_size; ++i) {
+            auto distinct_column = key_columns[i]->clone_empty();
+            key_columns[i]->append_data_by_selector(distinct_column, distinct_row);
+            columns_with_schema.emplace_back(std::move(distinct_column),
+                                             _probe_expr_ctxs[i]->root()->data_type(),
+                                             _probe_expr_ctxs[i]->root()->expr_name());
+        }
+        out_block->swap(Block(columns_with_schema));
+    }
+    return Status::OK();
+}
+
+void
DistinctAggregationNode::_emplace_into_hash_table_to_distinct(IColumn::Selector&
distinct_row,
+
ColumnRawPtrs& key_columns,
+ const
size_t num_rows) {
+ std::visit(
+ [&](auto&& agg_method) -> void {
+ SCOPED_TIMER(_hash_table_compute_timer);
+ using HashMethodType = std::decay_t<decltype(agg_method)>;
+ using HashTableType = std::decay_t<decltype(agg_method.data)>;
+ using AggState = typename HashMethodType::State;
+ AggState state(key_columns, _probe_key_sz, nullptr);
+ _pre_serialize_key_if_need(state, agg_method, key_columns,
num_rows);
+
+ if constexpr (HashTableTraits<HashTableType>::is_phmap) {
+ if (_hash_values.size() < num_rows) {
+ _hash_values.resize(num_rows);
+ }
+ if constexpr
(ColumnsHashing::IsPreSerializedKeysHashMethodTraits<
+ AggState>::value) {
+ for (size_t i = 0; i < num_rows; ++i) {
+ _hash_values[i] =
agg_method.data.hash(agg_method.keys[i]);
+ }
+ } else {
+ for (size_t i = 0; i < num_rows; ++i) {
+ _hash_values[i] =
+
agg_method.data.hash(state.get_key_holder(i, *_agg_arena_pool));
+ }
+ }
+ }
+
+ /// For all rows.
+ COUNTER_UPDATE(_hash_table_input_counter, num_rows);
+ for (size_t i = 0; i < num_rows; ++i) {
+ auto emplace_result = [&]() {
+ if constexpr
(HashTableTraits<HashTableType>::is_phmap) {
+ if (LIKELY(i + HASH_MAP_PREFETCH_DIST < num_rows))
{
+ agg_method.data.prefetch_by_hash(
+ _hash_values[i +
HASH_MAP_PREFETCH_DIST]);
+ }
+ return state.emplace_key(agg_method.data,
_hash_values[i], i,
+ *_agg_arena_pool);
+ } else {
+ return state.emplace_key(agg_method.data, i,
*_agg_arena_pool);
+ }
+ }();
+
+ if (emplace_result.is_inserted()) {
+ emplace_result.set_mapped(dummy_mapped_data);
+ distinct_row.push_back(i);
+ }
+ }
+ },
+ _agg_data->_aggregated_method_variant);
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/distinct_vaggregation_node.h
b/be/src/vec/exec/distinct_vaggregation_node.h
new file mode 100644
index 0000000000..f5ca0ceebb
--- /dev/null
+++ b/be/src/vec/exec/distinct_vaggregation_node.h
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <cstdint>
+#include <memory>
+
+#include "vec/exec/vaggregation_node.h"
+#include "vec/exprs/vectorized_agg_fn.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
+#include "vec/exprs/vslot_ref.h"
+
+namespace doris {
+class TPlanNode;
+class DescriptorTbl;
+class ObjectPool;
+class RuntimeState;
+
+namespace vectorized {
+
+// select c_name from customer union select c_name from customer
+// This SQL uses an agg node to get the distinct rows of c_name,
+// so a row can be output as soon as it has been inserted into the hash map.
+// phase1: (_is_merge:false, _needs_finalize:false, Streaming
Preaggregation:true, agg size:0, limit:-1)
+// phase2: (_is_merge:false, _needs_finalize:true, Streaming
Preaggregation:false,agg size:0, limit:-1)
+class DistinctAggregationNode final : public AggregationNode {
+public:
+ DistinctAggregationNode(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs);
+ ~DistinctAggregationNode() override = default;
+ Status _distinct_pre_agg_with_serialized_key(Block* in_block, Block*
out_block);
+ void set_num_rows_returned(int64_t rows) { _num_rows_returned = rows; }
+ vectorized::VExprContextSPtrs get_conjuncts() { return _conjuncts; }
+
+private:
+ char* dummy_mapped_data = nullptr;
+ void _emplace_into_hash_table_to_distinct(IColumn::Selector& distinct_row,
+ ColumnRawPtrs& key_columns,
const size_t num_rows);
+};
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/vaggregation_node.cpp
b/be/src/vec/exec/vaggregation_node.cpp
index d7ebfe0a51..f4da6d9aaf 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -17,6 +17,7 @@
#include "vec/exec/vaggregation_node.h"
+#include <fmt/format.h>
#include <gen_cpp/Exprs_types.h>
#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/PlanNodes_types.h>
@@ -25,6 +26,7 @@
#include <array>
#include <atomic>
#include <memory>
+#include <string>
#include "exec/exec_node.h"
#include "runtime/block_spill_manager.h"
@@ -102,27 +104,27 @@ static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
AggregationNode::AggregationNode(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
: ExecNode(pool, tnode, descs),
+ _hash_table_compute_timer(nullptr),
+ _hash_table_input_counter(nullptr),
+ _build_timer(nullptr),
+ _expr_timer(nullptr),
+ _exec_timer(nullptr),
_intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
_intermediate_tuple_desc(nullptr),
_output_tuple_id(tnode.agg_node.output_tuple_id),
_output_tuple_desc(nullptr),
_needs_finalize(tnode.agg_node.need_finalize),
_is_merge(false),
- _build_timer(nullptr),
_serialize_key_timer(nullptr),
- _exec_timer(nullptr),
_merge_timer(nullptr),
- _expr_timer(nullptr),
_get_results_timer(nullptr),
_serialize_data_timer(nullptr),
_serialize_result_timer(nullptr),
_deserialize_data_timer(nullptr),
- _hash_table_compute_timer(nullptr),
_hash_table_iterate_timer(nullptr),
_insert_keys_to_column_timer(nullptr),
_streaming_agg_timer(nullptr),
_hash_table_size_counter(nullptr),
- _hash_table_input_counter(nullptr),
_max_row_size_counter(nullptr) {
if (tnode.agg_node.__isset.use_streaming_preaggregation) {
_is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation;
@@ -454,7 +456,6 @@ Status AggregationNode::prepare_profile(RuntimeState*
state) {
}
if (_is_streaming_preagg) {
- runtime_profile()->append_exec_option("Streaming Preaggregation");
_executor.pre_agg =
std::bind<Status>(&AggregationNode::_pre_agg_with_serialized_key, this,
std::placeholders::_1,
std::placeholders::_2);
@@ -478,6 +479,14 @@ Status AggregationNode::prepare_profile(RuntimeState*
state) {
_needs_finalize; // agg's finalize step
}
+ fmt::memory_buffer msg;
+ fmt::format_to(msg,
+ "(_is_merge: {}, _needs_finalize: {}, Streaming
Preaggregation: {}, agg size: "
+ "{}, limit: {})",
+ _is_merge ? "true" : "false", _needs_finalize ? "true" :
"false",
+ _is_streaming_preagg ? "true" : "false",
+ std::to_string(_aggregate_evaluators.size()),
std::to_string(_limit));
+ runtime_profile()->add_info_string("AggInfos:", fmt::to_string(msg));
return Status::OK();
}
@@ -918,7 +927,9 @@ void
AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR
_pre_serialize_key_if_need(state, agg_method, key_columns,
num_rows);
if constexpr (HashTableTraits<HashTableType>::is_phmap) {
- if (_hash_values.size() < num_rows)
_hash_values.resize(num_rows);
+ if (_hash_values.size() < num_rows) {
+ _hash_values.resize(num_rows);
+ }
if constexpr
(ColumnsHashing::IsPreSerializedKeysHashMethodTraits<
AggState>::value) {
for (size_t i = 0; i < num_rows; ++i) {
diff --git a/be/src/vec/exec/vaggregation_node.h
b/be/src/vec/exec/vaggregation_node.h
index e31240cdbc..7d560dae89 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -818,7 +818,7 @@ struct SpillPartitionHelper {
};
// not support spill
-class AggregationNode final : public ::doris::ExecNode {
+class AggregationNode : public ::doris::ExecNode {
public:
using Sizes = std::vector<size_t>;
@@ -836,18 +836,34 @@ public:
Status sink(doris::RuntimeState* state, vectorized::Block* input_block,
bool eos) override;
Status do_pre_agg(vectorized::Block* input_block, vectorized::Block*
output_block);
bool is_streaming_preagg() const { return _is_streaming_preagg; }
+ bool is_aggregate_evaluators_empty() const { return
_aggregate_evaluators.empty(); }
+ void _make_nullable_output_key(Block* block);
-private:
- friend class pipeline::AggSinkOperator;
- friend class pipeline::StreamingAggSinkOperator;
- friend class pipeline::AggSourceOperator;
- friend class pipeline::StreamingAggSourceOperator;
+protected:
+ bool _is_streaming_preagg;
+ bool _child_eos = false;
+ Block _preagg_block = Block();
+ ArenaUPtr _agg_arena_pool;
// group by k1,k2
VExprContextSPtrs _probe_expr_ctxs;
+ AggregatedDataVariantsUPtr _agg_data;
+
+ std::vector<size_t> _probe_key_sz;
+ std::vector<size_t> _hash_values;
// left / full join will change the key nullable make output/input solt
// nullable diff. so we need make nullable of it.
std::vector<size_t> _make_nullable_keys;
- std::vector<size_t> _probe_key_sz;
+ RuntimeProfile::Counter* _hash_table_compute_timer;
+ RuntimeProfile::Counter* _hash_table_input_counter;
+ RuntimeProfile::Counter* _build_timer;
+ RuntimeProfile::Counter* _expr_timer;
+ RuntimeProfile::Counter* _exec_timer;
+
+private:
+ friend class pipeline::AggSinkOperator;
+ friend class pipeline::StreamingAggSinkOperator;
+ friend class pipeline::AggSourceOperator;
+ friend class pipeline::StreamingAggSourceOperator;
std::vector<AggFnEvaluator*> _aggregate_evaluators;
bool _can_short_circuit = false;
@@ -873,47 +889,32 @@ private:
size_t _external_agg_bytes_threshold;
size_t _partitioned_threshold = 0;
- AggregatedDataVariantsUPtr _agg_data;
-
AggSpillContext _spill_context;
std::unique_ptr<SpillPartitionHelper> _spill_partition_helper;
- ArenaUPtr _agg_arena_pool;
-
- RuntimeProfile::Counter* _build_timer;
RuntimeProfile::Counter* _build_table_convert_timer;
RuntimeProfile::Counter* _serialize_key_timer;
- RuntimeProfile::Counter* _exec_timer;
RuntimeProfile::Counter* _merge_timer;
- RuntimeProfile::Counter* _expr_timer;
RuntimeProfile::Counter* _get_results_timer;
RuntimeProfile::Counter* _serialize_data_timer;
RuntimeProfile::Counter* _serialize_result_timer;
RuntimeProfile::Counter* _deserialize_data_timer;
- RuntimeProfile::Counter* _hash_table_compute_timer;
RuntimeProfile::Counter* _hash_table_iterate_timer;
RuntimeProfile::Counter* _insert_keys_to_column_timer;
RuntimeProfile::Counter* _streaming_agg_timer;
RuntimeProfile::Counter* _hash_table_size_counter;
- RuntimeProfile::Counter* _hash_table_input_counter;
RuntimeProfile::Counter* _max_row_size_counter;
-
RuntimeProfile::Counter* _memory_usage_counter;
RuntimeProfile::Counter* _hash_table_memory_usage;
RuntimeProfile::HighWaterMarkCounter* _serialize_key_arena_memory_usage;
- bool _is_streaming_preagg;
- Block _preagg_block = Block();
bool _should_expand_hash_table = true;
- bool _child_eos = false;
-
bool _should_limit_output = false;
bool _reach_limit = false;
bool _agg_data_created_without_key = false;
PODArray<AggregateDataPtr> _places;
std::vector<char> _deserialize_buffer;
- std::vector<size_t> _hash_values;
std::vector<AggregateDataPtr> _values;
std::unique_ptr<AggregateDataContainer> _aggregate_data_container;
@@ -924,8 +925,6 @@ private:
size_t _get_hash_table_size();
- void _make_nullable_output_key(Block* block);
-
Status _create_agg_status(AggregateDataPtr data);
Status _destroy_agg_status(AggregateDataPtr data);
@@ -956,6 +955,7 @@ private:
void _close_with_serialized_key();
void _init_hash_method(const VExprContextSPtrs& probe_exprs);
+protected:
template <typename AggState, typename AggMethod>
void _pre_serialize_key_if_need(AggState& state, AggMethod& agg_method,
const ColumnRawPtrs& key_columns, const
size_t num_rows) {
@@ -970,6 +970,7 @@ private:
}
}
+private:
template <bool limit>
Status _execute_with_serialized_key_helper(Block* block) {
SCOPED_TIMER(_build_timer);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnel.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnel.java
index c37d66471d..d19f63f658 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnel.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnel.java
@@ -31,7 +31,6 @@ import org.apache.doris.nereids.types.DateType;
import org.apache.doris.nereids.types.DateV2Type;
import org.apache.doris.nereids.types.IntegerType;
import org.apache.doris.nereids.types.StringType;
-import org.apache.doris.nereids.types.VarcharType;
import org.apache.doris.nereids.util.ExpressionUtils;
import com.google.common.base.Preconditions;
@@ -47,12 +46,7 @@ public class WindowFunnel extends AggregateFunction
public static final List<FunctionSignature> SIGNATURES = ImmutableList.of(
FunctionSignature.ret(IntegerType.INSTANCE)
- .varArgs(BigIntType.INSTANCE, StringType.INSTANCE,
DateTimeType.INSTANCE, BooleanType.INSTANCE),
- FunctionSignature.ret(VarcharType.SYSTEM_DEFAULT)
- .args(BigIntType.INSTANCE,
- StringType.INSTANCE,
- DateTimeV2Type.SYSTEM_DEFAULT,
- BooleanType.INSTANCE)
+ .varArgs(BigIntType.INSTANCE, StringType.INSTANCE,
DateTimeType.INSTANCE, BooleanType.INSTANCE)
);
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]