github-actions[bot] commented on code in PR #30875:
URL: https://github.com/apache/doris/pull/30875#discussion_r1497945606


##########
be/src/io/fs/local_file_system.cpp:
##########
@@ -453,5 +456,19 @@ Status LocalFileSystem::_glob(const std::string& pattern, 
std::vector<std::strin
     return Status::OK();
 }
 
+Status LocalFileSystem::permission(const Path& file, std::filesystem::perms 
prms) {
+    auto path = absolute_path(file);
+    FILESYSTEM_M(permission_impl(path, prms));
+}
+
+Status LocalFileSystem::permission_impl(const Path& file, 
std::filesystem::perms prms) {

Review Comment:
   warning: method 'permission_impl' can be made static 
[readability-convert-member-functions-to-static]
   
   be/src/io/fs/local_file_system.h:100:
   ```diff
   -     Status permission_impl(const Path& file, std::filesystem::perms prms);
   +     static Status permission_impl(const Path& file, std::filesystem::perms prms);
   ```
   



##########
be/src/olap/rowset/segment_v2/segment_iterator.cpp:
##########
@@ -2060,7 +2065,30 @@ Status 
SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>& read_colu
 }
 
 Status SegmentIterator::next_batch(vectorized::Block* block) {
-    auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return 
_next_batch_internal(block); }); }();
+    auto status = [&]() {
+        RETURN_IF_CATCH_EXCEPTION({
+            RETURN_IF_ERROR(_next_batch_internal(block));
+
+            // reverse block row order if read_orderby_key_reverse is true for 
key topn
+            // it should be processed for all success _next_batch_internal
+            if (_opts.read_orderby_key_reverse) {
+                size_t num_rows = block->rows();
+                if (num_rows == 0) {
+                    return Status::OK();
+                }
+                size_t num_columns = block->columns();
+                vectorized::IColumn::Permutation permutation;
+                for (size_t i = 0; i < num_rows; ++i) 
permutation.emplace_back(num_rows - 1 - i);

Review Comment:
   warning: statement should be inside braces 
[readability-braces-around-statements]
   
   ```suggestion
                   for (size_t i = 0; i < num_rows; ++i) {
                       permutation.emplace_back(num_rows - 1 - i);
                   }
   ```
   



##########
be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp:
##########
@@ -0,0 +1,406 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "distinct_streaming_aggregation_operator.h"
+
+#include <gen_cpp/Metrics_types.h>
+
+#include <memory>
+#include <utility>
+
+#include "common/compiler_util.h" // IWYU pragma: keep
+
+namespace doris {
+class ExecNode;
+class RuntimeState;
+} // namespace doris
+
+namespace doris::pipeline {
+
+DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* 
state,
+                                                               OperatorXBase* 
parent)
+        : PipelineXLocalState<FakeDependency>(state, parent),
+          dummy_mapped_data(std::make_shared<char>('A')),
+          _agg_arena_pool(std::make_unique<vectorized::Arena>()),
+          _agg_data(std::make_unique<vectorized::AggregatedDataVariants>()),
+          _agg_profile_arena(std::make_unique<vectorized::Arena>()),
+          _child_block(vectorized::Block::create_unique()),
+          _child_source_state(SourceState::DEPEND_ON_SOURCE),
+          _aggregated_block(vectorized::Block::create_unique()) {}
+
+Status DistinctStreamingAggLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
+    RETURN_IF_ERROR(Base::init(state, info));
+    SCOPED_TIMER(Base::exec_time_counter());
+    SCOPED_TIMER(Base::_open_timer);
+    auto& p = Base::_parent->template cast<DistinctStreamingAggOperatorX>();
+    for (auto& evaluator : p._aggregate_evaluators) {
+        _aggregate_evaluators.push_back(evaluator->clone(state, p._pool));
+    }
+    _probe_expr_ctxs.resize(p._probe_expr_ctxs.size());
+    for (size_t i = 0; i < _probe_expr_ctxs.size(); i++) {
+        RETURN_IF_ERROR(p._probe_expr_ctxs[i]->clone(state, 
_probe_expr_ctxs[i]));
+    }
+
+    _build_timer = ADD_TIMER(Base::profile(), "BuildTime");
+    _exec_timer = ADD_TIMER(Base::profile(), "ExecTime");
+    _hash_table_compute_timer = ADD_TIMER(Base::profile(), 
"HashTableComputeTime");
+    _hash_table_emplace_timer = ADD_TIMER(Base::profile(), 
"HashTableEmplaceTime");
+    _hash_table_input_counter = ADD_COUNTER(Base::profile(), 
"HashTableInputCount", TUnit::UNIT);
+
+    if (_probe_expr_ctxs.empty()) {
+        _agg_data->without_key = 
reinterpret_cast<vectorized::AggregateDataPtr>(
+                _agg_profile_arena->alloc(p._total_size_of_aggregate_states));
+    } else {
+        _init_hash_method(_probe_expr_ctxs);
+    }
+    return Status::OK();
+}
+
+void DistinctStreamingAggLocalState::_init_hash_method(

Review Comment:
   warning: method '_init_hash_method' can be made static 
[readability-convert-member-functions-to-static]
   
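   ```suggestion
   static void DistinctStreamingAggLocalState::_init_hash_method(
   ```
   
   Note: as written this suggestion does not compile. The `static` keyword may appear only on the in-class declaration of a member function, not on its out-of-class definition, so if the check is applied the keyword belongs on the declaration (presumably in distinct_streaming_aggregation_operator.h).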
   



##########
be/src/exprs/runtime_filter_slots.h:
##########
@@ -34,25 +34,24 @@ class VRuntimeFilterSlots {
 public:
     VRuntimeFilterSlots(
             const std::vector<std::shared_ptr<vectorized::VExprContext>>& 
build_expr_ctxs,
-            const std::vector<TRuntimeFilterDesc>& runtime_filter_descs, bool 
is_global = false)
+            const std::vector<IRuntimeFilter*>& runtime_filters, bool 
need_local_merge = false)
             : _build_expr_context(build_expr_ctxs),
-              _runtime_filter_descs(runtime_filter_descs),
-              _is_global(is_global) {}
+              _runtime_filters(runtime_filters),
+              _need_local_merge(need_local_merge) {}
 
     Status init(RuntimeState* state, int64_t hash_table_size) {
         // runtime filter effect strategy
         // 1. we will ignore IN filter when hash_table_size is too big
         // 2. we will ignore BLOOM filter and MinMax filter when 
hash_table_size
         // is too small and IN filter has effect
-
         std::map<int, bool> has_in_filter;
 
         auto ignore_local_filter = [&](int filter_id) {
             // Now pipeline x have bug in ignore, after fix the problem enable 
ignore logic in pipeline x
-            if (_is_global) {
+            if (_need_local_merge) {
                 return Status::OK();
             }
-            auto runtime_filter_mgr = state->runtime_filter_mgr();
+            auto runtime_filter_mgr = state->local_runtime_filter_mgr();

Review Comment:
   warning: 'auto runtime_filter_mgr' can be declared as 'auto *runtime_filter_mgr' [readability-qualified-auto]
   
   ```suggestion
               auto *runtime_filter_mgr = state->local_runtime_filter_mgr();
   ```
   



##########
be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp:
##########
@@ -0,0 +1,406 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "distinct_streaming_aggregation_operator.h"
+
+#include <gen_cpp/Metrics_types.h>
+
+#include <memory>
+#include <utility>
+
+#include "common/compiler_util.h" // IWYU pragma: keep
+
+namespace doris {
+class ExecNode;
+class RuntimeState;
+} // namespace doris
+
+namespace doris::pipeline {
+
+DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* 
state,
+                                                               OperatorXBase* 
parent)
+        : PipelineXLocalState<FakeDependency>(state, parent),
+          dummy_mapped_data(std::make_shared<char>('A')),
+          _agg_arena_pool(std::make_unique<vectorized::Arena>()),
+          _agg_data(std::make_unique<vectorized::AggregatedDataVariants>()),
+          _agg_profile_arena(std::make_unique<vectorized::Arena>()),
+          _child_block(vectorized::Block::create_unique()),
+          _child_source_state(SourceState::DEPEND_ON_SOURCE),
+          _aggregated_block(vectorized::Block::create_unique()) {}
+
+Status DistinctStreamingAggLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {

Review Comment:
   warning: method 'init' can be made static 
[readability-convert-member-functions-to-static]
   
   be/src/pipeline/exec/distinct_streaming_aggregation_operator.h:45:
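   ```diff
   -     Status init(RuntimeState* state, LocalStateInfo& info) override;
   +     static Status init(RuntimeState* state, LocalStateInfo& info) override;
   ```
   
   Note: this suggestion looks like a false positive. A function marked `override` is virtual, and a virtual member function cannot be `static`, so the proposed declaration would not compile. The body of `init` quoted earlier in this digest also uses instance members such as `_agg_data`, which the constructor initializes.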
   



##########
be/src/olap/rowset/segment_v2/segment_iterator.cpp:
##########
@@ -2060,7 +2065,30 @@
 }
 
 Status SegmentIterator::next_batch(vectorized::Block* block) {
-    auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return 
_next_batch_internal(block); }); }();
+    auto status = [&]() {
+        RETURN_IF_CATCH_EXCEPTION({
+            RETURN_IF_ERROR(_next_batch_internal(block));
+
+            // reverse block row order if read_orderby_key_reverse is true for 
key topn
+            // it should be processed for all success _next_batch_internal
+            if (_opts.read_orderby_key_reverse) {
+                size_t num_rows = block->rows();
+                if (num_rows == 0) {
+                    return Status::OK();
+                }
+                size_t num_columns = block->columns();
+                vectorized::IColumn::Permutation permutation;
+                for (size_t i = 0; i < num_rows; ++i) 
permutation.emplace_back(num_rows - 1 - i);
+
+                for (size_t i = 0; i < num_columns; ++i)

Review Comment:
   warning: statement should be inside braces 
[readability-braces-around-statements]
   
   ```suggestion
                   for (size_t i = 0; i < num_columns; ++i) {
   ```
   
   be/src/olap/rowset/segment_v2/segment_iterator.cpp:2084:
   ```diff
   -                             block->get_by_position(i).column->permute(permutation, num_rows);
   +                             block->get_by_position(i).column->permute(permutation, num_rows);
   + }
   ```
   



##########
be/src/pipeline/exec/aggregation_sink_operator.cpp:
##########
@@ -830,8 +777,7 @@
     return Status::OK();
 }
 
-template <typename LocalStateType>
-Status AggSinkOperatorX<LocalStateType>::open(RuntimeState* state) {
+Status AggSinkOperatorX::open(RuntimeState* state) {

Review Comment:
   warning: method 'open' can be made static 
[readability-convert-member-functions-to-static]
   
   be/src/pipeline/exec/aggregation_sink_operator.h:342:
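   ```diff
   -     Status open(RuntimeState* state) override;
   +     static Status open(RuntimeState* state) override;
   ```
   
   Note: a function marked `override` is virtual, and a virtual member function cannot be `static`, so the proposed declaration would not compile; this suggestion should not be applied as written.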
   



##########
be/src/olap/wal/wal_dirs_info.cpp:
##########
@@ -205,18 +207,18 @@
     return Status::OK();
 }
 
-Status WalDirsInfo::update_wal_dir_pre_allocated(const std::string& wal_dir,
-                                                 size_t increase_pre_allocated,
-                                                 size_t 
decrease_pre_allocated) {
+Status WalDirsInfo::update_wal_dir_estimated_wal_bytes(const std::string& 
wal_dir,

Review Comment:
   warning: method 'update_wal_dir_estimated_wal_bytes' can be made static 
[readability-convert-member-functions-to-static]
   
   be/src/olap/wal/wal_dirs_info.h:76:
   ```diff
   -     Status update_wal_dir_estimated_wal_bytes(const std::string& wal_dir,
   +     static Status update_wal_dir_estimated_wal_bytes(const std::string& wal_dir,
   ```
   



##########
be/src/olap/wal/wal_manager.cpp:
##########
@@ -232,9 +233,71 @@ Status WalManager::get_wal_path(int64_t wal_id, 
std::string& wal_path) {
     return Status::OK();
 }
 
-Status WalManager::_scan_wals(const std::string& wal_path) {
-    size_t count = 0;
-    bool exists = true;
+Status WalManager::parse_wal_path(const std::string& file_name, int64_t& 
version,

Review Comment:
   warning: method 'parse_wal_path' can be made static 
[readability-convert-member-functions-to-static]
   
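   ```suggestion
   static Status WalManager::parse_wal_path(const std::string& file_name, int64_t& version,
   ```
   
   Note: the `static` keyword may appear only on the in-class declaration of a member function, not on this out-of-class definition; if the check is applied, add `static` to the declaration of `parse_wal_path` (presumably in wal_manager.h) and leave the definition unchanged.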
   



##########
be/src/olap/wal/wal_manager.cpp:
##########
@@ -232,9 +233,71 @@
     return Status::OK();
 }
 
-Status WalManager::_scan_wals(const std::string& wal_path) {
-    size_t count = 0;
-    bool exists = true;
+Status WalManager::parse_wal_path(const std::string& file_name, int64_t& 
version,
+                                  int64_t& backend_id, int64_t& wal_id, 
std::string& label) {
+    try {
+        // find version
+        auto pos = file_name.find("_");
+        version = std::strtoll(file_name.substr(0, pos).c_str(), NULL, 10);
+        // find be id
+        auto substring1 = file_name.substr(pos + 1);
+        pos = substring1.find("_");
+        backend_id = std::strtoll(substring1.substr(0, pos).c_str(), NULL, 10);
+        // find wal id
+        auto substring2 = substring1.substr(pos + 1);
+        pos = substring2.find("_");
+        wal_id = std::strtoll(substring2.substr(0, pos).c_str(), NULL, 10);
+        // find label
+        label = substring2.substr(pos + 1);
+        VLOG_DEBUG << "version:" << version << "backend_id:" << backend_id << 
",wal_id:" << wal_id
+                   << ",label:" << label;
+    } catch (const std::invalid_argument& e) {
+        return Status::InvalidArgument("Invalid format, {}", e.what());
+    }
+    return Status::OK();
+}
+
+Status WalManager::_load_wals() {

Review Comment:
   warning: method '_load_wals' can be made static 
[readability-convert-member-functions-to-static]
   
   be/src/olap/wal/wal_manager.h:115:
   ```diff
   -     Status _load_wals();
   +     static Status _load_wals();
   ```
   



##########
be/src/exprs/runtime_filter.cpp:
##########
@@ -939,70 +938,69 @@ class RuntimePredicateWrapper {
 Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, ObjectPool* 
pool,
                               const TRuntimeFilterDesc* desc, const 
TQueryOptions* query_options,
                               const RuntimeFilterRole role, int node_id, 
IRuntimeFilter** res,
-                              bool build_bf_exactly, bool is_global, int 
parallel_tasks) {
-    *res = pool->add(new IRuntimeFilter(state, pool, desc, is_global, 
parallel_tasks));
+                              bool build_bf_exactly, bool need_local_merge) {
+    *res = pool->add(new IRuntimeFilter(state, pool, desc, need_local_merge));
     (*res)->set_role(role);
     return (*res)->init_with_desc(desc, query_options, node_id,
-                                  is_global ? false : build_bf_exactly);
+                                  need_local_merge ? false : build_bf_exactly);
 }
 
 vectorized::SharedRuntimeFilterContext& 
IRuntimeFilter::get_shared_context_ref() {
     return _wrapper->_context;
 }
 
-void IRuntimeFilter::copy_from_other(IRuntimeFilter* other) {
-    _wrapper->_filter_type = other->_wrapper->_filter_type;
-    _wrapper->_is_bloomfilter = other->is_bloomfilter();
-    _wrapper->_context = other->_wrapper->_context;
-}
-
 void IRuntimeFilter::insert_batch(const vectorized::ColumnPtr column, size_t 
start) {
     DCHECK(is_producer());
     _wrapper->insert_batch(column, start);
 }
 
-Status IRuntimeFilter::merge_local_filter(RuntimePredicateWrapper* wrapper, 
int* merged_num) {
-    SCOPED_TIMER(_merge_local_rf_timer);
-    std::unique_lock lock(_local_merge_mutex);
-    if (_merged_rf_num == 0) {
-        _wrapper = wrapper;
-    } else {
-        RETURN_IF_ERROR(merge_from(wrapper));
-    }
-    *merged_num = ++_merged_rf_num;
-    return Status::OK();
-}
-
 Status IRuntimeFilter::publish(bool publish_local) {

Review Comment:
   warning: function 'publish' has cognitive complexity of 77 (threshold 50) 
[readability-function-cognitive-complexity]
   ```cpp
   Status IRuntimeFilter::publish(bool publish_local) {
                          ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/exprs/runtime_filter.cpp:958:** nesting level increased to 1
   ```cpp
       auto send_to_remote = [&](IRuntimeFilter* filter) {
                             ^
   ```
   **be/src/exprs/runtime_filter.cpp:961:** +2, including nesting penalty of 1, 
nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr));
           ^
   ```
   **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/exprs/runtime_filter.cpp:961:** +3, including nesting penalty of 2, 
nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr));
           ^
   ```
   **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/exprs/runtime_filter.cpp:964:** nesting level increased to 1
   ```cpp
       auto send_to_local = [&](RuntimePredicateWrapper* wrapper) {
                            ^
   ```
   **be/src/exprs/runtime_filter.cpp:966:** +2, including nesting penalty of 1, 
nesting level increased to 2
   ```cpp
           
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_consume_filters(_filter_id, 
filters));
           ^
   ```
   **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/exprs/runtime_filter.cpp:966:** +3, including nesting penalty of 2, 
nesting level increased to 3
   ```cpp
           
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_consume_filters(_filter_id, 
filters));
           ^
   ```
   **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/exprs/runtime_filter.cpp:976:** nesting level increased to 1
   ```cpp
       auto do_local_merge = [&]() {
                             ^
   ```
   **be/src/exprs/runtime_filter.cpp:978:** +2, including nesting penalty of 1, 
nesting level increased to 2
   ```cpp
           
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_local_merge_producer_filters(
           ^
   ```
   **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/exprs/runtime_filter.cpp:978:** +3, including nesting penalty of 2, 
nesting level increased to 3
   ```cpp
           
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_local_merge_producer_filters(
           ^
   ```
   **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/exprs/runtime_filter.cpp:981:** +2, including nesting penalty of 1, 
nesting level increased to 2
   ```cpp
           
RETURN_IF_ERROR(local_merge_filters->filters[0]->merge_from(_wrapper));
           ^
   ```
   **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/exprs/runtime_filter.cpp:981:** +3, including nesting penalty of 2, 
nesting level increased to 3
   ```cpp
           
RETURN_IF_ERROR(local_merge_filters->filters[0]->merge_from(_wrapper));
           ^
   ```
   **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/exprs/runtime_filter.cpp:983:** +2, including nesting penalty of 1, 
nesting level increased to 2
   ```cpp
           if (local_merge_filters->merge_time == 0) {
           ^
   ```
   **be/src/exprs/runtime_filter.cpp:984:** +3, including nesting penalty of 2, 
nesting level increased to 3
   ```cpp
               if (_has_local_target) {
               ^
   ```
   **be/src/exprs/runtime_filter.cpp:985:** +4, including nesting penalty of 3, 
nesting level increased to 4
   ```cpp
                   
RETURN_IF_ERROR(send_to_local(local_merge_filters->filters[0]->_wrapper));
                   ^
   ```
   **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/exprs/runtime_filter.cpp:985:** +5, including nesting penalty of 4, 
nesting level increased to 5
   ```cpp
                   
RETURN_IF_ERROR(send_to_local(local_merge_filters->filters[0]->_wrapper));
                   ^
   ```
   **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/exprs/runtime_filter.cpp:986:** +1, nesting level increased to 3
   ```cpp
               } else {
                 ^
   ```
   **be/src/exprs/runtime_filter.cpp:987:** +4, including nesting penalty of 3, 
nesting level increased to 4
   ```cpp
                   
RETURN_IF_ERROR(send_to_remote(local_merge_filters->filters[0]));
                   ^
   ```
   **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/exprs/runtime_filter.cpp:987:** +5, including nesting penalty of 4, 
nesting level increased to 5
   ```cpp
                   
RETURN_IF_ERROR(send_to_remote(local_merge_filters->filters[0]));
                   ^
   ```
   **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/exprs/runtime_filter.cpp:993:** +1, including nesting penalty of 0, 
nesting level increased to 1
   ```cpp
       if (_need_local_merge && _has_local_target) {
       ^
   ```
   **be/src/exprs/runtime_filter.cpp:993:** +1
   ```cpp
       if (_need_local_merge && _has_local_target) {
                             ^
   ```
   **be/src/exprs/runtime_filter.cpp:994:** +2, including nesting penalty of 1, 
nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(do_local_merge());
           ^
   ```
   **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/exprs/runtime_filter.cpp:994:** +3, including nesting penalty of 2, 
nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(do_local_merge());
           ^
   ```
   **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/exprs/runtime_filter.cpp:995:** +1, nesting level increased to 1
   ```cpp
       } else if (_has_local_target) {
              ^
   ```
   **be/src/exprs/runtime_filter.cpp:996:** +2, including nesting penalty of 1, 
nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(send_to_local(_wrapper));
           ^
   ```
   **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/exprs/runtime_filter.cpp:996:** +3, including nesting penalty of 2, 
nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(send_to_local(_wrapper));
           ^
   ```
   **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/exprs/runtime_filter.cpp:997:** +1, nesting level increased to 1
   ```cpp
       } else if (!publish_local) {
              ^
   ```
   **be/src/exprs/runtime_filter.cpp:998:** +2, including nesting penalty of 1, 
nesting level increased to 2
   ```cpp
           if (_is_broadcast_join || _state->be_exec_version < 3) {
           ^
   ```
   **be/src/exprs/runtime_filter.cpp:998:** +1
   ```cpp
           if (_is_broadcast_join || _state->be_exec_version < 3) {
                                  ^
   ```
   **be/src/exprs/runtime_filter.cpp:999:** +3, including nesting penalty of 2, 
nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(send_to_remote(this));
               ^
   ```
   **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/exprs/runtime_filter.cpp:999:** +4, including nesting penalty of 3, 
nesting level increased to 4
   ```cpp
               RETURN_IF_ERROR(send_to_remote(this));
               ^
   ```
   **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/exprs/runtime_filter.cpp:1000:** +1, nesting level increased to 2
   ```cpp
           } else {
             ^
   ```
   **be/src/exprs/runtime_filter.cpp:1001:** +3, including nesting penalty of 
2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(do_local_merge());
               ^
   ```
   **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/exprs/runtime_filter.cpp:1001:** +4, including nesting penalty of 
3, nesting level increased to 4
   ```cpp
               RETURN_IF_ERROR(do_local_merge());
               ^
   ```
   **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/exprs/runtime_filter.cpp:1003:** +1, nesting level increased to 1
   ```cpp
       } else {
         ^
   ```
   
   </details>
   



##########
be/src/pipeline/exec/aggregation_sink_operator.cpp:
##########
@@ -48,30 +48,25 @@ OPERATOR_CODE_GENERATOR(AggSinkOperator, StreamingOperator)
 /// using the planner's estimated input cardinality and the assumption that 
input
 /// is in a random order. This means that we assume that the reduction factor 
will
 /// increase over time.
-template <typename DependencyType, typename Derived>
-AggSinkLocalState<DependencyType, 
Derived>::AggSinkLocalState(DataSinkOperatorXBase* parent,
-                                                              RuntimeState* 
state)
+AggSinkLocalState::AggSinkLocalState(DataSinkOperatorXBase* parent, 
RuntimeState* state)
         : Base(parent, state),
           _hash_table_compute_timer(nullptr),
           _hash_table_input_counter(nullptr),
           _build_timer(nullptr),
           _expr_timer(nullptr),
-          _exec_timer(nullptr),
           _serialize_key_timer(nullptr),
           _merge_timer(nullptr),
           _serialize_data_timer(nullptr),
           _deserialize_data_timer(nullptr),
           _max_row_size_counter(nullptr) {}
 
-template <typename DependencyType, typename Derived>
-Status AggSinkLocalState<DependencyType, Derived>::init(RuntimeState* state,
-                                                        LocalSinkStateInfo& 
info) {
+Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {

Review Comment:
   warning: function 'init' exceeds recommended size/complexity thresholds 
[readability-function-size]
   ```cpp
   Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& 
info) {
                             ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/aggregation_sink_operator.cpp:62:** 83 lines 
including whitespace and comments (threshold 80)
   ```cpp
   Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& 
info) {
                             ^
   ```
   
   </details>
   



##########
be/src/olap/lru_cache.h:
##########
@@ -479,8 +486,10 @@ class DummyLRUCache : public Cache {
     void* value(Handle* handle) override;
     Slice value_slice(Handle* handle) override;
     uint64_t new_id() override { return 0; };
-    int64_t prune() override { return 0; };
-    int64_t prune_if(CacheValuePredicate pred, bool lazy_mode = false) 
override { return 0; };
+    PrunedInfo prune() override { return {0, 0}; };
+    PrunedInfo prune_if(CachePrunePredicate pred, bool lazy_mode = false) 
override {

Review Comment:
   warning: method 'prune_if' can be made static 
[readability-convert-member-functions-to-static]
   
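   ```suggestion
       static PrunedInfo prune_if(CachePrunePredicate pred, bool lazy_mode = false) override {
   ```
   
   Note: a function marked `override` is virtual, and a virtual member function cannot be `static`, so the proposed declaration would not compile; this suggestion should not be applied as written.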
   



##########
be/src/pipeline/exec/aggregation_sink_operator.cpp:
##########
@@ -260,9 +235,7 @@
             _agg_data->method_variant);
 }
 
-template <typename DependencyType, typename Derived>
-Status AggSinkLocalState<DependencyType, Derived>::_destroy_agg_status(
-        vectorized::AggregateDataPtr data) {
+Status AggSinkLocalState::_destroy_agg_status(vectorized::AggregateDataPtr 
data) {

Review Comment:
   warning: method '_destroy_agg_status' can be made static 
[readability-convert-member-functions-to-static]
   
   be/src/pipeline/exec/aggregation_sink_operator.h:205:
   ```diff
   -     Status _destroy_agg_status(vectorized::AggregateDataPtr data);
   +     static Status _destroy_agg_status(vectorized::AggregateDataPtr data);
   ```
   



##########
be/src/olap/wal/wal_manager.cpp:
##########
@@ -232,9 +233,71 @@
     return Status::OK();
 }
 
-Status WalManager::_scan_wals(const std::string& wal_path) {
-    size_t count = 0;
-    bool exists = true;
+Status WalManager::parse_wal_path(const std::string& file_name, int64_t& 
version,
+                                  int64_t& backend_id, int64_t& wal_id, 
std::string& label) {
+    try {
+        // find version
+        auto pos = file_name.find("_");
+        version = std::strtoll(file_name.substr(0, pos).c_str(), NULL, 10);
+        // find be id
+        auto substring1 = file_name.substr(pos + 1);
+        pos = substring1.find("_");
+        backend_id = std::strtoll(substring1.substr(0, pos).c_str(), NULL, 10);
+        // find wal id
+        auto substring2 = substring1.substr(pos + 1);
+        pos = substring2.find("_");
+        wal_id = std::strtoll(substring2.substr(0, pos).c_str(), NULL, 10);
+        // find label
+        label = substring2.substr(pos + 1);
+        VLOG_DEBUG << "version:" << version << "backend_id:" << backend_id << 
",wal_id:" << wal_id
+                   << ",label:" << label;
+    } catch (const std::invalid_argument& e) {
+        return Status::InvalidArgument("Invalid format, {}", e.what());
+    }
+    return Status::OK();
+}
+
+Status WalManager::_load_wals() {
+    std::vector<ScanWalInfo> wals;
+    for (auto wal_dir : _wal_dirs) {
+        WARN_IF_ERROR(_scan_wals(wal_dir, wals), fmt::format("fail to scan wal 
dir={}", wal_dir));
+    }
+    for (const auto& wal : wals) {
+        bool exists = false;
+        WARN_IF_ERROR(io::global_local_filesystem()->exists(wal.wal_path, 
&exists),
+                      fmt::format("fail to check exist on wal file={}", 
wal.wal_path));
+        if (!exists) {
+            continue;
+        }
+        LOG(INFO) << "find wal: " << wal.wal_path;
+        {
+            std::lock_guard<std::shared_mutex> wrlock(_wal_path_lock);
+            auto it = _wal_path_map.find(wal.wal_id);
+            if (it != _wal_path_map.end()) {
+                LOG(INFO) << "wal_id " << wal.wal_id << " already in 
wal_path_map, skip it";
+                continue;
+            }
+            _wal_path_map.emplace(wal.wal_id, wal.wal_path);
+        }
+        // this config is use for test p0 case in pipeline
+        if (config::group_commit_wait_replay_wal_finish) {
+            auto lock = std::make_shared<std::mutex>();
+            auto cv = std::make_shared<std::condition_variable>();
+            auto add_st = add_wal_cv_map(wal.wal_id, lock, cv);
+            if (!add_st.ok()) {
+                LOG(WARNING) << "fail to add wal_id " << wal.wal_id << " to 
wal_cv_map";
+                continue;
+            }
+        }
+        _exec_env->wal_mgr()->add_wal_queue(wal.tb_id, wal.wal_id);
+        WARN_IF_ERROR(add_recover_wal(wal.db_id, wal.tb_id, wal.wal_id, 
wal.wal_path),
+                      fmt::format("Failed to add recover wal={}", 
wal.wal_path));
+    }
+    return Status::OK();
+}
+
+Status WalManager::_scan_wals(const std::string& wal_path, 
std::vector<ScanWalInfo>& res) {

Review Comment:
   warning: method '_scan_wals' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
   static Status WalManager::_scan_wals(const std::string& wal_path, std::vector<ScanWalInfo>& res) {
   ```
   



##########
be/src/pipeline/exec/aggregation_sink_operator.cpp:
##########
@@ -178,9 +160,7 @@
     return Status::OK();
 }
 
-template <typename DependencyType, typename Derived>
-Status AggSinkLocalState<DependencyType, Derived>::_create_agg_status(
-        vectorized::AggregateDataPtr data) {
+Status AggSinkLocalState::_create_agg_status(vectorized::AggregateDataPtr 
data) {

Review Comment:
   warning: method '_create_agg_status' can be made static [readability-convert-member-functions-to-static]
   
   be/src/pipeline/exec/aggregation_sink_operator.h:293:
   ```diff
   -     Status _create_agg_status(vectorized::AggregateDataPtr data);
   +     static Status _create_agg_status(vectorized::AggregateDataPtr data);
   ```
   



##########
be/src/pipeline/exec/aggregation_sink_operator.cpp:
##########
@@ -392,18 +363,15 @@
 // We should call this function only at 1st phase.
 // 1st phase: is_merge=true, only have one SlotRef.
 // 2nd phase: is_merge=false, maybe have multiple exprs.
-template <typename DependencyType, typename Derived>
-int AggSinkLocalState<DependencyType, Derived>::_get_slot_column_id(
-        const vectorized::AggFnEvaluator* evaluator) {
+int AggSinkLocalState::_get_slot_column_id(const vectorized::AggFnEvaluator* 
evaluator) {

Review Comment:
   warning: method '_get_slot_column_id' can be made static [readability-convert-member-functions-to-static]
   
   be/src/pipeline/exec/aggregation_sink_operator.h:298:
   ```diff
   -     int _get_slot_column_id(const vectorized::AggFnEvaluator* evaluator);
   +     static int _get_slot_column_id(const vectorized::AggFnEvaluator* evaluator);
   ```
   



##########
be/src/pipeline/exec/aggregation_sink_operator.cpp:
##########
@@ -842,18 +788,16 @@
     return Status::OK();
 }
 
-template <typename LocalStateType>
-Status AggSinkOperatorX<LocalStateType>::sink(doris::RuntimeState* state,
-                                              vectorized::Block* in_block,
-                                              SourceState source_state) {
+Status AggSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* 
in_block,

Review Comment:
   warning: method 'sink' can be made static [readability-convert-member-functions-to-static]
   
   be/src/pipeline/exec/aggregation_sink_operator.h:344:
   ```diff
   -     Status sink(RuntimeState* state, vectorized::Block* in_block,
   +     static Status sink(RuntimeState* state, vectorized::Block* in_block,
   ```
   



##########
be/src/olap/wal/wal_dirs_info.cpp:
##########
@@ -50,20 +50,21 @@ void WalDirInfo::set_used(size_t used) {
     _used = used;
 }
 
-size_t WalDirInfo::get_pre_allocated() {
+size_t WalDirInfo::get_estimated_wal_bytes() {

Review Comment:
   warning: method 'get_estimated_wal_bytes' can be made const [readability-make-member-function-const]
   
   be/src/olap/wal/wal_dirs_info.h:43:
   ```diff
   -     size_t get_estimated_wal_bytes();
   +     size_t get_estimated_wal_bytes() const;
   ```
   
   ```suggestion
   size_t WalDirInfo::get_estimated_wal_bytes() const {
   ```
   



##########
be/src/pipeline/exec/aggregation_sink_operator.cpp:
##########
@@ -271,10 +244,8 @@
     return Status::OK();
 }
 
-template <typename DependencyType, typename Derived>
 template <bool limit, bool for_spill>
-Status AggSinkLocalState<DependencyType, 
Derived>::_merge_with_serialized_key_helper(
-        vectorized::Block* block) {
+Status AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* 
block) {

Review Comment:
   warning: function '_merge_with_serialized_key_helper' exceeds recommended size/complexity thresholds [readability-function-size]
   ```cpp
   Status AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* block) {
                             ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/aggregation_sink_operator.cpp:247:** 113 lines including whitespace and comments (threshold 80)
   ```cpp
   Status AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* block) {
                             ^
   ```
   
   </details>
   



##########
be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp:
##########
@@ -0,0 +1,406 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "distinct_streaming_aggregation_operator.h"
+
+#include <gen_cpp/Metrics_types.h>
+
+#include <memory>
+#include <utility>
+
+#include "common/compiler_util.h" // IWYU pragma: keep
+
+namespace doris {
+class ExecNode;
+class RuntimeState;
+} // namespace doris
+
+namespace doris::pipeline {
+
+DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* 
state,
+                                                               OperatorXBase* 
parent)
+        : PipelineXLocalState<FakeDependency>(state, parent),
+          dummy_mapped_data(std::make_shared<char>('A')),
+          _agg_arena_pool(std::make_unique<vectorized::Arena>()),
+          _agg_data(std::make_unique<vectorized::AggregatedDataVariants>()),
+          _agg_profile_arena(std::make_unique<vectorized::Arena>()),
+          _child_block(vectorized::Block::create_unique()),
+          _child_source_state(SourceState::DEPEND_ON_SOURCE),
+          _aggregated_block(vectorized::Block::create_unique()) {}
+
+Status DistinctStreamingAggLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
+    RETURN_IF_ERROR(Base::init(state, info));
+    SCOPED_TIMER(Base::exec_time_counter());
+    SCOPED_TIMER(Base::_open_timer);
+    auto& p = Base::_parent->template cast<DistinctStreamingAggOperatorX>();
+    for (auto& evaluator : p._aggregate_evaluators) {
+        _aggregate_evaluators.push_back(evaluator->clone(state, p._pool));
+    }
+    _probe_expr_ctxs.resize(p._probe_expr_ctxs.size());
+    for (size_t i = 0; i < _probe_expr_ctxs.size(); i++) {
+        RETURN_IF_ERROR(p._probe_expr_ctxs[i]->clone(state, 
_probe_expr_ctxs[i]));
+    }
+
+    _build_timer = ADD_TIMER(Base::profile(), "BuildTime");
+    _exec_timer = ADD_TIMER(Base::profile(), "ExecTime");
+    _hash_table_compute_timer = ADD_TIMER(Base::profile(), 
"HashTableComputeTime");
+    _hash_table_emplace_timer = ADD_TIMER(Base::profile(), 
"HashTableEmplaceTime");
+    _hash_table_input_counter = ADD_COUNTER(Base::profile(), 
"HashTableInputCount", TUnit::UNIT);
+
+    if (_probe_expr_ctxs.empty()) {
+        _agg_data->without_key = 
reinterpret_cast<vectorized::AggregateDataPtr>(
+                _agg_profile_arena->alloc(p._total_size_of_aggregate_states));
+    } else {
+        _init_hash_method(_probe_expr_ctxs);
+    }
+    return Status::OK();
+}
+
+void DistinctStreamingAggLocalState::_init_hash_method(
+        const vectorized::VExprContextSPtrs& probe_exprs) {
+    DCHECK(probe_exprs.size() >= 1);
+
+    using Type = vectorized::AggregatedDataVariants::Type;
+    Type t(Type::serialized);
+
+    if (probe_exprs.size() == 1) {
+        auto is_nullable = probe_exprs[0]->root()->is_nullable();
+        PrimitiveType type = probe_exprs[0]->root()->result_type();
+        switch (type) {
+        case TYPE_TINYINT:
+        case TYPE_BOOLEAN:
+        case TYPE_SMALLINT:
+        case TYPE_INT:
+        case TYPE_FLOAT:
+        case TYPE_DATEV2:
+        case TYPE_BIGINT:
+        case TYPE_DOUBLE:
+        case TYPE_DATE:
+        case TYPE_DATETIME:
+        case TYPE_DATETIMEV2:
+        case TYPE_LARGEINT:
+        case TYPE_DECIMALV2:
+        case TYPE_DECIMAL32:
+        case TYPE_DECIMAL64:
+        case TYPE_DECIMAL128I: {
+            size_t size = get_primitive_type_size(type);
+            if (size == 1) {
+                t = Type::int8_key;
+            } else if (size == 2) {
+                t = Type::int16_key;
+            } else if (size == 4) {
+                t = Type::int32_key;
+            } else if (size == 8) {
+                t = Type::int64_key;
+            } else if (size == 16) {
+                t = Type::int128_key;
+            } else {
+                throw Exception(ErrorCode::INTERNAL_ERROR,
+                                "meet invalid type size, size={}, type={}", 
size,
+                                type_to_string(type));
+            }
+            break;
+        }
+        case TYPE_CHAR:
+        case TYPE_VARCHAR:
+        case TYPE_STRING: {
+            t = Type::string_key;
+            break;
+        }
+        default:
+            t = Type::serialized;
+        }
+
+        _agg_data->init(get_hash_key_type_with_phase(
+                                t, !Base::_parent->template 
cast<DistinctStreamingAggOperatorX>()
+                                            ._is_first_phase),
+                        is_nullable);
+    } else {
+        if (!try_get_hash_map_context_fixed<PHNormalHashMap, HashCRC32,
+                                            
vectorized::AggregateDataPtr>(_agg_data->method_variant,
+                                                                          
probe_exprs)) {
+            _agg_data->init(Type::serialized);
+        }
+    }
+}
+
+Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
+        doris::vectorized::Block* in_block, doris::vectorized::Block* 
out_block) {
+    SCOPED_TIMER(_build_timer);
+    DCHECK(!_probe_expr_ctxs.empty());
+
+    size_t key_size = _probe_expr_ctxs.size();
+    vectorized::ColumnRawPtrs key_columns(key_size);
+    {
+        SCOPED_TIMER(_expr_timer);
+        for (size_t i = 0; i < key_size; ++i) {
+            int result_column_id = -1;
+            RETURN_IF_ERROR(_probe_expr_ctxs[i]->execute(in_block, 
&result_column_id));
+            in_block->get_by_position(result_column_id).column =
+                    in_block->get_by_position(result_column_id)
+                            .column->convert_to_full_column_if_const();
+            key_columns[i] = 
in_block->get_by_position(result_column_id).column.get();
+        }
+    }
+
+    int rows = in_block->rows();
+    _distinct_row.clear();
+    _distinct_row.reserve(rows);
+
+    RETURN_IF_CATCH_EXCEPTION(
+            _emplace_into_hash_table_to_distinct(_distinct_row, key_columns, 
rows));
+
+    bool mem_reuse = 
_parent->cast<DistinctStreamingAggOperatorX>()._make_nullable_keys.empty() &&
+                     out_block->mem_reuse();
+    if (mem_reuse) {
+        for (int i = 0; i < key_size; ++i) {
+            auto dst = out_block->get_by_position(i).column->assume_mutable();
+            key_columns[i]->append_data_by_selector(dst, _distinct_row);
+        }
+    } else {
+        vectorized::ColumnsWithTypeAndName columns_with_schema;
+        for (int i = 0; i < key_size; ++i) {
+            auto distinct_column = key_columns[i]->clone_empty();
+            key_columns[i]->append_data_by_selector(distinct_column, 
_distinct_row);
+            columns_with_schema.emplace_back(std::move(distinct_column),
+                                             
_probe_expr_ctxs[i]->root()->data_type(),
+                                             
_probe_expr_ctxs[i]->root()->expr_name());
+        }
+        out_block->swap(vectorized::Block(columns_with_schema));
+    }
+    return Status::OK();
+}
+
+void 
DistinctStreamingAggLocalState::_make_nullable_output_key(vectorized::Block* 
block) {
+    if (block->rows() != 0) {
+        for (auto cid : 
Base::_parent->cast<DistinctStreamingAggOperatorX>()._make_nullable_keys) {
+            block->get_by_position(cid).column = 
make_nullable(block->get_by_position(cid).column);
+            block->get_by_position(cid).type = 
make_nullable(block->get_by_position(cid).type);
+        }
+    }
+}
+
+void DistinctStreamingAggLocalState::_emplace_into_hash_table_to_distinct(
+        vectorized::IColumn::Selector& distinct_row, 
vectorized::ColumnRawPtrs& key_columns,
+        const size_t num_rows) {
+    std::visit(
+            [&](auto&& agg_method) -> void {
+                SCOPED_TIMER(_hash_table_compute_timer);
+                using HashMethodType = std::decay_t<decltype(agg_method)>;
+                using AggState = typename HashMethodType::State;
+                AggState state(key_columns);
+                agg_method.init_serialized_keys(key_columns, num_rows);
+                size_t row = 0;
+                auto creator = [&](const auto& ctor, auto& key, auto& origin) {
+                    HashMethodType::try_presis_key(key, origin, _arena);
+                    ctor(key, dummy_mapped_data.get());
+                    distinct_row.push_back(row);
+                };
+                auto creator_for_null_key = [&](auto& mapped) {
+                    mapped = dummy_mapped_data.get();
+                    distinct_row.push_back(row);
+                };
+
+                SCOPED_TIMER(_hash_table_emplace_timer);
+                for (; row < num_rows; ++row) {
+                    agg_method.lazy_emplace(state, row, creator, 
creator_for_null_key);
+                }
+
+                COUNTER_UPDATE(_hash_table_input_counter, num_rows);
+            },
+            _agg_data->method_variant);
+}
+
+DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, 
int operator_id,
+                                                             const TPlanNode& 
tnode,
+                                                             const 
DescriptorTbl& descs)
+        : StatefulOperatorX<DistinctStreamingAggLocalState>(pool, tnode, 
operator_id, descs),
+          _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
+          _intermediate_tuple_desc(nullptr),

Review Comment:
   warning: member initializer for '_intermediate_tuple_desc' is redundant [modernize-use-default-member-init]
   
   ```suggestion
             ,
   ```
   



##########
be/src/pipeline/exec/aggregation_sink_operator.cpp:
##########
@@ -865,8 +809,7 @@
     return Status::OK();
 }
 
-template <typename DependencyType, typename Derived>
-Status AggSinkLocalState<DependencyType, Derived>::close(RuntimeState* state, 
Status exec_status) {
+Status AggSinkLocalState::close(RuntimeState* state, Status exec_status) {

Review Comment:
   warning: method 'close' can be made static [readability-convert-member-functions-to-static]
   
   be/src/pipeline/exec/aggregation_sink_operator.h:66:
   ```diff
   -     Status close(RuntimeState* state, Status exec_status) override;
   +     static Status close(RuntimeState* state, Status exec_status) override;
   ```
   



##########
be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp:
##########
@@ -0,0 +1,406 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "distinct_streaming_aggregation_operator.h"
+
+#include <gen_cpp/Metrics_types.h>
+
+#include <memory>
+#include <utility>
+
+#include "common/compiler_util.h" // IWYU pragma: keep
+
+namespace doris {
+class ExecNode;
+class RuntimeState;
+} // namespace doris
+
+namespace doris::pipeline {
+
+DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* 
state,
+                                                               OperatorXBase* 
parent)
+        : PipelineXLocalState<FakeDependency>(state, parent),
+          dummy_mapped_data(std::make_shared<char>('A')),
+          _agg_arena_pool(std::make_unique<vectorized::Arena>()),
+          _agg_data(std::make_unique<vectorized::AggregatedDataVariants>()),
+          _agg_profile_arena(std::make_unique<vectorized::Arena>()),
+          _child_block(vectorized::Block::create_unique()),
+          _child_source_state(SourceState::DEPEND_ON_SOURCE),
+          _aggregated_block(vectorized::Block::create_unique()) {}
+
+Status DistinctStreamingAggLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
+    RETURN_IF_ERROR(Base::init(state, info));
+    SCOPED_TIMER(Base::exec_time_counter());
+    SCOPED_TIMER(Base::_open_timer);
+    auto& p = Base::_parent->template cast<DistinctStreamingAggOperatorX>();
+    for (auto& evaluator : p._aggregate_evaluators) {
+        _aggregate_evaluators.push_back(evaluator->clone(state, p._pool));
+    }
+    _probe_expr_ctxs.resize(p._probe_expr_ctxs.size());
+    for (size_t i = 0; i < _probe_expr_ctxs.size(); i++) {
+        RETURN_IF_ERROR(p._probe_expr_ctxs[i]->clone(state, 
_probe_expr_ctxs[i]));
+    }
+
+    _build_timer = ADD_TIMER(Base::profile(), "BuildTime");
+    _exec_timer = ADD_TIMER(Base::profile(), "ExecTime");
+    _hash_table_compute_timer = ADD_TIMER(Base::profile(), 
"HashTableComputeTime");
+    _hash_table_emplace_timer = ADD_TIMER(Base::profile(), 
"HashTableEmplaceTime");
+    _hash_table_input_counter = ADD_COUNTER(Base::profile(), 
"HashTableInputCount", TUnit::UNIT);
+
+    if (_probe_expr_ctxs.empty()) {
+        _agg_data->without_key = 
reinterpret_cast<vectorized::AggregateDataPtr>(
+                _agg_profile_arena->alloc(p._total_size_of_aggregate_states));
+    } else {
+        _init_hash_method(_probe_expr_ctxs);
+    }
+    return Status::OK();
+}
+
+void DistinctStreamingAggLocalState::_init_hash_method(
+        const vectorized::VExprContextSPtrs& probe_exprs) {
+    DCHECK(probe_exprs.size() >= 1);
+
+    using Type = vectorized::AggregatedDataVariants::Type;
+    Type t(Type::serialized);
+
+    if (probe_exprs.size() == 1) {
+        auto is_nullable = probe_exprs[0]->root()->is_nullable();
+        PrimitiveType type = probe_exprs[0]->root()->result_type();
+        switch (type) {
+        case TYPE_TINYINT:
+        case TYPE_BOOLEAN:
+        case TYPE_SMALLINT:
+        case TYPE_INT:
+        case TYPE_FLOAT:
+        case TYPE_DATEV2:
+        case TYPE_BIGINT:
+        case TYPE_DOUBLE:
+        case TYPE_DATE:
+        case TYPE_DATETIME:
+        case TYPE_DATETIMEV2:
+        case TYPE_LARGEINT:
+        case TYPE_DECIMALV2:
+        case TYPE_DECIMAL32:
+        case TYPE_DECIMAL64:
+        case TYPE_DECIMAL128I: {
+            size_t size = get_primitive_type_size(type);
+            if (size == 1) {
+                t = Type::int8_key;
+            } else if (size == 2) {
+                t = Type::int16_key;
+            } else if (size == 4) {
+                t = Type::int32_key;
+            } else if (size == 8) {
+                t = Type::int64_key;
+            } else if (size == 16) {
+                t = Type::int128_key;
+            } else {
+                throw Exception(ErrorCode::INTERNAL_ERROR,
+                                "meet invalid type size, size={}, type={}", 
size,
+                                type_to_string(type));
+            }
+            break;
+        }
+        case TYPE_CHAR:
+        case TYPE_VARCHAR:
+        case TYPE_STRING: {
+            t = Type::string_key;
+            break;
+        }
+        default:
+            t = Type::serialized;
+        }
+
+        _agg_data->init(get_hash_key_type_with_phase(
+                                t, !Base::_parent->template 
cast<DistinctStreamingAggOperatorX>()
+                                            ._is_first_phase),
+                        is_nullable);
+    } else {
+        if (!try_get_hash_map_context_fixed<PHNormalHashMap, HashCRC32,
+                                            
vectorized::AggregateDataPtr>(_agg_data->method_variant,
+                                                                          
probe_exprs)) {
+            _agg_data->init(Type::serialized);
+        }
+    }
+}
+
+Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
+        doris::vectorized::Block* in_block, doris::vectorized::Block* 
out_block) {
+    SCOPED_TIMER(_build_timer);
+    DCHECK(!_probe_expr_ctxs.empty());
+
+    size_t key_size = _probe_expr_ctxs.size();
+    vectorized::ColumnRawPtrs key_columns(key_size);
+    {
+        SCOPED_TIMER(_expr_timer);
+        for (size_t i = 0; i < key_size; ++i) {
+            int result_column_id = -1;
+            RETURN_IF_ERROR(_probe_expr_ctxs[i]->execute(in_block, 
&result_column_id));
+            in_block->get_by_position(result_column_id).column =
+                    in_block->get_by_position(result_column_id)
+                            .column->convert_to_full_column_if_const();
+            key_columns[i] = 
in_block->get_by_position(result_column_id).column.get();
+        }
+    }
+
+    int rows = in_block->rows();
+    _distinct_row.clear();
+    _distinct_row.reserve(rows);
+
+    RETURN_IF_CATCH_EXCEPTION(
+            _emplace_into_hash_table_to_distinct(_distinct_row, key_columns, 
rows));
+
+    bool mem_reuse = 
_parent->cast<DistinctStreamingAggOperatorX>()._make_nullable_keys.empty() &&
+                     out_block->mem_reuse();
+    if (mem_reuse) {
+        for (int i = 0; i < key_size; ++i) {
+            auto dst = out_block->get_by_position(i).column->assume_mutable();
+            key_columns[i]->append_data_by_selector(dst, _distinct_row);
+        }
+    } else {
+        vectorized::ColumnsWithTypeAndName columns_with_schema;
+        for (int i = 0; i < key_size; ++i) {
+            auto distinct_column = key_columns[i]->clone_empty();
+            key_columns[i]->append_data_by_selector(distinct_column, 
_distinct_row);
+            columns_with_schema.emplace_back(std::move(distinct_column),
+                                             
_probe_expr_ctxs[i]->root()->data_type(),
+                                             
_probe_expr_ctxs[i]->root()->expr_name());
+        }
+        out_block->swap(vectorized::Block(columns_with_schema));
+    }
+    return Status::OK();
+}
+
+void 
DistinctStreamingAggLocalState::_make_nullable_output_key(vectorized::Block* 
block) {
+    if (block->rows() != 0) {
+        for (auto cid : 
Base::_parent->cast<DistinctStreamingAggOperatorX>()._make_nullable_keys) {
+            block->get_by_position(cid).column = 
make_nullable(block->get_by_position(cid).column);
+            block->get_by_position(cid).type = 
make_nullable(block->get_by_position(cid).type);
+        }
+    }
+}
+
+void DistinctStreamingAggLocalState::_emplace_into_hash_table_to_distinct(
+        vectorized::IColumn::Selector& distinct_row, 
vectorized::ColumnRawPtrs& key_columns,
+        const size_t num_rows) {
+    std::visit(
+            [&](auto&& agg_method) -> void {
+                SCOPED_TIMER(_hash_table_compute_timer);
+                using HashMethodType = std::decay_t<decltype(agg_method)>;
+                using AggState = typename HashMethodType::State;
+                AggState state(key_columns);
+                agg_method.init_serialized_keys(key_columns, num_rows);
+                size_t row = 0;
+                auto creator = [&](const auto& ctor, auto& key, auto& origin) {
+                    HashMethodType::try_presis_key(key, origin, _arena);
+                    ctor(key, dummy_mapped_data.get());
+                    distinct_row.push_back(row);
+                };
+                auto creator_for_null_key = [&](auto& mapped) {
+                    mapped = dummy_mapped_data.get();
+                    distinct_row.push_back(row);
+                };
+
+                SCOPED_TIMER(_hash_table_emplace_timer);
+                for (; row < num_rows; ++row) {
+                    agg_method.lazy_emplace(state, row, creator, 
creator_for_null_key);
+                }
+
+                COUNTER_UPDATE(_hash_table_input_counter, num_rows);
+            },
+            _agg_data->method_variant);
+}
+
+DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, 
int operator_id,
+                                                             const TPlanNode& 
tnode,
+                                                             const 
DescriptorTbl& descs)
+        : StatefulOperatorX<DistinctStreamingAggLocalState>(pool, tnode, 
operator_id, descs),
+          _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
+          _intermediate_tuple_desc(nullptr),
+          _output_tuple_id(tnode.agg_node.output_tuple_id),
+          _output_tuple_desc(nullptr),

Review Comment:
   warning: member initializer for '_output_tuple_desc' is redundant [modernize-use-default-member-init]
   
   ```suggestion
             ,
   ```
   



##########
be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp:
##########
@@ -0,0 +1,406 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "distinct_streaming_aggregation_operator.h"
+
+#include <gen_cpp/Metrics_types.h>
+
+#include <memory>
+#include <utility>
+
+#include "common/compiler_util.h" // IWYU pragma: keep
+
+namespace doris {
+class ExecNode;
+class RuntimeState;
+} // namespace doris
+
+namespace doris::pipeline {
+
+DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* 
state,
+                                                               OperatorXBase* 
parent)
+        : PipelineXLocalState<FakeDependency>(state, parent),
+          dummy_mapped_data(std::make_shared<char>('A')),
+          _agg_arena_pool(std::make_unique<vectorized::Arena>()),
+          _agg_data(std::make_unique<vectorized::AggregatedDataVariants>()),
+          _agg_profile_arena(std::make_unique<vectorized::Arena>()),
+          _child_block(vectorized::Block::create_unique()),
+          _child_source_state(SourceState::DEPEND_ON_SOURCE),
+          _aggregated_block(vectorized::Block::create_unique()) {}
+
+Status DistinctStreamingAggLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
+    RETURN_IF_ERROR(Base::init(state, info));
+    SCOPED_TIMER(Base::exec_time_counter());
+    SCOPED_TIMER(Base::_open_timer);
+    auto& p = Base::_parent->template cast<DistinctStreamingAggOperatorX>();
+    for (auto& evaluator : p._aggregate_evaluators) {
+        _aggregate_evaluators.push_back(evaluator->clone(state, p._pool));
+    }
+    _probe_expr_ctxs.resize(p._probe_expr_ctxs.size());
+    for (size_t i = 0; i < _probe_expr_ctxs.size(); i++) {
+        RETURN_IF_ERROR(p._probe_expr_ctxs[i]->clone(state, 
_probe_expr_ctxs[i]));
+    }
+
+    _build_timer = ADD_TIMER(Base::profile(), "BuildTime");
+    _exec_timer = ADD_TIMER(Base::profile(), "ExecTime");
+    _hash_table_compute_timer = ADD_TIMER(Base::profile(), 
"HashTableComputeTime");
+    _hash_table_emplace_timer = ADD_TIMER(Base::profile(), 
"HashTableEmplaceTime");
+    _hash_table_input_counter = ADD_COUNTER(Base::profile(), 
"HashTableInputCount", TUnit::UNIT);
+
+    if (_probe_expr_ctxs.empty()) {
+        _agg_data->without_key = 
reinterpret_cast<vectorized::AggregateDataPtr>(
+                _agg_profile_arena->alloc(p._total_size_of_aggregate_states));
+    } else {
+        _init_hash_method(_probe_expr_ctxs);
+    }
+    return Status::OK();
+}
+
+void DistinctStreamingAggLocalState::_init_hash_method(
+        const vectorized::VExprContextSPtrs& probe_exprs) {
+    DCHECK(probe_exprs.size() >= 1);
+
+    using Type = vectorized::AggregatedDataVariants::Type;
+    Type t(Type::serialized);
+
+    if (probe_exprs.size() == 1) {
+        auto is_nullable = probe_exprs[0]->root()->is_nullable();
+        PrimitiveType type = probe_exprs[0]->root()->result_type();
+        switch (type) {
+        case TYPE_TINYINT:
+        case TYPE_BOOLEAN:
+        case TYPE_SMALLINT:
+        case TYPE_INT:
+        case TYPE_FLOAT:
+        case TYPE_DATEV2:
+        case TYPE_BIGINT:
+        case TYPE_DOUBLE:
+        case TYPE_DATE:
+        case TYPE_DATETIME:
+        case TYPE_DATETIMEV2:
+        case TYPE_LARGEINT:
+        case TYPE_DECIMALV2:
+        case TYPE_DECIMAL32:
+        case TYPE_DECIMAL64:
+        case TYPE_DECIMAL128I: {
+            size_t size = get_primitive_type_size(type);
+            if (size == 1) {
+                t = Type::int8_key;
+            } else if (size == 2) {
+                t = Type::int16_key;
+            } else if (size == 4) {
+                t = Type::int32_key;
+            } else if (size == 8) {
+                t = Type::int64_key;
+            } else if (size == 16) {
+                t = Type::int128_key;
+            } else {
+                throw Exception(ErrorCode::INTERNAL_ERROR,
+                                "meet invalid type size, size={}, type={}", 
size,
+                                type_to_string(type));
+            }
+            break;
+        }
+        case TYPE_CHAR:
+        case TYPE_VARCHAR:
+        case TYPE_STRING: {
+            t = Type::string_key;
+            break;
+        }
+        default:
+            t = Type::serialized;
+        }
+
+        _agg_data->init(get_hash_key_type_with_phase(
+                                t, !Base::_parent->template 
cast<DistinctStreamingAggOperatorX>()
+                                            ._is_first_phase),
+                        is_nullable);
+    } else {
+        if (!try_get_hash_map_context_fixed<PHNormalHashMap, HashCRC32,
+                                            
vectorized::AggregateDataPtr>(_agg_data->method_variant,
+                                                                          
probe_exprs)) {
+            _agg_data->init(Type::serialized);
+        }
+    }
+}
+
+Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
+        doris::vectorized::Block* in_block, doris::vectorized::Block* 
out_block) {
+    SCOPED_TIMER(_build_timer);
+    DCHECK(!_probe_expr_ctxs.empty());
+
+    size_t key_size = _probe_expr_ctxs.size();
+    vectorized::ColumnRawPtrs key_columns(key_size);
+    {
+        SCOPED_TIMER(_expr_timer);
+        for (size_t i = 0; i < key_size; ++i) {
+            int result_column_id = -1;
+            RETURN_IF_ERROR(_probe_expr_ctxs[i]->execute(in_block, 
&result_column_id));
+            in_block->get_by_position(result_column_id).column =
+                    in_block->get_by_position(result_column_id)
+                            .column->convert_to_full_column_if_const();
+            key_columns[i] = 
in_block->get_by_position(result_column_id).column.get();
+        }
+    }
+
+    int rows = in_block->rows();
+    _distinct_row.clear();
+    _distinct_row.reserve(rows);
+
+    RETURN_IF_CATCH_EXCEPTION(
+            _emplace_into_hash_table_to_distinct(_distinct_row, key_columns, 
rows));
+
+    bool mem_reuse = 
_parent->cast<DistinctStreamingAggOperatorX>()._make_nullable_keys.empty() &&
+                     out_block->mem_reuse();
+    if (mem_reuse) {
+        for (int i = 0; i < key_size; ++i) {
+            auto dst = out_block->get_by_position(i).column->assume_mutable();
+            key_columns[i]->append_data_by_selector(dst, _distinct_row);
+        }
+    } else {
+        vectorized::ColumnsWithTypeAndName columns_with_schema;
+        for (int i = 0; i < key_size; ++i) {
+            auto distinct_column = key_columns[i]->clone_empty();
+            key_columns[i]->append_data_by_selector(distinct_column, 
_distinct_row);
+            columns_with_schema.emplace_back(std::move(distinct_column),
+                                             
_probe_expr_ctxs[i]->root()->data_type(),
+                                             
_probe_expr_ctxs[i]->root()->expr_name());
+        }
+        out_block->swap(vectorized::Block(columns_with_schema));
+    }
+    return Status::OK();
+}
+
+void 
DistinctStreamingAggLocalState::_make_nullable_output_key(vectorized::Block* 
block) {

Review Comment:
   warning: method '_make_nullable_output_key' can be made static 
[readability-convert-member-functions-to-static]
   
   be/src/pipeline/exec/distinct_streaming_aggregation_operator.h:58:
   ```diff
   -     void _make_nullable_output_key(vectorized::Block* block);
   +     static void _make_nullable_output_key(vectorized::Block* block);
   ```
   



##########
be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp:
##########
@@ -0,0 +1,406 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "distinct_streaming_aggregation_operator.h"
+
+#include <gen_cpp/Metrics_types.h>
+
+#include <memory>
+#include <utility>
+
+#include "common/compiler_util.h" // IWYU pragma: keep
+
+namespace doris {
+class ExecNode;
+class RuntimeState;
+} // namespace doris
+
+namespace doris::pipeline {
+
+DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* 
state,
+                                                               OperatorXBase* 
parent)
+        : PipelineXLocalState<FakeDependency>(state, parent),
+          dummy_mapped_data(std::make_shared<char>('A')),
+          _agg_arena_pool(std::make_unique<vectorized::Arena>()),
+          _agg_data(std::make_unique<vectorized::AggregatedDataVariants>()),
+          _agg_profile_arena(std::make_unique<vectorized::Arena>()),
+          _child_block(vectorized::Block::create_unique()),
+          _child_source_state(SourceState::DEPEND_ON_SOURCE),
+          _aggregated_block(vectorized::Block::create_unique()) {}
+
+Status DistinctStreamingAggLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
+    RETURN_IF_ERROR(Base::init(state, info));
+    SCOPED_TIMER(Base::exec_time_counter());
+    SCOPED_TIMER(Base::_open_timer);
+    auto& p = Base::_parent->template cast<DistinctStreamingAggOperatorX>();
+    for (auto& evaluator : p._aggregate_evaluators) {
+        _aggregate_evaluators.push_back(evaluator->clone(state, p._pool));
+    }
+    _probe_expr_ctxs.resize(p._probe_expr_ctxs.size());
+    for (size_t i = 0; i < _probe_expr_ctxs.size(); i++) {
+        RETURN_IF_ERROR(p._probe_expr_ctxs[i]->clone(state, 
_probe_expr_ctxs[i]));
+    }
+
+    _build_timer = ADD_TIMER(Base::profile(), "BuildTime");
+    _exec_timer = ADD_TIMER(Base::profile(), "ExecTime");
+    _hash_table_compute_timer = ADD_TIMER(Base::profile(), 
"HashTableComputeTime");
+    _hash_table_emplace_timer = ADD_TIMER(Base::profile(), 
"HashTableEmplaceTime");
+    _hash_table_input_counter = ADD_COUNTER(Base::profile(), 
"HashTableInputCount", TUnit::UNIT);
+
+    if (_probe_expr_ctxs.empty()) {
+        _agg_data->without_key = 
reinterpret_cast<vectorized::AggregateDataPtr>(
+                _agg_profile_arena->alloc(p._total_size_of_aggregate_states));
+    } else {
+        _init_hash_method(_probe_expr_ctxs);
+    }
+    return Status::OK();
+}
+
+void DistinctStreamingAggLocalState::_init_hash_method(
+        const vectorized::VExprContextSPtrs& probe_exprs) {
+    DCHECK(probe_exprs.size() >= 1);
+
+    using Type = vectorized::AggregatedDataVariants::Type;
+    Type t(Type::serialized);
+
+    if (probe_exprs.size() == 1) {
+        auto is_nullable = probe_exprs[0]->root()->is_nullable();
+        PrimitiveType type = probe_exprs[0]->root()->result_type();
+        switch (type) {
+        case TYPE_TINYINT:
+        case TYPE_BOOLEAN:
+        case TYPE_SMALLINT:
+        case TYPE_INT:
+        case TYPE_FLOAT:
+        case TYPE_DATEV2:
+        case TYPE_BIGINT:
+        case TYPE_DOUBLE:
+        case TYPE_DATE:
+        case TYPE_DATETIME:
+        case TYPE_DATETIMEV2:
+        case TYPE_LARGEINT:
+        case TYPE_DECIMALV2:
+        case TYPE_DECIMAL32:
+        case TYPE_DECIMAL64:
+        case TYPE_DECIMAL128I: {
+            size_t size = get_primitive_type_size(type);
+            if (size == 1) {
+                t = Type::int8_key;
+            } else if (size == 2) {
+                t = Type::int16_key;
+            } else if (size == 4) {
+                t = Type::int32_key;
+            } else if (size == 8) {
+                t = Type::int64_key;
+            } else if (size == 16) {
+                t = Type::int128_key;
+            } else {
+                throw Exception(ErrorCode::INTERNAL_ERROR,
+                                "meet invalid type size, size={}, type={}", 
size,
+                                type_to_string(type));
+            }
+            break;
+        }
+        case TYPE_CHAR:
+        case TYPE_VARCHAR:
+        case TYPE_STRING: {
+            t = Type::string_key;
+            break;
+        }
+        default:
+            t = Type::serialized;
+        }
+
+        _agg_data->init(get_hash_key_type_with_phase(
+                                t, !Base::_parent->template 
cast<DistinctStreamingAggOperatorX>()
+                                            ._is_first_phase),
+                        is_nullable);
+    } else {
+        if (!try_get_hash_map_context_fixed<PHNormalHashMap, HashCRC32,
+                                            
vectorized::AggregateDataPtr>(_agg_data->method_variant,
+                                                                          
probe_exprs)) {
+            _agg_data->init(Type::serialized);
+        }
+    }
+}
+
+Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(

Review Comment:
   warning: method '_distinct_pre_agg_with_serialized_key' can be made static 
[readability-convert-member-functions-to-static]
   
   be/src/pipeline/exec/distinct_streaming_aggregation_operator.h:52:
   ```diff
   -     Status _distinct_pre_agg_with_serialized_key(vectorized::Block* in_block,
   +     static Status _distinct_pre_agg_with_serialized_key(vectorized::Block* in_block,
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to