github-actions[bot] commented on code in PR #26058:
URL: https://github.com/apache/doris/pull/26058#discussion_r1382338745
##########
be/src/io/fs/multi_table_pipe.cpp:
##########
@@ -300,7 +300,8 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env,
std::vector<ExecParam> para
#endif
-Status MultiTablePipe::putPipe(const TUniqueId& pipe_id,
std::shared_ptr<io::StreamLoadPipe> pipe) {
+Status MultiTablePipe::put_pipe(const TUniqueId& pipe_id,
Review Comment:
warning: method 'put_pipe' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static Status MultiTablePipe::put_pipe(const TUniqueId& pipe_id,
```
##########
be/src/runtime/group_commit_mgr.h:
##########
@@ -19,36 +19,33 @@
#include <gen_cpp/PaloInternalService_types.h>
Review Comment:
warning: 'gen_cpp/PaloInternalService_types.h' file not found
[clang-diagnostic-error]
```cpp
#include <gen_cpp/PaloInternalService_types.h>
^
```
##########
be/src/vec/data_types/data_type_date_time.h:
##########
@@ -86,8 +86,8 @@ class DataTypeDateTime final : public
DataTypeNumberBase<Int64> {
std::string to_string(const IColumn& column, size_t row_num) const
override;
- DataTypeSerDeSPtr get_serde() const override {
- return std::make_shared<DataTypeDateTimeSerDe>();
+ DataTypeSerDeSPtr get_serde(int nesting_level = 1) const override {
Review Comment:
warning: method 'get_serde' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static DataTypeSerDeSPtr get_serde(int nesting_level = 1) override {
```
##########
be/src/pipeline/exec/scan_operator.h:
##########
@@ -219,7 +220,8 @@ class ScanLocalState : public ScanLocalStateBase {
}
Status clone_conjunct_ctxs(vectorized::VExprContextSPtrs& conjuncts)
override;
- virtual void set_scan_ranges(const std::vector<TScanRangeParams>&
scan_ranges) override {}
+ virtual void set_scan_ranges(RuntimeState* state,
Review Comment:
warning: 'virtual' is redundant since the function is already declared
'override' [modernize-use-override]
```suggestion
void set_scan_ranges(RuntimeState* state,
```
##########
be/src/pipeline/pipeline_x/dependency.cpp:
##########
@@ -326,4 +330,113 @@
return Status::OK();
}
+bool RuntimeFilterTimer::has_ready() {
+ std::unique_lock<std::mutex> lc(_lock);
+ return _runtime_filter->is_ready();
+}
+
+void RuntimeFilterTimer::call_timeout() {
+ std::unique_lock<std::mutex> lc(_lock);
+ if (_call_ready) {
+ return;
+ }
+ _call_timeout = true;
+ if (_parent) {
+ _parent->sub_filters();
+ }
+}
+
+void RuntimeFilterTimer::call_ready() {
+ std::unique_lock<std::mutex> lc(_lock);
+ if (_call_timeout) {
+ return;
+ }
+ _call_ready = true;
+ if (_parent) {
+ _parent->sub_filters();
+ }
+}
+
+void RuntimeFilterTimer::call_has_ready() {
+ std::unique_lock<std::mutex> lc(_lock);
+ DCHECK(!_call_timeout);
+ if (!_call_ready) {
+ _parent->sub_filters();
+ }
+}
+
+void RuntimeFilterTimer::call_has_release() {
+ // When the use count is equal to 1, only the timer queue still holds
ownership,
+ // so there is no need to take any action.
+}
+
+struct RuntimeFilterTimerQueue {
+ constexpr static int64_t interval = 50;
+ void start() {
+ while (true) {
+ std::unique_lock<std::mutex> lk(cv_m);
+
+ cv.wait(lk, [this] { return !_que.empty(); });
+ {
+ std::unique_lock<std::mutex> lc(_que_lock);
+ std::list<std::shared_ptr<RuntimeFilterTimer>> new_que;
+ for (auto& it : _que) {
+ if (it.use_count() == 1) {
+ it->call_has_release();
+ } else if (it->has_ready()) {
+ it->call_has_ready();
+ } else {
+ int64_t ms_since_registration = MonotonicMillis() -
it->registration_time();
+ if (ms_since_registration > it->wait_time_ms()) {
+ it->call_timeout();
+ } else {
+ new_que.push_back(std::move(it));
+ }
+ }
+ }
+ new_que.swap(_que);
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(interval));
+ }
+ }
+ ~RuntimeFilterTimerQueue() { _thread.detach(); }
+ RuntimeFilterTimerQueue() { _thread =
std::thread(&RuntimeFilterTimerQueue::start, this); }
+ static void push_filter_timer(std::shared_ptr<RuntimeFilterTimer> filter) {
+ static RuntimeFilterTimerQueue timer_que;
+
+ timer_que.push(filter);
+ }
+
+ void push(std::shared_ptr<RuntimeFilterTimer> filter) {
+ std::unique_lock<std::mutex> lc(_que_lock);
+ _que.push_back(filter);
+ cv.notify_all();
+ }
+
+ std::thread _thread;
+ std::condition_variable cv;
+ std::mutex cv_m;
+ std::mutex _que_lock;
+
+ std::list<std::shared_ptr<RuntimeFilterTimer>> _que;
+};
+
+void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) {
+ _filters++;
+ int64_t registration_time = runtime_filter->registration_time();
+ int32 wait_time_ms = runtime_filter->wait_time_ms();
+ auto filter_timer = std::make_shared<RuntimeFilterTimer>(
+ registration_time, wait_time_ms,
+
std::dynamic_pointer_cast<RuntimeFilterDependency>(shared_from_this()),
runtime_filter);
+ runtime_filter->set_filter_timer(filter_timer);
+ RuntimeFilterTimerQueue::push_filter_timer(filter_timer);
+}
+
+void RuntimeFilterDependency::sub_filters() {
Review Comment:
warning: method 'sub_filters' can be made static
[readability-convert-member-functions-to-static]
be/src/pipeline/pipeline_x/dependency.h:250:
```diff
- void sub_filters();
+ static void sub_filters();
```
##########
be/src/io/fs/multi_table_pipe.cpp:
##########
@@ -310,16 +311,16 @@
return Status::OK();
}
-std::shared_ptr<io::StreamLoadPipe> MultiTablePipe::getPipe(const TUniqueId&
pipe_id) {
+std::shared_ptr<io::StreamLoadPipe> MultiTablePipe::get_pipe(const TUniqueId&
pipe_id) {
Review Comment:
warning: method 'get_pipe' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static std::shared_ptr<io::StreamLoadPipe> MultiTablePipe::get_pipe(const
TUniqueId& pipe_id) {
```
##########
be/src/vec/core/decomposed_float.h:
##########
@@ -113,16 +113,16 @@ struct DecomposedFloat {
}
/// The case of the most negative integer
- if constexpr (is_signed_v<Int>) {
+ if constexpr (wide::is_signed_v<Int>) {
if (rhs == std::numeric_limits<Int>::lowest()) {
assert(isNegative());
if (normalizedExponent() <
- static_cast<int16_t>(8 * sizeof(Int) - is_signed_v<Int>)) {
+ static_cast<int16_t>(8 * sizeof(Int) -
wide::is_signed_v<Int>)) {
Review Comment:
warning: 8 is a magic number; consider replacing it with a named constant
[readability-magic-numbers]
```cpp
static_cast<int16_t>(8 * sizeof(Int) -
wide::is_signed_v<Int>)) {
^
```
##########
be/src/vec/data_types/data_type_ipv4.h:
##########
@@ -64,7 +64,9 @@ class DataTypeIPv4 final : public DataTypeNumberBase<IPv4> {
MutableColumnPtr create_column() const override;
- DataTypeSerDeSPtr get_serde() const override { return
std::make_shared<DataTypeIPv4SerDe>(); }
+ DataTypeSerDeSPtr get_serde(int nesting_level = 1) const override {
Review Comment:
warning: method 'get_serde' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static DataTypeSerDeSPtr get_serde(int nesting_level = 1) override {
```
##########
be/src/io/fs/multi_table_pipe.cpp:
##########
@@ -310,16 +311,16 @@
return Status::OK();
}
-std::shared_ptr<io::StreamLoadPipe> MultiTablePipe::getPipe(const TUniqueId&
pipe_id) {
+std::shared_ptr<io::StreamLoadPipe> MultiTablePipe::get_pipe(const TUniqueId&
pipe_id) {
std::lock_guard<std::mutex> l(_pipe_map_lock);
auto it = _pipe_map.find(pipe_id);
if (it == std::end(_pipe_map)) {
- return std::shared_ptr<io::StreamLoadPipe>(nullptr);
+ return {};
}
return it->second;
}
-void MultiTablePipe::removePipe(const TUniqueId& pipe_id) {
+void MultiTablePipe::remove_pipe(const TUniqueId& pipe_id) {
Review Comment:
warning: method 'remove_pipe' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static void MultiTablePipe::remove_pipe(const TUniqueId& pipe_id) {
```
##########
be/src/pipeline/pipeline_x/dependency.cpp:
##########
@@ -326,4 +330,113 @@ Status
HashJoinDependency::extract_join_column(vectorized::Block& block,
return Status::OK();
}
+bool RuntimeFilterTimer::has_ready() {
+ std::unique_lock<std::mutex> lc(_lock);
+ return _runtime_filter->is_ready();
+}
+
+void RuntimeFilterTimer::call_timeout() {
+ std::unique_lock<std::mutex> lc(_lock);
+ if (_call_ready) {
+ return;
+ }
+ _call_timeout = true;
+ if (_parent) {
+ _parent->sub_filters();
+ }
+}
+
+void RuntimeFilterTimer::call_ready() {
+ std::unique_lock<std::mutex> lc(_lock);
+ if (_call_timeout) {
+ return;
+ }
+ _call_ready = true;
+ if (_parent) {
+ _parent->sub_filters();
+ }
+}
+
+void RuntimeFilterTimer::call_has_ready() {
Review Comment:
warning: method 'call_has_ready' can be made const
[readability-make-member-function-const]
be/src/pipeline/pipeline_x/dependency.h:216:
```diff
- void call_has_ready();
+ void call_has_ready() const;
```
```suggestion
void RuntimeFilterTimer::call_has_ready() const {
```
##########
be/src/service/internal_service.cpp:
##########
@@ -516,7 +515,11 @@
uint32_t len = ser_request.size();
RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact,
&t_request));
}
- return _exec_env->fragment_mgr()->exec_plan_fragment(t_request);
+ if (cb) {
+ return _exec_env->fragment_mgr()->exec_plan_fragment(t_request,
cb);
+ } else {
+ return _exec_env->fragment_mgr()->exec_plan_fragment(t_request);
+ }
Review Comment:
warning: do not use 'else' after 'return' [readability-else-after-return]
```suggestion
} return
_exec_env->fragment_mgr()->exec_plan_fragment(t_request);
```
##########
be/src/vec/core/decomposed_float.h:
##########
@@ -134,7 +134,8 @@
}
/// Too large number: abs(float) > abs(rhs). Also the case with
infinities and NaN.
- if (normalizedExponent() >= static_cast<int16_t>(8 * sizeof(Int) -
is_signed_v<Int>)) {
+ if (normalizedExponent() >=
+ static_cast<int16_t>(8 * sizeof(Int) - wide::is_signed_v<Int>)) {
Review Comment:
warning: 8 is a magic number; consider replacing it with a named constant
[readability-magic-numbers]
```cpp
static_cast<int16_t>(8 * sizeof(Int) - wide::is_signed_v<Int>)) {
^
```
##########
be/src/pipeline/pipeline_x/dependency.cpp:
##########
@@ -326,4 +330,113 @@
return Status::OK();
}
+bool RuntimeFilterTimer::has_ready() {
+ std::unique_lock<std::mutex> lc(_lock);
+ return _runtime_filter->is_ready();
+}
+
+void RuntimeFilterTimer::call_timeout() {
+ std::unique_lock<std::mutex> lc(_lock);
+ if (_call_ready) {
+ return;
+ }
+ _call_timeout = true;
+ if (_parent) {
+ _parent->sub_filters();
+ }
+}
+
+void RuntimeFilterTimer::call_ready() {
+ std::unique_lock<std::mutex> lc(_lock);
+ if (_call_timeout) {
+ return;
+ }
+ _call_ready = true;
+ if (_parent) {
+ _parent->sub_filters();
+ }
+}
+
+void RuntimeFilterTimer::call_has_ready() {
+ std::unique_lock<std::mutex> lc(_lock);
+ DCHECK(!_call_timeout);
+ if (!_call_ready) {
+ _parent->sub_filters();
+ }
+}
+
+void RuntimeFilterTimer::call_has_release() {
+ // When the use count is equal to 1, only the timer queue still holds
ownership,
+ // so there is no need to take any action.
+}
+
+struct RuntimeFilterTimerQueue {
+ constexpr static int64_t interval = 50;
+ void start() {
+ while (true) {
+ std::unique_lock<std::mutex> lk(cv_m);
+
+ cv.wait(lk, [this] { return !_que.empty(); });
+ {
+ std::unique_lock<std::mutex> lc(_que_lock);
+ std::list<std::shared_ptr<RuntimeFilterTimer>> new_que;
+ for (auto& it : _que) {
+ if (it.use_count() == 1) {
+ it->call_has_release();
+ } else if (it->has_ready()) {
+ it->call_has_ready();
+ } else {
+ int64_t ms_since_registration = MonotonicMillis() -
it->registration_time();
+ if (ms_since_registration > it->wait_time_ms()) {
+ it->call_timeout();
+ } else {
+ new_que.push_back(std::move(it));
+ }
+ }
+ }
+ new_que.swap(_que);
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(interval));
+ }
+ }
+ ~RuntimeFilterTimerQueue() { _thread.detach(); }
+ RuntimeFilterTimerQueue() { _thread =
std::thread(&RuntimeFilterTimerQueue::start, this); }
+ static void push_filter_timer(std::shared_ptr<RuntimeFilterTimer> filter) {
+ static RuntimeFilterTimerQueue timer_que;
+
+ timer_que.push(filter);
+ }
+
+ void push(std::shared_ptr<RuntimeFilterTimer> filter) {
+ std::unique_lock<std::mutex> lc(_que_lock);
+ _que.push_back(filter);
+ cv.notify_all();
+ }
+
+ std::thread _thread;
+ std::condition_variable cv;
+ std::mutex cv_m;
+ std::mutex _que_lock;
+
+ std::list<std::shared_ptr<RuntimeFilterTimer>> _que;
+};
+
+void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) {
Review Comment:
warning: method 'add_filters' can be made static
[readability-convert-member-functions-to-static]
be/src/pipeline/pipeline_x/dependency.h:249:
```diff
- void add_filters(IRuntimeFilter* runtime_filter);
+ static void add_filters(IRuntimeFilter* runtime_filter);
```
##########
be/src/vec/core/decomposed_float.h:
##########
@@ -113,16 +113,16 @@
}
/// The case of the most negative integer
- if constexpr (is_signed_v<Int>) {
+ if constexpr (wide::is_signed_v<Int>) {
if (rhs == std::numeric_limits<Int>::lowest()) {
assert(isNegative());
if (normalizedExponent() <
- static_cast<int16_t>(8 * sizeof(Int) - is_signed_v<Int>)) {
+ static_cast<int16_t>(8 * sizeof(Int) -
wide::is_signed_v<Int>)) {
return 1;
}
if (normalizedExponent() >
- static_cast<int16_t>(8 * sizeof(Int) - is_signed_v<Int>)) {
+ static_cast<int16_t>(8 * sizeof(Int) -
wide::is_signed_v<Int>)) {
Review Comment:
warning: 8 is a magic number; consider replacing it with a named constant
[readability-magic-numbers]
```cpp
static_cast<int16_t>(8 * sizeof(Int) -
wide::is_signed_v<Int>)) {
^
```
##########
be/src/pipeline/pipeline_x/dependency.h:
##########
@@ -19,11 +19,16 @@
#include <sqltypes.h>
Review Comment:
warning: 'sqltypes.h' file not found [clang-diagnostic-error]
```cpp
#include <sqltypes.h>
^
```
##########
be/src/service/internal_service.cpp:
##########
@@ -355,7 +354,7 @@ void
PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcControl
}
}
-void PInternalServiceImpl::open_stream_sink(google::protobuf::RpcController*
controller,
+void PInternalServiceImpl::open_load_stream(google::protobuf::RpcController*
controller,
Review Comment:
warning: method 'open_load_stream' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static void
PInternalServiceImpl::open_load_stream(google::protobuf::RpcController*
controller,
```
##########
be/src/vec/data_types/data_type_ipv6.h:
##########
@@ -72,7 +72,9 @@ class DataTypeIPv6 final : public DataTypeNumberBase<IPv6> {
MutableColumnPtr create_column() const override;
- DataTypeSerDeSPtr get_serde() const override { return
std::make_shared<DataTypeIPv6SerDe>(); }
+ DataTypeSerDeSPtr get_serde(int nesting_level = 1) const override {
Review Comment:
warning: method 'get_serde' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static DataTypeSerDeSPtr get_serde(int nesting_level = 1) override {
```
##########
be/src/vec/core/decomposed_float.h:
##########
@@ -149,7 +150,8 @@
}
/// Larger octave: abs(rhs) > abs(float)
- if (normalizedExponent() + 1 < static_cast<int16_t>(8 * sizeof(Int) -
is_signed_v<Int>) &&
+ if (normalizedExponent() + 1 <
+ static_cast<int16_t>(8 * sizeof(Int) -
wide::is_signed_v<Int>) &&
Review Comment:
warning: 8 is a magic number; consider replacing it with a named constant
[readability-magic-numbers]
```cpp
static_cast<int16_t>(8 * sizeof(Int) -
wide::is_signed_v<Int>) &&
^
```
##########
be/src/vec/data_types/data_type_time.h:
##########
@@ -69,7 +69,9 @@ class DataTypeTime final : public DataTypeNumberBase<Float64>
{
bool can_be_used_in_boolean_context() const override { return true; }
bool can_be_inside_nullable() const override { return true; }
- DataTypeSerDeSPtr get_serde() const override { return
std::make_shared<DataTypeTimeSerDe>(); };
+ DataTypeSerDeSPtr get_serde(int nesting_level = 1) const override {
Review Comment:
warning: method 'get_serde' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static DataTypeSerDeSPtr get_serde(int nesting_level = 1) override {
```
##########
be/src/vec/data_types/data_type_time_v2.h:
##########
@@ -69,7 +69,9 @@ class DataTypeDateV2 final : public
DataTypeNumberBase<UInt32> {
bool can_be_inside_nullable() const override { return true; }
- DataTypeSerDeSPtr get_serde() const override { return
std::make_shared<DataTypeDateV2SerDe>(); }
+ DataTypeSerDeSPtr get_serde(int nesting_level = 1) const override {
Review Comment:
warning: method 'get_serde' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static DataTypeSerDeSPtr get_serde(int nesting_level = 1) override {
```
##########
be/src/vec/data_types/data_type_date.h:
##########
@@ -77,7 +77,9 @@ class DataTypeDate final : public DataTypeNumberBase<Int64> {
MutableColumnPtr create_column() const override;
- DataTypeSerDeSPtr get_serde() const override { return
std::make_shared<DataTypeDate64SerDe>(); }
+ DataTypeSerDeSPtr get_serde(int nesting_level = 1) const override {
Review Comment:
warning: method 'get_serde' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static DataTypeSerDeSPtr get_serde(int nesting_level = 1) override {
```
##########
be/src/vec/data_types/serde/data_type_date64_serde.cpp:
##########
@@ -93,13 +89,14 @@
return Status::OK();
}
-Status DataTypeDateTimeSerDe::serialize_column_to_json(
- const IColumn& column, int start_idx, int end_idx, BufferWritable& bw,
- FormatOptions& options, int nesting_level) const
{SERIALIZE_COLUMN_TO_JSON()}
+Status DataTypeDateTimeSerDe::serialize_column_to_json(const IColumn& column,
int start_idx,
Review Comment:
warning: method 'serialize_column_to_json' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static Status DataTypeDateTimeSerDe::serialize_column_to_json(const IColumn&
column, int start_idx,
```
be/src/vec/data_types/serde/data_type_date64_serde.cpp:93:
```diff
- FormatOptions&
options) const {
+ FormatOptions&
options) {
```
##########
be/src/vec/data_types/serde/data_type_date64_serde.cpp:
##########
@@ -62,18 +61,15 @@ Status
DataTypeDate64SerDe::serialize_one_cell_to_json(const IColumn& column, in
return Status::OK();
}
-Status DataTypeDate64SerDe::deserialize_column_from_json_vector(IColumn&
column,
-
std::vector<Slice>& slices,
- int*
num_deserialized,
- const
FormatOptions& options,
- int
nesting_level) const {
+Status DataTypeDate64SerDe::deserialize_column_from_json_vector(
Review Comment:
warning: method 'deserialize_column_from_json_vector' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static Status DataTypeDate64SerDe::deserialize_column_from_json_vector(
```
be/src/vec/data_types/serde/data_type_date64_serde.cpp:65:
```diff
- const FormatOptions& options) const {
+ const FormatOptions& options) {
```
##########
be/src/vec/data_types/serde/data_type_array_serde.cpp:
##########
@@ -326,7 +324,8 @@ Status DataTypeArraySerDe::write_column_to_mysql(const
IColumn& column,
return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
}
-Status DataTypeArraySerDe::write_column_to_orc(const IColumn& column, const
NullMap* null_map,
+Status DataTypeArraySerDe::write_column_to_orc(const std::string& timezone,
const IColumn& column,
Review Comment:
warning: method 'write_column_to_orc' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static Status DataTypeArraySerDe::write_column_to_orc(const std::string&
timezone, const IColumn& column,
```
be/src/vec/data_types/serde/data_type_array_serde.cpp:329:
```diff
- int end,
std::vector<StringRef>& buffer_list) const {
+ int end,
std::vector<StringRef>& buffer_list) {
```
##########
be/src/vec/data_types/serde/data_type_bitmap_serde.cpp:
##########
@@ -139,7 +136,8 @@ Status DataTypeBitMapSerDe::write_column_to_mysql(const
IColumn& column,
return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
}
-Status DataTypeBitMapSerDe::write_column_to_orc(const IColumn& column, const
NullMap* null_map,
+Status DataTypeBitMapSerDe::write_column_to_orc(const std::string& timezone,
const IColumn& column,
Review Comment:
warning: method 'write_column_to_orc' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static Status DataTypeBitMapSerDe::write_column_to_orc(const std::string&
timezone, const IColumn& column,
```
be/src/vec/data_types/serde/data_type_bitmap_serde.cpp:142:
```diff
- std::vector<StringRef>&
buffer_list) const {
+ std::vector<StringRef>&
buffer_list) {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]