github-actions[bot] commented on code in PR #30875:
URL: https://github.com/apache/doris/pull/30875#discussion_r1497945606
##########
be/src/io/fs/local_file_system.cpp:
##########
@@ -453,5 +456,19 @@ Status LocalFileSystem::_glob(const std::string& pattern,
std::vector<std::strin
return Status::OK();
}
+Status LocalFileSystem::permission(const Path& file, std::filesystem::perms
prms) {
+ auto path = absolute_path(file);
+ FILESYSTEM_M(permission_impl(path, prms));
+}
+
+Status LocalFileSystem::permission_impl(const Path& file,
std::filesystem::perms prms) {
Review Comment:
warning: method 'permission_impl' can be made static
[readability-convert-member-functions-to-static]
be/src/io/fs/local_file_system.h:100:
```diff
- Status permission_impl(const Path& file, std::filesystem::perms prms);
+ static Status permission_impl(const Path& file, std::filesystem::perms prms);
```
##########
be/src/olap/rowset/segment_v2/segment_iterator.cpp:
##########
@@ -2060,7 +2065,30 @@ Status
SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>& read_colu
}
Status SegmentIterator::next_batch(vectorized::Block* block) {
- auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return
_next_batch_internal(block); }); }();
+ auto status = [&]() {
+ RETURN_IF_CATCH_EXCEPTION({
+ RETURN_IF_ERROR(_next_batch_internal(block));
+
+ // reverse block row order if read_orderby_key_reverse is true for
key topn
+ // it should be processed for all success _next_batch_internal
+ if (_opts.read_orderby_key_reverse) {
+ size_t num_rows = block->rows();
+ if (num_rows == 0) {
+ return Status::OK();
+ }
+ size_t num_columns = block->columns();
+ vectorized::IColumn::Permutation permutation;
+ for (size_t i = 0; i < num_rows; ++i)
permutation.emplace_back(num_rows - 1 - i);
Review Comment:
warning: statement should be inside braces
[readability-braces-around-statements]
```suggestion
for (size_t i = 0; i < num_rows; ++i) {
permutation.emplace_back(num_rows - 1 - i);
}
```
##########
be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp:
##########
@@ -0,0 +1,406 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "distinct_streaming_aggregation_operator.h"
+
+#include <gen_cpp/Metrics_types.h>
+
+#include <memory>
+#include <utility>
+
+#include "common/compiler_util.h" // IWYU pragma: keep
+
+namespace doris {
+class ExecNode;
+class RuntimeState;
+} // namespace doris
+
+namespace doris::pipeline {
+
+DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState*
state,
+ OperatorXBase*
parent)
+ : PipelineXLocalState<FakeDependency>(state, parent),
+ dummy_mapped_data(std::make_shared<char>('A')),
+ _agg_arena_pool(std::make_unique<vectorized::Arena>()),
+ _agg_data(std::make_unique<vectorized::AggregatedDataVariants>()),
+ _agg_profile_arena(std::make_unique<vectorized::Arena>()),
+ _child_block(vectorized::Block::create_unique()),
+ _child_source_state(SourceState::DEPEND_ON_SOURCE),
+ _aggregated_block(vectorized::Block::create_unique()) {}
+
+Status DistinctStreamingAggLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
+ RETURN_IF_ERROR(Base::init(state, info));
+ SCOPED_TIMER(Base::exec_time_counter());
+ SCOPED_TIMER(Base::_open_timer);
+ auto& p = Base::_parent->template cast<DistinctStreamingAggOperatorX>();
+ for (auto& evaluator : p._aggregate_evaluators) {
+ _aggregate_evaluators.push_back(evaluator->clone(state, p._pool));
+ }
+ _probe_expr_ctxs.resize(p._probe_expr_ctxs.size());
+ for (size_t i = 0; i < _probe_expr_ctxs.size(); i++) {
+ RETURN_IF_ERROR(p._probe_expr_ctxs[i]->clone(state,
_probe_expr_ctxs[i]));
+ }
+
+ _build_timer = ADD_TIMER(Base::profile(), "BuildTime");
+ _exec_timer = ADD_TIMER(Base::profile(), "ExecTime");
+ _hash_table_compute_timer = ADD_TIMER(Base::profile(),
"HashTableComputeTime");
+ _hash_table_emplace_timer = ADD_TIMER(Base::profile(),
"HashTableEmplaceTime");
+ _hash_table_input_counter = ADD_COUNTER(Base::profile(),
"HashTableInputCount", TUnit::UNIT);
+
+ if (_probe_expr_ctxs.empty()) {
+ _agg_data->without_key =
reinterpret_cast<vectorized::AggregateDataPtr>(
+ _agg_profile_arena->alloc(p._total_size_of_aggregate_states));
+ } else {
+ _init_hash_method(_probe_expr_ctxs);
+ }
+ return Status::OK();
+}
+
+void DistinctStreamingAggLocalState::_init_hash_method(
Review Comment:
warning: method '_init_hash_method' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static void DistinctStreamingAggLocalState::_init_hash_method(
```
##########
be/src/exprs/runtime_filter_slots.h:
##########
@@ -34,25 +34,24 @@ class VRuntimeFilterSlots {
public:
VRuntimeFilterSlots(
const std::vector<std::shared_ptr<vectorized::VExprContext>>&
build_expr_ctxs,
- const std::vector<TRuntimeFilterDesc>& runtime_filter_descs, bool
is_global = false)
+ const std::vector<IRuntimeFilter*>& runtime_filters, bool
need_local_merge = false)
: _build_expr_context(build_expr_ctxs),
- _runtime_filter_descs(runtime_filter_descs),
- _is_global(is_global) {}
+ _runtime_filters(runtime_filters),
+ _need_local_merge(need_local_merge) {}
Status init(RuntimeState* state, int64_t hash_table_size) {
// runtime filter effect strategy
// 1. we will ignore IN filter when hash_table_size is too big
// 2. we will ignore BLOOM filter and MinMax filter when
hash_table_size
// is too small and IN filter has effect
-
std::map<int, bool> has_in_filter;
auto ignore_local_filter = [&](int filter_id) {
// Now pipeline x have bug in ignore, after fix the problem enable
ignore logic in pipeline x
- if (_is_global) {
+ if (_need_local_merge) {
return Status::OK();
}
- auto runtime_filter_mgr = state->runtime_filter_mgr();
+ auto runtime_filter_mgr = state->local_runtime_filter_mgr();
Review Comment:
warning: 'auto runtime_filter_mgr' can be declared as 'auto *runtime_filter_mgr' [readability-qualified-auto]
```suggestion
auto *runtime_filter_mgr = state->local_runtime_filter_mgr();
```
##########
be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp:
##########
@@ -0,0 +1,406 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "distinct_streaming_aggregation_operator.h"
+
+#include <gen_cpp/Metrics_types.h>
+
+#include <memory>
+#include <utility>
+
+#include "common/compiler_util.h" // IWYU pragma: keep
+
+namespace doris {
+class ExecNode;
+class RuntimeState;
+} // namespace doris
+
+namespace doris::pipeline {
+
+DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState*
state,
+ OperatorXBase*
parent)
+ : PipelineXLocalState<FakeDependency>(state, parent),
+ dummy_mapped_data(std::make_shared<char>('A')),
+ _agg_arena_pool(std::make_unique<vectorized::Arena>()),
+ _agg_data(std::make_unique<vectorized::AggregatedDataVariants>()),
+ _agg_profile_arena(std::make_unique<vectorized::Arena>()),
+ _child_block(vectorized::Block::create_unique()),
+ _child_source_state(SourceState::DEPEND_ON_SOURCE),
+ _aggregated_block(vectorized::Block::create_unique()) {}
+
+Status DistinctStreamingAggLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
Review Comment:
warning: method 'init' can be made static
[readability-convert-member-functions-to-static]
be/src/pipeline/exec/distinct_streaming_aggregation_operator.h:45:
```diff
- Status init(RuntimeState* state, LocalStateInfo& info) override;
+ static Status init(RuntimeState* state, LocalStateInfo& info) override;
```
##########
be/src/olap/rowset/segment_v2/segment_iterator.cpp:
##########
@@ -2060,7 +2065,30 @@
}
Status SegmentIterator::next_batch(vectorized::Block* block) {
- auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return
_next_batch_internal(block); }); }();
+ auto status = [&]() {
+ RETURN_IF_CATCH_EXCEPTION({
+ RETURN_IF_ERROR(_next_batch_internal(block));
+
+ // reverse block row order if read_orderby_key_reverse is true for
key topn
+ // it should be processed for all success _next_batch_internal
+ if (_opts.read_orderby_key_reverse) {
+ size_t num_rows = block->rows();
+ if (num_rows == 0) {
+ return Status::OK();
+ }
+ size_t num_columns = block->columns();
+ vectorized::IColumn::Permutation permutation;
+ for (size_t i = 0; i < num_rows; ++i)
permutation.emplace_back(num_rows - 1 - i);
+
+ for (size_t i = 0; i < num_columns; ++i)
Review Comment:
warning: statement should be inside braces
[readability-braces-around-statements]
```suggestion
for (size_t i = 0; i < num_columns; ++i) {
```
be/src/olap/rowset/segment_v2/segment_iterator.cpp:2084:
```diff
- block->get_by_position(i).column->permute(permutation, num_rows);
+ block->get_by_position(i).column->permute(permutation, num_rows);
+ }
```
##########
be/src/pipeline/exec/aggregation_sink_operator.cpp:
##########
@@ -830,8 +777,7 @@
return Status::OK();
}
-template <typename LocalStateType>
-Status AggSinkOperatorX<LocalStateType>::open(RuntimeState* state) {
+Status AggSinkOperatorX::open(RuntimeState* state) {
Review Comment:
warning: method 'open' can be made static
[readability-convert-member-functions-to-static]
be/src/pipeline/exec/aggregation_sink_operator.h:342:
```diff
- Status open(RuntimeState* state) override;
+ static Status open(RuntimeState* state) override;
```
##########
be/src/olap/wal/wal_dirs_info.cpp:
##########
@@ -205,18 +207,18 @@
return Status::OK();
}
-Status WalDirsInfo::update_wal_dir_pre_allocated(const std::string& wal_dir,
- size_t increase_pre_allocated,
- size_t
decrease_pre_allocated) {
+Status WalDirsInfo::update_wal_dir_estimated_wal_bytes(const std::string&
wal_dir,
Review Comment:
warning: method 'update_wal_dir_estimated_wal_bytes' can be made static
[readability-convert-member-functions-to-static]
be/src/olap/wal/wal_dirs_info.h:76:
```diff
- Status update_wal_dir_estimated_wal_bytes(const std::string& wal_dir,
+ static Status update_wal_dir_estimated_wal_bytes(const std::string& wal_dir,
```
##########
be/src/olap/wal/wal_manager.cpp:
##########
@@ -232,9 +233,71 @@ Status WalManager::get_wal_path(int64_t wal_id,
std::string& wal_path) {
return Status::OK();
}
-Status WalManager::_scan_wals(const std::string& wal_path) {
- size_t count = 0;
- bool exists = true;
+Status WalManager::parse_wal_path(const std::string& file_name, int64_t&
version,
Review Comment:
warning: method 'parse_wal_path' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static Status WalManager::parse_wal_path(const std::string& file_name,
int64_t& version,
```
##########
be/src/olap/wal/wal_manager.cpp:
##########
@@ -232,9 +233,71 @@
return Status::OK();
}
-Status WalManager::_scan_wals(const std::string& wal_path) {
- size_t count = 0;
- bool exists = true;
+Status WalManager::parse_wal_path(const std::string& file_name, int64_t&
version,
+ int64_t& backend_id, int64_t& wal_id,
std::string& label) {
+ try {
+ // find version
+ auto pos = file_name.find("_");
+ version = std::strtoll(file_name.substr(0, pos).c_str(), NULL, 10);
+ // find be id
+ auto substring1 = file_name.substr(pos + 1);
+ pos = substring1.find("_");
+ backend_id = std::strtoll(substring1.substr(0, pos).c_str(), NULL, 10);
+ // find wal id
+ auto substring2 = substring1.substr(pos + 1);
+ pos = substring2.find("_");
+ wal_id = std::strtoll(substring2.substr(0, pos).c_str(), NULL, 10);
+ // find label
+ label = substring2.substr(pos + 1);
+ VLOG_DEBUG << "version:" << version << "backend_id:" << backend_id <<
",wal_id:" << wal_id
+ << ",label:" << label;
+ } catch (const std::invalid_argument& e) {
+ return Status::InvalidArgument("Invalid format, {}", e.what());
+ }
+ return Status::OK();
+}
+
+Status WalManager::_load_wals() {
Review Comment:
warning: method '_load_wals' can be made static
[readability-convert-member-functions-to-static]
be/src/olap/wal/wal_manager.h:115:
```diff
- Status _load_wals();
+ static Status _load_wals();
```
##########
be/src/exprs/runtime_filter.cpp:
##########
@@ -939,70 +938,69 @@ class RuntimePredicateWrapper {
Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, ObjectPool*
pool,
const TRuntimeFilterDesc* desc, const
TQueryOptions* query_options,
const RuntimeFilterRole role, int node_id,
IRuntimeFilter** res,
- bool build_bf_exactly, bool is_global, int
parallel_tasks) {
- *res = pool->add(new IRuntimeFilter(state, pool, desc, is_global,
parallel_tasks));
+ bool build_bf_exactly, bool need_local_merge) {
+ *res = pool->add(new IRuntimeFilter(state, pool, desc, need_local_merge));
(*res)->set_role(role);
return (*res)->init_with_desc(desc, query_options, node_id,
- is_global ? false : build_bf_exactly);
+ need_local_merge ? false : build_bf_exactly);
}
vectorized::SharedRuntimeFilterContext&
IRuntimeFilter::get_shared_context_ref() {
return _wrapper->_context;
}
-void IRuntimeFilter::copy_from_other(IRuntimeFilter* other) {
- _wrapper->_filter_type = other->_wrapper->_filter_type;
- _wrapper->_is_bloomfilter = other->is_bloomfilter();
- _wrapper->_context = other->_wrapper->_context;
-}
-
void IRuntimeFilter::insert_batch(const vectorized::ColumnPtr column, size_t
start) {
DCHECK(is_producer());
_wrapper->insert_batch(column, start);
}
-Status IRuntimeFilter::merge_local_filter(RuntimePredicateWrapper* wrapper,
int* merged_num) {
- SCOPED_TIMER(_merge_local_rf_timer);
- std::unique_lock lock(_local_merge_mutex);
- if (_merged_rf_num == 0) {
- _wrapper = wrapper;
- } else {
- RETURN_IF_ERROR(merge_from(wrapper));
- }
- *merged_num = ++_merged_rf_num;
- return Status::OK();
-}
-
Status IRuntimeFilter::publish(bool publish_local) {
Review Comment:
warning: function 'publish' has cognitive complexity of 77 (threshold 50)
[readability-function-cognitive-complexity]
```cpp
Status IRuntimeFilter::publish(bool publish_local) {
^
```
<details>
<summary>Additional context</summary>
**be/src/exprs/runtime_filter.cpp:958:** nesting level increased to 1
```cpp
auto send_to_remote = [&](IRuntimeFilter* filter) {
^
```
**be/src/exprs/runtime_filter.cpp:961:** +2, including nesting penalty of 1,
nesting level increased to 2
```cpp
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr));
^
```
**be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/exprs/runtime_filter.cpp:961:** +3, including nesting penalty of 2,
nesting level increased to 3
```cpp
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr));
^
```
**be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/exprs/runtime_filter.cpp:964:** nesting level increased to 1
```cpp
auto send_to_local = [&](RuntimePredicateWrapper* wrapper) {
^
```
**be/src/exprs/runtime_filter.cpp:966:** +2, including nesting penalty of 1,
nesting level increased to 2
```cpp
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_consume_filters(_filter_id,
filters));
^
```
**be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/exprs/runtime_filter.cpp:966:** +3, including nesting penalty of 2,
nesting level increased to 3
```cpp
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_consume_filters(_filter_id,
filters));
^
```
**be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/exprs/runtime_filter.cpp:976:** nesting level increased to 1
```cpp
auto do_local_merge = [&]() {
^
```
**be/src/exprs/runtime_filter.cpp:978:** +2, including nesting penalty of 1,
nesting level increased to 2
```cpp
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_local_merge_producer_filters(
^
```
**be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/exprs/runtime_filter.cpp:978:** +3, including nesting penalty of 2,
nesting level increased to 3
```cpp
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_local_merge_producer_filters(
^
```
**be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/exprs/runtime_filter.cpp:981:** +2, including nesting penalty of 1,
nesting level increased to 2
```cpp
RETURN_IF_ERROR(local_merge_filters->filters[0]->merge_from(_wrapper));
^
```
**be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/exprs/runtime_filter.cpp:981:** +3, including nesting penalty of 2,
nesting level increased to 3
```cpp
RETURN_IF_ERROR(local_merge_filters->filters[0]->merge_from(_wrapper));
^
```
**be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/exprs/runtime_filter.cpp:983:** +2, including nesting penalty of 1,
nesting level increased to 2
```cpp
if (local_merge_filters->merge_time == 0) {
^
```
**be/src/exprs/runtime_filter.cpp:984:** +3, including nesting penalty of 2,
nesting level increased to 3
```cpp
if (_has_local_target) {
^
```
**be/src/exprs/runtime_filter.cpp:985:** +4, including nesting penalty of 3,
nesting level increased to 4
```cpp
RETURN_IF_ERROR(send_to_local(local_merge_filters->filters[0]->_wrapper));
^
```
**be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/exprs/runtime_filter.cpp:985:** +5, including nesting penalty of 4,
nesting level increased to 5
```cpp
RETURN_IF_ERROR(send_to_local(local_merge_filters->filters[0]->_wrapper));
^
```
**be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/exprs/runtime_filter.cpp:986:** +1, nesting level increased to 3
```cpp
} else {
^
```
**be/src/exprs/runtime_filter.cpp:987:** +4, including nesting penalty of 3,
nesting level increased to 4
```cpp
RETURN_IF_ERROR(send_to_remote(local_merge_filters->filters[0]));
^
```
**be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/exprs/runtime_filter.cpp:987:** +5, including nesting penalty of 4,
nesting level increased to 5
```cpp
RETURN_IF_ERROR(send_to_remote(local_merge_filters->filters[0]));
^
```
**be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/exprs/runtime_filter.cpp:993:** +1, including nesting penalty of 0,
nesting level increased to 1
```cpp
if (_need_local_merge && _has_local_target) {
^
```
**be/src/exprs/runtime_filter.cpp:993:** +1
```cpp
if (_need_local_merge && _has_local_target) {
^
```
**be/src/exprs/runtime_filter.cpp:994:** +2, including nesting penalty of 1,
nesting level increased to 2
```cpp
RETURN_IF_ERROR(do_local_merge());
^
```
**be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/exprs/runtime_filter.cpp:994:** +3, including nesting penalty of 2,
nesting level increased to 3
```cpp
RETURN_IF_ERROR(do_local_merge());
^
```
**be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/exprs/runtime_filter.cpp:995:** +1, nesting level increased to 1
```cpp
} else if (_has_local_target) {
^
```
**be/src/exprs/runtime_filter.cpp:996:** +2, including nesting penalty of 1,
nesting level increased to 2
```cpp
RETURN_IF_ERROR(send_to_local(_wrapper));
^
```
**be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/exprs/runtime_filter.cpp:996:** +3, including nesting penalty of 2,
nesting level increased to 3
```cpp
RETURN_IF_ERROR(send_to_local(_wrapper));
^
```
**be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/exprs/runtime_filter.cpp:997:** +1, nesting level increased to 1
```cpp
} else if (!publish_local) {
^
```
**be/src/exprs/runtime_filter.cpp:998:** +2, including nesting penalty of 1,
nesting level increased to 2
```cpp
if (_is_broadcast_join || _state->be_exec_version < 3) {
^
```
**be/src/exprs/runtime_filter.cpp:998:** +1
```cpp
if (_is_broadcast_join || _state->be_exec_version < 3) {
^
```
**be/src/exprs/runtime_filter.cpp:999:** +3, including nesting penalty of 2,
nesting level increased to 3
```cpp
RETURN_IF_ERROR(send_to_remote(this));
^
```
**be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/exprs/runtime_filter.cpp:999:** +4, including nesting penalty of 3,
nesting level increased to 4
```cpp
RETURN_IF_ERROR(send_to_remote(this));
^
```
**be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/exprs/runtime_filter.cpp:1000:** +1, nesting level increased to 2
```cpp
} else {
^
```
**be/src/exprs/runtime_filter.cpp:1001:** +3, including nesting penalty of
2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(do_local_merge());
^
```
**be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/exprs/runtime_filter.cpp:1001:** +4, including nesting penalty of
3, nesting level increased to 4
```cpp
RETURN_IF_ERROR(do_local_merge());
^
```
**be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/exprs/runtime_filter.cpp:1003:** +1, nesting level increased to 1
```cpp
} else {
^
```
</details>
##########
be/src/pipeline/exec/aggregation_sink_operator.cpp:
##########
@@ -48,30 +48,25 @@ OPERATOR_CODE_GENERATOR(AggSinkOperator, StreamingOperator)
/// using the planner's estimated input cardinality and the assumption that
input
/// is in a random order. This means that we assume that the reduction factor
will
/// increase over time.
-template <typename DependencyType, typename Derived>
-AggSinkLocalState<DependencyType,
Derived>::AggSinkLocalState(DataSinkOperatorXBase* parent,
- RuntimeState*
state)
+AggSinkLocalState::AggSinkLocalState(DataSinkOperatorXBase* parent,
RuntimeState* state)
: Base(parent, state),
_hash_table_compute_timer(nullptr),
_hash_table_input_counter(nullptr),
_build_timer(nullptr),
_expr_timer(nullptr),
- _exec_timer(nullptr),
_serialize_key_timer(nullptr),
_merge_timer(nullptr),
_serialize_data_timer(nullptr),
_deserialize_data_timer(nullptr),
_max_row_size_counter(nullptr) {}
-template <typename DependencyType, typename Derived>
-Status AggSinkLocalState<DependencyType, Derived>::init(RuntimeState* state,
- LocalSinkStateInfo&
info) {
+Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
Review Comment:
warning: function 'init' exceeds recommended size/complexity thresholds
[readability-function-size]
```cpp
Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo&
info) {
^
```
<details>
<summary>Additional context</summary>
**be/src/pipeline/exec/aggregation_sink_operator.cpp:62:** 83 lines
including whitespace and comments (threshold 80)
```cpp
Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo&
info) {
^
```
</details>
##########
be/src/olap/lru_cache.h:
##########
@@ -479,8 +486,10 @@ class DummyLRUCache : public Cache {
void* value(Handle* handle) override;
Slice value_slice(Handle* handle) override;
uint64_t new_id() override { return 0; };
- int64_t prune() override { return 0; };
- int64_t prune_if(CacheValuePredicate pred, bool lazy_mode = false)
override { return 0; };
+ PrunedInfo prune() override { return {0, 0}; };
+ PrunedInfo prune_if(CachePrunePredicate pred, bool lazy_mode = false)
override {
Review Comment:
warning: method 'prune_if' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static PrunedInfo prune_if(CachePrunePredicate pred, bool lazy_mode =
false) override {
```
##########
be/src/pipeline/exec/aggregation_sink_operator.cpp:
##########
@@ -260,9 +235,7 @@
_agg_data->method_variant);
}
-template <typename DependencyType, typename Derived>
-Status AggSinkLocalState<DependencyType, Derived>::_destroy_agg_status(
- vectorized::AggregateDataPtr data) {
+Status AggSinkLocalState::_destroy_agg_status(vectorized::AggregateDataPtr
data) {
Review Comment:
warning: method '_destroy_agg_status' can be made static
[readability-convert-member-functions-to-static]
be/src/pipeline/exec/aggregation_sink_operator.h:205:
```diff
- Status _destroy_agg_status(vectorized::AggregateDataPtr data);
+ static Status _destroy_agg_status(vectorized::AggregateDataPtr data);
```
##########
be/src/olap/wal/wal_manager.cpp:
##########
@@ -232,9 +233,71 @@
return Status::OK();
}
-Status WalManager::_scan_wals(const std::string& wal_path) {
- size_t count = 0;
- bool exists = true;
+Status WalManager::parse_wal_path(const std::string& file_name, int64_t&
version,
+ int64_t& backend_id, int64_t& wal_id,
std::string& label) {
+ try {
+ // find version
+ auto pos = file_name.find("_");
+ version = std::strtoll(file_name.substr(0, pos).c_str(), NULL, 10);
+ // find be id
+ auto substring1 = file_name.substr(pos + 1);
+ pos = substring1.find("_");
+ backend_id = std::strtoll(substring1.substr(0, pos).c_str(), NULL, 10);
+ // find wal id
+ auto substring2 = substring1.substr(pos + 1);
+ pos = substring2.find("_");
+ wal_id = std::strtoll(substring2.substr(0, pos).c_str(), NULL, 10);
+ // find label
+ label = substring2.substr(pos + 1);
+ VLOG_DEBUG << "version:" << version << "backend_id:" << backend_id <<
",wal_id:" << wal_id
+ << ",label:" << label;
+ } catch (const std::invalid_argument& e) {
+ return Status::InvalidArgument("Invalid format, {}", e.what());
+ }
+ return Status::OK();
+}
+
+Status WalManager::_load_wals() {
+ std::vector<ScanWalInfo> wals;
+ for (auto wal_dir : _wal_dirs) {
+ WARN_IF_ERROR(_scan_wals(wal_dir, wals), fmt::format("fail to scan wal
dir={}", wal_dir));
+ }
+ for (const auto& wal : wals) {
+ bool exists = false;
+ WARN_IF_ERROR(io::global_local_filesystem()->exists(wal.wal_path,
&exists),
+ fmt::format("fail to check exist on wal file={}",
wal.wal_path));
+ if (!exists) {
+ continue;
+ }
+ LOG(INFO) << "find wal: " << wal.wal_path;
+ {
+ std::lock_guard<std::shared_mutex> wrlock(_wal_path_lock);
+ auto it = _wal_path_map.find(wal.wal_id);
+ if (it != _wal_path_map.end()) {
+ LOG(INFO) << "wal_id " << wal.wal_id << " already in
wal_path_map, skip it";
+ continue;
+ }
+ _wal_path_map.emplace(wal.wal_id, wal.wal_path);
+ }
+ // this config is use for test p0 case in pipeline
+ if (config::group_commit_wait_replay_wal_finish) {
+ auto lock = std::make_shared<std::mutex>();
+ auto cv = std::make_shared<std::condition_variable>();
+ auto add_st = add_wal_cv_map(wal.wal_id, lock, cv);
+ if (!add_st.ok()) {
+ LOG(WARNING) << "fail to add wal_id " << wal.wal_id << " to
wal_cv_map";
+ continue;
+ }
+ }
+ _exec_env->wal_mgr()->add_wal_queue(wal.tb_id, wal.wal_id);
+ WARN_IF_ERROR(add_recover_wal(wal.db_id, wal.tb_id, wal.wal_id,
wal.wal_path),
+ fmt::format("Failed to add recover wal={}",
wal.wal_path));
+ }
+ return Status::OK();
+}
+
+Status WalManager::_scan_wals(const std::string& wal_path,
std::vector<ScanWalInfo>& res) {
Review Comment:
warning: method '_scan_wals' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static Status WalManager::_scan_wals(const std::string& wal_path,
std::vector<ScanWalInfo>& res) {
```
##########
be/src/pipeline/exec/aggregation_sink_operator.cpp:
##########
@@ -178,9 +160,7 @@
return Status::OK();
}
-template <typename DependencyType, typename Derived>
-Status AggSinkLocalState<DependencyType, Derived>::_create_agg_status(
- vectorized::AggregateDataPtr data) {
+Status AggSinkLocalState::_create_agg_status(vectorized::AggregateDataPtr
data) {
Review Comment:
warning: method '_create_agg_status' can be made static
[readability-convert-member-functions-to-static]
be/src/pipeline/exec/aggregation_sink_operator.h:293:
```diff
- Status _create_agg_status(vectorized::AggregateDataPtr data);
+ static Status _create_agg_status(vectorized::AggregateDataPtr data);
```
##########
be/src/pipeline/exec/aggregation_sink_operator.cpp:
##########
@@ -392,18 +363,15 @@
// We should call this function only at 1st phase.
// 1st phase: is_merge=true, only have one SlotRef.
// 2nd phase: is_merge=false, maybe have multiple exprs.
-template <typename DependencyType, typename Derived>
-int AggSinkLocalState<DependencyType, Derived>::_get_slot_column_id(
- const vectorized::AggFnEvaluator* evaluator) {
+int AggSinkLocalState::_get_slot_column_id(const vectorized::AggFnEvaluator*
evaluator) {
Review Comment:
warning: method '_get_slot_column_id' can be made static
[readability-convert-member-functions-to-static]
be/src/pipeline/exec/aggregation_sink_operator.h:298:
```diff
- int _get_slot_column_id(const vectorized::AggFnEvaluator* evaluator);
+ static int _get_slot_column_id(const vectorized::AggFnEvaluator* evaluator);
```
##########
be/src/pipeline/exec/aggregation_sink_operator.cpp:
##########
@@ -842,18 +788,16 @@
return Status::OK();
}
-template <typename LocalStateType>
-Status AggSinkOperatorX<LocalStateType>::sink(doris::RuntimeState* state,
- vectorized::Block* in_block,
- SourceState source_state) {
+Status AggSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block*
in_block,
Review Comment:
warning: method 'sink' can be made static
[readability-convert-member-functions-to-static]
be/src/pipeline/exec/aggregation_sink_operator.h:344:
```diff
- Status sink(RuntimeState* state, vectorized::Block* in_block,
+ static Status sink(RuntimeState* state, vectorized::Block* in_block,
```
##########
be/src/olap/wal/wal_dirs_info.cpp:
##########
@@ -50,20 +50,21 @@ void WalDirInfo::set_used(size_t used) {
_used = used;
}
-size_t WalDirInfo::get_pre_allocated() {
+size_t WalDirInfo::get_estimated_wal_bytes() {
Review Comment:
warning: method 'get_estimated_wal_bytes' can be made const
[readability-make-member-function-const]
be/src/olap/wal/wal_dirs_info.h:43:
```diff
- size_t get_estimated_wal_bytes();
+ size_t get_estimated_wal_bytes() const;
```
```suggestion
size_t WalDirInfo::get_estimated_wal_bytes() const {
```
##########
be/src/pipeline/exec/aggregation_sink_operator.cpp:
##########
@@ -271,10 +244,8 @@
return Status::OK();
}
-template <typename DependencyType, typename Derived>
template <bool limit, bool for_spill>
-Status AggSinkLocalState<DependencyType,
Derived>::_merge_with_serialized_key_helper(
- vectorized::Block* block) {
+Status AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block*
block) {
Review Comment:
warning: function '_merge_with_serialized_key_helper' exceeds recommended
size/complexity thresholds [readability-function-size]
```cpp
Status
AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* block) {
^
```
<details>
<summary>Additional context</summary>
**be/src/pipeline/exec/aggregation_sink_operator.cpp:247:** 113 lines
including whitespace and comments (threshold 80)
```cpp
Status
AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* block) {
^
```
</details>
##########
be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp:
##########
@@ -0,0 +1,406 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "distinct_streaming_aggregation_operator.h"
+
+#include <gen_cpp/Metrics_types.h>
+
+#include <memory>
+#include <utility>
+
+#include "common/compiler_util.h" // IWYU pragma: keep
+
+namespace doris {
+class ExecNode;
+class RuntimeState;
+} // namespace doris
+
+namespace doris::pipeline {
+
+DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState*
state,
+ OperatorXBase*
parent)
+ : PipelineXLocalState<FakeDependency>(state, parent),
+ dummy_mapped_data(std::make_shared<char>('A')),
+ _agg_arena_pool(std::make_unique<vectorized::Arena>()),
+ _agg_data(std::make_unique<vectorized::AggregatedDataVariants>()),
+ _agg_profile_arena(std::make_unique<vectorized::Arena>()),
+ _child_block(vectorized::Block::create_unique()),
+ _child_source_state(SourceState::DEPEND_ON_SOURCE),
+ _aggregated_block(vectorized::Block::create_unique()) {}
+
+Status DistinctStreamingAggLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
+ RETURN_IF_ERROR(Base::init(state, info));
+ SCOPED_TIMER(Base::exec_time_counter());
+ SCOPED_TIMER(Base::_open_timer);
+ auto& p = Base::_parent->template cast<DistinctStreamingAggOperatorX>();
+ for (auto& evaluator : p._aggregate_evaluators) {
+ _aggregate_evaluators.push_back(evaluator->clone(state, p._pool));
+ }
+ _probe_expr_ctxs.resize(p._probe_expr_ctxs.size());
+ for (size_t i = 0; i < _probe_expr_ctxs.size(); i++) {
+ RETURN_IF_ERROR(p._probe_expr_ctxs[i]->clone(state,
_probe_expr_ctxs[i]));
+ }
+
+ _build_timer = ADD_TIMER(Base::profile(), "BuildTime");
+ _exec_timer = ADD_TIMER(Base::profile(), "ExecTime");
+ _hash_table_compute_timer = ADD_TIMER(Base::profile(),
"HashTableComputeTime");
+ _hash_table_emplace_timer = ADD_TIMER(Base::profile(),
"HashTableEmplaceTime");
+ _hash_table_input_counter = ADD_COUNTER(Base::profile(),
"HashTableInputCount", TUnit::UNIT);
+
+ if (_probe_expr_ctxs.empty()) {
+ _agg_data->without_key =
reinterpret_cast<vectorized::AggregateDataPtr>(
+ _agg_profile_arena->alloc(p._total_size_of_aggregate_states));
+ } else {
+ _init_hash_method(_probe_expr_ctxs);
+ }
+ return Status::OK();
+}
+
+void DistinctStreamingAggLocalState::_init_hash_method(
+ const vectorized::VExprContextSPtrs& probe_exprs) {
+ DCHECK(probe_exprs.size() >= 1);
+
+ using Type = vectorized::AggregatedDataVariants::Type;
+ Type t(Type::serialized);
+
+ if (probe_exprs.size() == 1) {
+ auto is_nullable = probe_exprs[0]->root()->is_nullable();
+ PrimitiveType type = probe_exprs[0]->root()->result_type();
+ switch (type) {
+ case TYPE_TINYINT:
+ case TYPE_BOOLEAN:
+ case TYPE_SMALLINT:
+ case TYPE_INT:
+ case TYPE_FLOAT:
+ case TYPE_DATEV2:
+ case TYPE_BIGINT:
+ case TYPE_DOUBLE:
+ case TYPE_DATE:
+ case TYPE_DATETIME:
+ case TYPE_DATETIMEV2:
+ case TYPE_LARGEINT:
+ case TYPE_DECIMALV2:
+ case TYPE_DECIMAL32:
+ case TYPE_DECIMAL64:
+ case TYPE_DECIMAL128I: {
+ size_t size = get_primitive_type_size(type);
+ if (size == 1) {
+ t = Type::int8_key;
+ } else if (size == 2) {
+ t = Type::int16_key;
+ } else if (size == 4) {
+ t = Type::int32_key;
+ } else if (size == 8) {
+ t = Type::int64_key;
+ } else if (size == 16) {
+ t = Type::int128_key;
+ } else {
+ throw Exception(ErrorCode::INTERNAL_ERROR,
+ "meet invalid type size, size={}, type={}",
size,
+ type_to_string(type));
+ }
+ break;
+ }
+ case TYPE_CHAR:
+ case TYPE_VARCHAR:
+ case TYPE_STRING: {
+ t = Type::string_key;
+ break;
+ }
+ default:
+ t = Type::serialized;
+ }
+
+ _agg_data->init(get_hash_key_type_with_phase(
+ t, !Base::_parent->template
cast<DistinctStreamingAggOperatorX>()
+ ._is_first_phase),
+ is_nullable);
+ } else {
+ if (!try_get_hash_map_context_fixed<PHNormalHashMap, HashCRC32,
+
vectorized::AggregateDataPtr>(_agg_data->method_variant,
+
probe_exprs)) {
+ _agg_data->init(Type::serialized);
+ }
+ }
+}
+
+Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
+ doris::vectorized::Block* in_block, doris::vectorized::Block*
out_block) {
+ SCOPED_TIMER(_build_timer);
+ DCHECK(!_probe_expr_ctxs.empty());
+
+ size_t key_size = _probe_expr_ctxs.size();
+ vectorized::ColumnRawPtrs key_columns(key_size);
+ {
+ SCOPED_TIMER(_expr_timer);
+ for (size_t i = 0; i < key_size; ++i) {
+ int result_column_id = -1;
+ RETURN_IF_ERROR(_probe_expr_ctxs[i]->execute(in_block,
&result_column_id));
+ in_block->get_by_position(result_column_id).column =
+ in_block->get_by_position(result_column_id)
+ .column->convert_to_full_column_if_const();
+ key_columns[i] =
in_block->get_by_position(result_column_id).column.get();
+ }
+ }
+
+ int rows = in_block->rows();
+ _distinct_row.clear();
+ _distinct_row.reserve(rows);
+
+ RETURN_IF_CATCH_EXCEPTION(
+ _emplace_into_hash_table_to_distinct(_distinct_row, key_columns,
rows));
+
+ bool mem_reuse =
_parent->cast<DistinctStreamingAggOperatorX>()._make_nullable_keys.empty() &&
+ out_block->mem_reuse();
+ if (mem_reuse) {
+ for (int i = 0; i < key_size; ++i) {
+ auto dst = out_block->get_by_position(i).column->assume_mutable();
+ key_columns[i]->append_data_by_selector(dst, _distinct_row);
+ }
+ } else {
+ vectorized::ColumnsWithTypeAndName columns_with_schema;
+ for (int i = 0; i < key_size; ++i) {
+ auto distinct_column = key_columns[i]->clone_empty();
+ key_columns[i]->append_data_by_selector(distinct_column,
_distinct_row);
+ columns_with_schema.emplace_back(std::move(distinct_column),
+
_probe_expr_ctxs[i]->root()->data_type(),
+
_probe_expr_ctxs[i]->root()->expr_name());
+ }
+ out_block->swap(vectorized::Block(columns_with_schema));
+ }
+ return Status::OK();
+}
+
+void
DistinctStreamingAggLocalState::_make_nullable_output_key(vectorized::Block*
block) {
+ if (block->rows() != 0) {
+ for (auto cid :
Base::_parent->cast<DistinctStreamingAggOperatorX>()._make_nullable_keys) {
+ block->get_by_position(cid).column =
make_nullable(block->get_by_position(cid).column);
+ block->get_by_position(cid).type =
make_nullable(block->get_by_position(cid).type);
+ }
+ }
+}
+
+void DistinctStreamingAggLocalState::_emplace_into_hash_table_to_distinct(
+ vectorized::IColumn::Selector& distinct_row,
vectorized::ColumnRawPtrs& key_columns,
+ const size_t num_rows) {
+ std::visit(
+ [&](auto&& agg_method) -> void {
+ SCOPED_TIMER(_hash_table_compute_timer);
+ using HashMethodType = std::decay_t<decltype(agg_method)>;
+ using AggState = typename HashMethodType::State;
+ AggState state(key_columns);
+ agg_method.init_serialized_keys(key_columns, num_rows);
+ size_t row = 0;
+ auto creator = [&](const auto& ctor, auto& key, auto& origin) {
+ HashMethodType::try_presis_key(key, origin, _arena);
+ ctor(key, dummy_mapped_data.get());
+ distinct_row.push_back(row);
+ };
+ auto creator_for_null_key = [&](auto& mapped) {
+ mapped = dummy_mapped_data.get();
+ distinct_row.push_back(row);
+ };
+
+ SCOPED_TIMER(_hash_table_emplace_timer);
+ for (; row < num_rows; ++row) {
+ agg_method.lazy_emplace(state, row, creator,
creator_for_null_key);
+ }
+
+ COUNTER_UPDATE(_hash_table_input_counter, num_rows);
+ },
+ _agg_data->method_variant);
+}
+
+DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool,
int operator_id,
+ const TPlanNode&
tnode,
+ const
DescriptorTbl& descs)
+ : StatefulOperatorX<DistinctStreamingAggLocalState>(pool, tnode,
operator_id, descs),
+ _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
+ _intermediate_tuple_desc(nullptr),
Review Comment:
warning: member initializer for '_intermediate_tuple_desc' is redundant
[modernize-use-default-member-init]
```suggestion
,
```
##########
be/src/pipeline/exec/aggregation_sink_operator.cpp:
##########
@@ -865,8 +809,7 @@
return Status::OK();
}
-template <typename DependencyType, typename Derived>
-Status AggSinkLocalState<DependencyType, Derived>::close(RuntimeState* state, Status exec_status) {
+Status AggSinkLocalState::close(RuntimeState* state, Status exec_status) {
Review Comment:
warning: method 'close' can be made static
[readability-convert-member-functions-to-static]
be/src/pipeline/exec/aggregation_sink_operator.h:66:
```diff
- Status close(RuntimeState* state, Status exec_status) override;
+ static Status close(RuntimeState* state, Status exec_status) override;
```
##########
be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp:
##########
@@ -0,0 +1,406 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "distinct_streaming_aggregation_operator.h"
+
+#include <gen_cpp/Metrics_types.h>
+
+#include <memory>
+#include <utility>
+
+#include "common/compiler_util.h" // IWYU pragma: keep
+
+namespace doris {
+class ExecNode;
+class RuntimeState;
+} // namespace doris
+
+namespace doris::pipeline {
+
+DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState*
state,
+ OperatorXBase*
parent)
+ : PipelineXLocalState<FakeDependency>(state, parent),
+ dummy_mapped_data(std::make_shared<char>('A')),
+ _agg_arena_pool(std::make_unique<vectorized::Arena>()),
+ _agg_data(std::make_unique<vectorized::AggregatedDataVariants>()),
+ _agg_profile_arena(std::make_unique<vectorized::Arena>()),
+ _child_block(vectorized::Block::create_unique()),
+ _child_source_state(SourceState::DEPEND_ON_SOURCE),
+ _aggregated_block(vectorized::Block::create_unique()) {}
+
+Status DistinctStreamingAggLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
+ RETURN_IF_ERROR(Base::init(state, info));
+ SCOPED_TIMER(Base::exec_time_counter());
+ SCOPED_TIMER(Base::_open_timer);
+ auto& p = Base::_parent->template cast<DistinctStreamingAggOperatorX>();
+ for (auto& evaluator : p._aggregate_evaluators) {
+ _aggregate_evaluators.push_back(evaluator->clone(state, p._pool));
+ }
+ _probe_expr_ctxs.resize(p._probe_expr_ctxs.size());
+ for (size_t i = 0; i < _probe_expr_ctxs.size(); i++) {
+ RETURN_IF_ERROR(p._probe_expr_ctxs[i]->clone(state,
_probe_expr_ctxs[i]));
+ }
+
+ _build_timer = ADD_TIMER(Base::profile(), "BuildTime");
+ _exec_timer = ADD_TIMER(Base::profile(), "ExecTime");
+ _hash_table_compute_timer = ADD_TIMER(Base::profile(),
"HashTableComputeTime");
+ _hash_table_emplace_timer = ADD_TIMER(Base::profile(),
"HashTableEmplaceTime");
+ _hash_table_input_counter = ADD_COUNTER(Base::profile(),
"HashTableInputCount", TUnit::UNIT);
+
+ if (_probe_expr_ctxs.empty()) {
+ _agg_data->without_key =
reinterpret_cast<vectorized::AggregateDataPtr>(
+ _agg_profile_arena->alloc(p._total_size_of_aggregate_states));
+ } else {
+ _init_hash_method(_probe_expr_ctxs);
+ }
+ return Status::OK();
+}
+
+void DistinctStreamingAggLocalState::_init_hash_method(
+ const vectorized::VExprContextSPtrs& probe_exprs) {
+ DCHECK(probe_exprs.size() >= 1);
+
+ using Type = vectorized::AggregatedDataVariants::Type;
+ Type t(Type::serialized);
+
+ if (probe_exprs.size() == 1) {
+ auto is_nullable = probe_exprs[0]->root()->is_nullable();
+ PrimitiveType type = probe_exprs[0]->root()->result_type();
+ switch (type) {
+ case TYPE_TINYINT:
+ case TYPE_BOOLEAN:
+ case TYPE_SMALLINT:
+ case TYPE_INT:
+ case TYPE_FLOAT:
+ case TYPE_DATEV2:
+ case TYPE_BIGINT:
+ case TYPE_DOUBLE:
+ case TYPE_DATE:
+ case TYPE_DATETIME:
+ case TYPE_DATETIMEV2:
+ case TYPE_LARGEINT:
+ case TYPE_DECIMALV2:
+ case TYPE_DECIMAL32:
+ case TYPE_DECIMAL64:
+ case TYPE_DECIMAL128I: {
+ size_t size = get_primitive_type_size(type);
+ if (size == 1) {
+ t = Type::int8_key;
+ } else if (size == 2) {
+ t = Type::int16_key;
+ } else if (size == 4) {
+ t = Type::int32_key;
+ } else if (size == 8) {
+ t = Type::int64_key;
+ } else if (size == 16) {
+ t = Type::int128_key;
+ } else {
+ throw Exception(ErrorCode::INTERNAL_ERROR,
+ "meet invalid type size, size={}, type={}",
size,
+ type_to_string(type));
+ }
+ break;
+ }
+ case TYPE_CHAR:
+ case TYPE_VARCHAR:
+ case TYPE_STRING: {
+ t = Type::string_key;
+ break;
+ }
+ default:
+ t = Type::serialized;
+ }
+
+ _agg_data->init(get_hash_key_type_with_phase(
+ t, !Base::_parent->template
cast<DistinctStreamingAggOperatorX>()
+ ._is_first_phase),
+ is_nullable);
+ } else {
+ if (!try_get_hash_map_context_fixed<PHNormalHashMap, HashCRC32,
+
vectorized::AggregateDataPtr>(_agg_data->method_variant,
+
probe_exprs)) {
+ _agg_data->init(Type::serialized);
+ }
+ }
+}
+
+Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
+ doris::vectorized::Block* in_block, doris::vectorized::Block*
out_block) {
+ SCOPED_TIMER(_build_timer);
+ DCHECK(!_probe_expr_ctxs.empty());
+
+ size_t key_size = _probe_expr_ctxs.size();
+ vectorized::ColumnRawPtrs key_columns(key_size);
+ {
+ SCOPED_TIMER(_expr_timer);
+ for (size_t i = 0; i < key_size; ++i) {
+ int result_column_id = -1;
+ RETURN_IF_ERROR(_probe_expr_ctxs[i]->execute(in_block,
&result_column_id));
+ in_block->get_by_position(result_column_id).column =
+ in_block->get_by_position(result_column_id)
+ .column->convert_to_full_column_if_const();
+ key_columns[i] =
in_block->get_by_position(result_column_id).column.get();
+ }
+ }
+
+ int rows = in_block->rows();
+ _distinct_row.clear();
+ _distinct_row.reserve(rows);
+
+ RETURN_IF_CATCH_EXCEPTION(
+ _emplace_into_hash_table_to_distinct(_distinct_row, key_columns,
rows));
+
+ bool mem_reuse =
_parent->cast<DistinctStreamingAggOperatorX>()._make_nullable_keys.empty() &&
+ out_block->mem_reuse();
+ if (mem_reuse) {
+ for (int i = 0; i < key_size; ++i) {
+ auto dst = out_block->get_by_position(i).column->assume_mutable();
+ key_columns[i]->append_data_by_selector(dst, _distinct_row);
+ }
+ } else {
+ vectorized::ColumnsWithTypeAndName columns_with_schema;
+ for (int i = 0; i < key_size; ++i) {
+ auto distinct_column = key_columns[i]->clone_empty();
+ key_columns[i]->append_data_by_selector(distinct_column,
_distinct_row);
+ columns_with_schema.emplace_back(std::move(distinct_column),
+
_probe_expr_ctxs[i]->root()->data_type(),
+
_probe_expr_ctxs[i]->root()->expr_name());
+ }
+ out_block->swap(vectorized::Block(columns_with_schema));
+ }
+ return Status::OK();
+}
+
+void
DistinctStreamingAggLocalState::_make_nullable_output_key(vectorized::Block*
block) {
+ if (block->rows() != 0) {
+ for (auto cid :
Base::_parent->cast<DistinctStreamingAggOperatorX>()._make_nullable_keys) {
+ block->get_by_position(cid).column =
make_nullable(block->get_by_position(cid).column);
+ block->get_by_position(cid).type =
make_nullable(block->get_by_position(cid).type);
+ }
+ }
+}
+
+void DistinctStreamingAggLocalState::_emplace_into_hash_table_to_distinct(
+ vectorized::IColumn::Selector& distinct_row,
vectorized::ColumnRawPtrs& key_columns,
+ const size_t num_rows) {
+ std::visit(
+ [&](auto&& agg_method) -> void {
+ SCOPED_TIMER(_hash_table_compute_timer);
+ using HashMethodType = std::decay_t<decltype(agg_method)>;
+ using AggState = typename HashMethodType::State;
+ AggState state(key_columns);
+ agg_method.init_serialized_keys(key_columns, num_rows);
+ size_t row = 0;
+ auto creator = [&](const auto& ctor, auto& key, auto& origin) {
+ HashMethodType::try_presis_key(key, origin, _arena);
+ ctor(key, dummy_mapped_data.get());
+ distinct_row.push_back(row);
+ };
+ auto creator_for_null_key = [&](auto& mapped) {
+ mapped = dummy_mapped_data.get();
+ distinct_row.push_back(row);
+ };
+
+ SCOPED_TIMER(_hash_table_emplace_timer);
+ for (; row < num_rows; ++row) {
+ agg_method.lazy_emplace(state, row, creator,
creator_for_null_key);
+ }
+
+ COUNTER_UPDATE(_hash_table_input_counter, num_rows);
+ },
+ _agg_data->method_variant);
+}
+
+DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool,
int operator_id,
+ const TPlanNode&
tnode,
+ const
DescriptorTbl& descs)
+ : StatefulOperatorX<DistinctStreamingAggLocalState>(pool, tnode,
operator_id, descs),
+ _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
+ _intermediate_tuple_desc(nullptr),
+ _output_tuple_id(tnode.agg_node.output_tuple_id),
+ _output_tuple_desc(nullptr),
Review Comment:
warning: member initializer for '_output_tuple_desc' is redundant
[modernize-use-default-member-init]
```suggestion
,
```
##########
be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp:
##########
@@ -0,0 +1,406 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "distinct_streaming_aggregation_operator.h"
+
+#include <gen_cpp/Metrics_types.h>
+
+#include <memory>
+#include <utility>
+
+#include "common/compiler_util.h" // IWYU pragma: keep
+
+namespace doris {
+class ExecNode;
+class RuntimeState;
+} // namespace doris
+
+namespace doris::pipeline {
+
+DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState*
state,
+ OperatorXBase*
parent)
+ : PipelineXLocalState<FakeDependency>(state, parent),
+ dummy_mapped_data(std::make_shared<char>('A')),
+ _agg_arena_pool(std::make_unique<vectorized::Arena>()),
+ _agg_data(std::make_unique<vectorized::AggregatedDataVariants>()),
+ _agg_profile_arena(std::make_unique<vectorized::Arena>()),
+ _child_block(vectorized::Block::create_unique()),
+ _child_source_state(SourceState::DEPEND_ON_SOURCE),
+ _aggregated_block(vectorized::Block::create_unique()) {}
+
+Status DistinctStreamingAggLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
+ RETURN_IF_ERROR(Base::init(state, info));
+ SCOPED_TIMER(Base::exec_time_counter());
+ SCOPED_TIMER(Base::_open_timer);
+ auto& p = Base::_parent->template cast<DistinctStreamingAggOperatorX>();
+ for (auto& evaluator : p._aggregate_evaluators) {
+ _aggregate_evaluators.push_back(evaluator->clone(state, p._pool));
+ }
+ _probe_expr_ctxs.resize(p._probe_expr_ctxs.size());
+ for (size_t i = 0; i < _probe_expr_ctxs.size(); i++) {
+ RETURN_IF_ERROR(p._probe_expr_ctxs[i]->clone(state,
_probe_expr_ctxs[i]));
+ }
+
+ _build_timer = ADD_TIMER(Base::profile(), "BuildTime");
+ _exec_timer = ADD_TIMER(Base::profile(), "ExecTime");
+ _hash_table_compute_timer = ADD_TIMER(Base::profile(),
"HashTableComputeTime");
+ _hash_table_emplace_timer = ADD_TIMER(Base::profile(),
"HashTableEmplaceTime");
+ _hash_table_input_counter = ADD_COUNTER(Base::profile(),
"HashTableInputCount", TUnit::UNIT);
+
+ if (_probe_expr_ctxs.empty()) {
+ _agg_data->without_key =
reinterpret_cast<vectorized::AggregateDataPtr>(
+ _agg_profile_arena->alloc(p._total_size_of_aggregate_states));
+ } else {
+ _init_hash_method(_probe_expr_ctxs);
+ }
+ return Status::OK();
+}
+
+void DistinctStreamingAggLocalState::_init_hash_method(
+ const vectorized::VExprContextSPtrs& probe_exprs) {
+ DCHECK(probe_exprs.size() >= 1);
+
+ using Type = vectorized::AggregatedDataVariants::Type;
+ Type t(Type::serialized);
+
+ if (probe_exprs.size() == 1) {
+ auto is_nullable = probe_exprs[0]->root()->is_nullable();
+ PrimitiveType type = probe_exprs[0]->root()->result_type();
+ switch (type) {
+ case TYPE_TINYINT:
+ case TYPE_BOOLEAN:
+ case TYPE_SMALLINT:
+ case TYPE_INT:
+ case TYPE_FLOAT:
+ case TYPE_DATEV2:
+ case TYPE_BIGINT:
+ case TYPE_DOUBLE:
+ case TYPE_DATE:
+ case TYPE_DATETIME:
+ case TYPE_DATETIMEV2:
+ case TYPE_LARGEINT:
+ case TYPE_DECIMALV2:
+ case TYPE_DECIMAL32:
+ case TYPE_DECIMAL64:
+ case TYPE_DECIMAL128I: {
+ size_t size = get_primitive_type_size(type);
+ if (size == 1) {
+ t = Type::int8_key;
+ } else if (size == 2) {
+ t = Type::int16_key;
+ } else if (size == 4) {
+ t = Type::int32_key;
+ } else if (size == 8) {
+ t = Type::int64_key;
+ } else if (size == 16) {
+ t = Type::int128_key;
+ } else {
+ throw Exception(ErrorCode::INTERNAL_ERROR,
+ "meet invalid type size, size={}, type={}",
size,
+ type_to_string(type));
+ }
+ break;
+ }
+ case TYPE_CHAR:
+ case TYPE_VARCHAR:
+ case TYPE_STRING: {
+ t = Type::string_key;
+ break;
+ }
+ default:
+ t = Type::serialized;
+ }
+
+ _agg_data->init(get_hash_key_type_with_phase(
+ t, !Base::_parent->template
cast<DistinctStreamingAggOperatorX>()
+ ._is_first_phase),
+ is_nullable);
+ } else {
+ if (!try_get_hash_map_context_fixed<PHNormalHashMap, HashCRC32,
+
vectorized::AggregateDataPtr>(_agg_data->method_variant,
+
probe_exprs)) {
+ _agg_data->init(Type::serialized);
+ }
+ }
+}
+
+Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
+ doris::vectorized::Block* in_block, doris::vectorized::Block*
out_block) {
+ SCOPED_TIMER(_build_timer);
+ DCHECK(!_probe_expr_ctxs.empty());
+
+ size_t key_size = _probe_expr_ctxs.size();
+ vectorized::ColumnRawPtrs key_columns(key_size);
+ {
+ SCOPED_TIMER(_expr_timer);
+ for (size_t i = 0; i < key_size; ++i) {
+ int result_column_id = -1;
+ RETURN_IF_ERROR(_probe_expr_ctxs[i]->execute(in_block,
&result_column_id));
+ in_block->get_by_position(result_column_id).column =
+ in_block->get_by_position(result_column_id)
+ .column->convert_to_full_column_if_const();
+ key_columns[i] =
in_block->get_by_position(result_column_id).column.get();
+ }
+ }
+
+ int rows = in_block->rows();
+ _distinct_row.clear();
+ _distinct_row.reserve(rows);
+
+ RETURN_IF_CATCH_EXCEPTION(
+ _emplace_into_hash_table_to_distinct(_distinct_row, key_columns,
rows));
+
+ bool mem_reuse =
_parent->cast<DistinctStreamingAggOperatorX>()._make_nullable_keys.empty() &&
+ out_block->mem_reuse();
+ if (mem_reuse) {
+ for (int i = 0; i < key_size; ++i) {
+ auto dst = out_block->get_by_position(i).column->assume_mutable();
+ key_columns[i]->append_data_by_selector(dst, _distinct_row);
+ }
+ } else {
+ vectorized::ColumnsWithTypeAndName columns_with_schema;
+ for (int i = 0; i < key_size; ++i) {
+ auto distinct_column = key_columns[i]->clone_empty();
+ key_columns[i]->append_data_by_selector(distinct_column,
_distinct_row);
+ columns_with_schema.emplace_back(std::move(distinct_column),
+
_probe_expr_ctxs[i]->root()->data_type(),
+
_probe_expr_ctxs[i]->root()->expr_name());
+ }
+ out_block->swap(vectorized::Block(columns_with_schema));
+ }
+ return Status::OK();
+}
+
+void
DistinctStreamingAggLocalState::_make_nullable_output_key(vectorized::Block*
block) {
Review Comment:
warning: method '_make_nullable_output_key' can be made static
[readability-convert-member-functions-to-static]
be/src/pipeline/exec/distinct_streaming_aggregation_operator.h:58:
```diff
- void _make_nullable_output_key(vectorized::Block* block);
+ static void _make_nullable_output_key(vectorized::Block* block);
```
##########
be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp:
##########
@@ -0,0 +1,406 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "distinct_streaming_aggregation_operator.h"
+
+#include <gen_cpp/Metrics_types.h>
+
+#include <memory>
+#include <utility>
+
+#include "common/compiler_util.h" // IWYU pragma: keep
+
+namespace doris {
+class ExecNode;
+class RuntimeState;
+} // namespace doris
+
+namespace doris::pipeline {
+
+DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState*
state,
+ OperatorXBase*
parent)
+ : PipelineXLocalState<FakeDependency>(state, parent),
+ dummy_mapped_data(std::make_shared<char>('A')),
+ _agg_arena_pool(std::make_unique<vectorized::Arena>()),
+ _agg_data(std::make_unique<vectorized::AggregatedDataVariants>()),
+ _agg_profile_arena(std::make_unique<vectorized::Arena>()),
+ _child_block(vectorized::Block::create_unique()),
+ _child_source_state(SourceState::DEPEND_ON_SOURCE),
+ _aggregated_block(vectorized::Block::create_unique()) {}
+
+Status DistinctStreamingAggLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
+ RETURN_IF_ERROR(Base::init(state, info));
+ SCOPED_TIMER(Base::exec_time_counter());
+ SCOPED_TIMER(Base::_open_timer);
+ auto& p = Base::_parent->template cast<DistinctStreamingAggOperatorX>();
+ for (auto& evaluator : p._aggregate_evaluators) {
+ _aggregate_evaluators.push_back(evaluator->clone(state, p._pool));
+ }
+ _probe_expr_ctxs.resize(p._probe_expr_ctxs.size());
+ for (size_t i = 0; i < _probe_expr_ctxs.size(); i++) {
+ RETURN_IF_ERROR(p._probe_expr_ctxs[i]->clone(state,
_probe_expr_ctxs[i]));
+ }
+
+ _build_timer = ADD_TIMER(Base::profile(), "BuildTime");
+ _exec_timer = ADD_TIMER(Base::profile(), "ExecTime");
+ _hash_table_compute_timer = ADD_TIMER(Base::profile(),
"HashTableComputeTime");
+ _hash_table_emplace_timer = ADD_TIMER(Base::profile(),
"HashTableEmplaceTime");
+ _hash_table_input_counter = ADD_COUNTER(Base::profile(),
"HashTableInputCount", TUnit::UNIT);
+
+ if (_probe_expr_ctxs.empty()) {
+ _agg_data->without_key =
reinterpret_cast<vectorized::AggregateDataPtr>(
+ _agg_profile_arena->alloc(p._total_size_of_aggregate_states));
+ } else {
+ _init_hash_method(_probe_expr_ctxs);
+ }
+ return Status::OK();
+}
+
+void DistinctStreamingAggLocalState::_init_hash_method(
+ const vectorized::VExprContextSPtrs& probe_exprs) {
+ DCHECK(probe_exprs.size() >= 1);
+
+ using Type = vectorized::AggregatedDataVariants::Type;
+ Type t(Type::serialized);
+
+ if (probe_exprs.size() == 1) {
+ auto is_nullable = probe_exprs[0]->root()->is_nullable();
+ PrimitiveType type = probe_exprs[0]->root()->result_type();
+ switch (type) {
+ case TYPE_TINYINT:
+ case TYPE_BOOLEAN:
+ case TYPE_SMALLINT:
+ case TYPE_INT:
+ case TYPE_FLOAT:
+ case TYPE_DATEV2:
+ case TYPE_BIGINT:
+ case TYPE_DOUBLE:
+ case TYPE_DATE:
+ case TYPE_DATETIME:
+ case TYPE_DATETIMEV2:
+ case TYPE_LARGEINT:
+ case TYPE_DECIMALV2:
+ case TYPE_DECIMAL32:
+ case TYPE_DECIMAL64:
+ case TYPE_DECIMAL128I: {
+ size_t size = get_primitive_type_size(type);
+ if (size == 1) {
+ t = Type::int8_key;
+ } else if (size == 2) {
+ t = Type::int16_key;
+ } else if (size == 4) {
+ t = Type::int32_key;
+ } else if (size == 8) {
+ t = Type::int64_key;
+ } else if (size == 16) {
+ t = Type::int128_key;
+ } else {
+ throw Exception(ErrorCode::INTERNAL_ERROR,
+ "meet invalid type size, size={}, type={}",
size,
+ type_to_string(type));
+ }
+ break;
+ }
+ case TYPE_CHAR:
+ case TYPE_VARCHAR:
+ case TYPE_STRING: {
+ t = Type::string_key;
+ break;
+ }
+ default:
+ t = Type::serialized;
+ }
+
+ _agg_data->init(get_hash_key_type_with_phase(
+ t, !Base::_parent->template
cast<DistinctStreamingAggOperatorX>()
+ ._is_first_phase),
+ is_nullable);
+ } else {
+ if (!try_get_hash_map_context_fixed<PHNormalHashMap, HashCRC32,
+
vectorized::AggregateDataPtr>(_agg_data->method_variant,
+
probe_exprs)) {
+ _agg_data->init(Type::serialized);
+ }
+ }
+}
+
+Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
Review Comment:
warning: method '_distinct_pre_agg_with_serialized_key' can be made static
[readability-convert-member-functions-to-static]
be/src/pipeline/exec/distinct_streaming_aggregation_operator.h:52:
```diff
- Status _distinct_pre_agg_with_serialized_key(vectorized::Block* in_block,
+ static Status _distinct_pre_agg_with_serialized_key(vectorized::Block* in_block,
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]