This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new fe3b56ca7c4 [Refactor](exec) Remove unless code and add comment
(#46503)
fe3b56ca7c4 is described below
commit fe3b56ca7c43fa3fb6a4439aa99a992ea561fc69
Author: HappenLee <[email protected]>
AuthorDate: Tue Jan 14 13:44:37 2025 +0800
[Refactor](exec) Remove unless code and add comment (#46503)
Remove useless code and add comments: be/src/pipeline/pipeline_task.h,
be/src/vec/runtime/vdatetime_value.h
---
be/src/pipeline/pipeline.h | 1 -
be/src/pipeline/pipeline_task.h | 20 +++-----
be/src/pipeline/task_queue.cpp | 14 +++---
be/src/pipeline/task_scheduler.cpp | 22 ++++-----
be/src/vec/runtime/vdatetime_value.cpp | 54 ----------------------
be/src/vec/runtime/vdatetime_value.h | 5 --
be/test/vec/core/block_test.cpp | 2 +-
.../serde/data_type_serde_mysql_test.cpp | 2 +-
.../data_types/serde/data_type_serde_pb_test.cpp | 7 +--
be/test/vec/jsonb/serialize_test.cpp | 2 +-
be/test/vec/runtime/vdatetime_value_test.cpp | 16 +++----
11 files changed, 33 insertions(+), 112 deletions(-)
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index afbe6c77596..7bde9323e94 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -148,7 +148,6 @@ private:
std::vector<std::shared_ptr<Pipeline>> _children;
PipelineId _pipeline_id;
- int _previous_schedule_id = -1;
// pipline id + operator names. init when:
// build_operators(), if pipeline;
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 6143feb5a81..ed1848f0352 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -67,17 +67,14 @@ public:
QueryContext* query_context();
- int get_previous_core_id() const {
- return _previous_schedule_id != -1 ? _previous_schedule_id
- : _pipeline->_previous_schedule_id;
- }
+ int get_core_id() const { return _core_id; }
- void set_previous_core_id(int id) {
- if (id != _previous_schedule_id) {
- if (_previous_schedule_id != -1) {
+ void set_core_id(int id) {
+ if (id != _core_id) {
+ if (_core_id != -1) {
COUNTER_UPDATE(_core_change_times, 1);
}
- _previous_schedule_id = id;
+ _core_id = id;
}
}
@@ -175,10 +172,6 @@ public:
void update_queue_level(int queue_level) { this->_queue_level =
queue_level; }
int get_queue_level() const { return this->_queue_level; }
- // 1.3 priority queue's core id
- void set_core_id(int core_id) { this->_core_id = core_id; }
- int get_core_id() const { return this->_core_id; }
-
/**
* Return true if:
* 1. `enable_force_spill` is true which forces this task to spill data.
@@ -254,7 +247,7 @@ private:
bool _has_exceed_timeout = false;
bool _opened;
RuntimeState* _state = nullptr;
- int _previous_schedule_id = -1;
+ int _core_id = -1;
uint32_t _schedule_time = 0;
std::unique_ptr<doris::vectorized::Block> _block;
PipelineFragmentContext* _fragment_context = nullptr;
@@ -269,7 +262,6 @@ private:
// 2 exe task
// 3 update task statistics(update _queue_level/_core_id)
int _queue_level = 0;
- int _core_id = 0;
RuntimeProfile* _parent_profile = nullptr;
std::unique_ptr<RuntimeProfile> _task_profile;
diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index ea812ca9b12..390707766a4 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -153,7 +153,6 @@ PipelineTask* MultiCoreTaskQueue::take(int core_id) {
<< " _core_size: " << _core_size << " _next_core: " <<
_next_core.load();
task = _prio_task_queues[core_id].try_take(false);
if (task) {
- task->set_core_id(core_id);
break;
}
task = _steal_take(core_id);
@@ -162,7 +161,6 @@ PipelineTask* MultiCoreTaskQueue::take(int core_id) {
}
task = _prio_task_queues[core_id].take(WAIT_CORE_TASK_TIMEOUT_MS /*
timeout_ms */);
if (task) {
- task->set_core_id(core_id);
break;
}
}
@@ -183,7 +181,6 @@ PipelineTask* MultiCoreTaskQueue::_steal_take(int core_id) {
DCHECK(next_id < _core_size);
auto task = _prio_task_queues[next_id].try_take(true);
if (task) {
- task->set_core_id(next_id);
return task;
}
}
@@ -191,7 +188,7 @@ PipelineTask* MultiCoreTaskQueue::_steal_take(int core_id) {
}
Status MultiCoreTaskQueue::push_back(PipelineTask* task) {
- int core_id = task->get_previous_core_id();
+ int core_id = task->get_core_id();
if (core_id < 0) {
core_id = _next_core.fetch_add(1) % _core_size;
}
@@ -205,9 +202,12 @@ Status MultiCoreTaskQueue::push_back(PipelineTask* task,
int core_id) {
}
void MultiCoreTaskQueue::update_statistics(PipelineTask* task, int64_t
time_spent) {
- task->inc_runtime_ns(time_spent);
-
_prio_task_queues[task->get_core_id()].inc_sub_queue_runtime(task->get_queue_level(),
- time_spent);
+ // If the task was closed early because of an exception before it was ever executed,
+ // its core_id is still -1; in that case update_statistics must be skipped.
+ if (auto core_id = task->get_core_id(); core_id >= 0) {
+ task->inc_runtime_ns(time_spent);
+
_prio_task_queues[core_id].inc_sub_queue_runtime(task->get_queue_level(),
time_spent);
+ }
}
} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/task_scheduler.cpp
b/be/src/pipeline/task_scheduler.cpp
index 45898e76417..60d9efa66ad 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -33,7 +33,6 @@
#include "common/logging.h"
#include "pipeline/pipeline_task.h"
-#include "pipeline/task_queue.h"
#include "pipeline_fragment_context.h"
#include "runtime/exec_env.h"
#include "runtime/query_context.h"
@@ -103,6 +102,9 @@ void TaskScheduler::_do_work(int index) {
if (!task) {
continue;
}
+ // The task is already running: it may have blocked on a dependency and been woken up
+ // by another thread while the blocked thread still holds the task. Put it back into
+ // the queue until the holding thread calls task->set_running(false).
if (task->is_running()) {
static_cast<void>(_task_queue.push_back(task, index));
continue;
@@ -129,12 +131,8 @@ void TaskScheduler::_do_work(int index) {
// task exec
bool eos = false;
auto status = Status::OK();
+ task->set_core_id(index);
-#ifdef __APPLE__
- uint32_t core_id = 0;
-#else
- uint32_t core_id = sched_getcpu();
-#endif
ASSIGN_STATUS_IF_CATCH_EXCEPTION(
//TODO: use a better enclose to abstracting these
if
(ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) {
@@ -149,12 +147,11 @@ void TaskScheduler::_do_work(int index) {
uint64_t end_time = MonotonicMicros();
ExecEnv::GetInstance()->pipeline_tracer_context()->record(
- {query_id, task_name, core_id, thread_id,
start_time, end_time});
+ {query_id, task_name,
static_cast<uint32_t>(index), thread_id,
+ start_time, end_time});
} else { status = task->execute(&eos); },
status);
- task->set_previous_core_id(index);
-
if (!status.ok()) {
// Print detail informations below when you debugging here.
//
@@ -173,14 +170,11 @@ void TaskScheduler::_do_work(int index) {
if (eos) {
// is pending finish will add the task to dependency's blocking
queue, and then the task will be
// added to running queue when dependency is ready.
- if (task->is_pending_finish()) {
- // Only meet eos, should set task to PENDING_FINISH state
- task->set_running(false);
- } else {
+ if (!task->is_pending_finish()) {
Status exec_status =
fragment_ctx->get_query_ctx()->exec_status();
_close_task(task, exec_status);
+ continue;
}
- continue;
}
task->set_running(false);
diff --git a/be/src/vec/runtime/vdatetime_value.cpp
b/be/src/vec/runtime/vdatetime_value.cpp
index 8e053c93447..07508ede5c0 100644
--- a/be/src/vec/runtime/vdatetime_value.cpp
+++ b/be/src/vec/runtime/vdatetime_value.cpp
@@ -2823,40 +2823,6 @@ int date_day_offset_dict::daynr(int year, int month, int
day) const {
return DATE_DAY_OFFSET_DICT[year - START_YEAR][month - 1][day - 1];
}
-template <typename T>
-uint32_t DateV2Value<T>::set_date_uint32(uint32_t int_val) {
- union DateV2UInt32Union {
- DateV2Value<T> dt;
- uint32_t ui32;
- ~DateV2UInt32Union() {}
- };
- DateV2UInt32Union conv = {.ui32 = int_val};
- if (is_invalid(conv.dt.year(), conv.dt.month(), conv.dt.day(), 0, 0, 0,
0)) {
- return 0;
- }
- this->unchecked_set_time(conv.dt.year(), conv.dt.month(), conv.dt.day(),
0, 0, 0, 0);
-
- return int_val;
-}
-
-template <typename T>
-uint64_t DateV2Value<T>::set_datetime_uint64(uint64_t int_val) {
- union DateTimeV2UInt64Union {
- DateV2Value<T> dt;
- uint64_t ui64;
- ~DateTimeV2UInt64Union() {}
- };
- DateTimeV2UInt64Union conv = {.ui64 = int_val};
- if (is_invalid(conv.dt.year(), conv.dt.month(), conv.dt.day(),
conv.dt.hour(), conv.dt.minute(),
- conv.dt.second(), conv.dt.microsecond())) {
- return 0;
- }
- this->unchecked_set_time(conv.dt.year(), conv.dt.month(), conv.dt.day(),
conv.dt.hour(),
- conv.dt.minute(), conv.dt.second(),
conv.dt.microsecond());
-
- return int_val;
-}
-
template <typename T>
uint8_t DateV2Value<T>::week(uint8_t mode) const {
uint16_t year = 0;
@@ -3685,26 +3651,6 @@ bool DateV2Value<T>::to_format_string_conservative(const
char* format, size_t le
return true;
}
-template <typename T>
-bool DateV2Value<T>::from_date(uint32_t value) {
- DCHECK(!is_datetime);
- if (value < MIN_DATE_V2 || value > MAX_DATE_V2) {
- return false;
- }
-
- return set_date_uint32(value);
-}
-
-template <typename T>
-bool DateV2Value<T>::from_datetime(uint64_t value) {
- DCHECK(is_datetime);
- if (value < MIN_DATETIME_V2 || value > MAX_DATETIME_V2) {
- return false;
- }
-
- return set_datetime_uint64(value);
-}
-
template <typename T>
int64_t DateV2Value<T>::standardize_timevalue(int64_t value) {
if (value <= 0) {
diff --git a/be/src/vec/runtime/vdatetime_value.h
b/be/src/vec/runtime/vdatetime_value.h
index f724f918828..3f6980e8922 100644
--- a/be/src/vec/runtime/vdatetime_value.h
+++ b/be/src/vec/runtime/vdatetime_value.h
@@ -1181,12 +1181,7 @@ public:
underlying_value to_date_int_val() const { return int_val_; }
- bool from_date(uint32_t value);
- bool from_datetime(uint64_t value);
-
bool from_date_int64(int64_t value);
- uint32_t set_date_uint32(uint32_t int_val);
- uint64_t set_datetime_uint64(uint64_t int_val);
bool get_date_from_daynr(uint64_t);
diff --git a/be/test/vec/core/block_test.cpp b/be/test/vec/core/block_test.cpp
index 134264f3e80..7c876b00aa4 100644
--- a/be/test/vec/core/block_test.cpp
+++ b/be/test/vec/core/block_test.cpp
@@ -771,7 +771,7 @@ TEST(BlockTest, dump_data) {
auto& date_v2_data = column_vector_date_v2->get_data();
for (int i = 0; i < 1024; ++i) {
DateV2Value<DateV2ValueType> value;
- value.from_date((uint32_t)((2022 << 9) | (6 << 5) | 6));
+ value.unchecked_set_time(2022, 6, 6, 0, 0, 0, 0);
date_v2_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value));
}
vectorized::DataTypePtr
date_v2_type(std::make_shared<vectorized::DataTypeDateV2>());
diff --git a/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp
b/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp
index f05919e4a8f..3f37bf93c52 100644
--- a/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp
+++ b/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp
@@ -204,7 +204,7 @@ void serialize_and_deserialize_mysql_test() {
auto& date_v2_data = column_vector_date_v2->get_data();
for (int i = 0; i < row_num; ++i) {
DateV2Value<DateV2ValueType> value;
- value.from_date((uint32_t)((2022 << 9) | (6 << 5) | 6));
+ value.unchecked_set_time(2022, 6, 6, 0, 0, 0, 0);
date_v2_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value));
}
vectorized::DataTypePtr date_v2_type(
diff --git a/be/test/vec/data_types/serde/data_type_serde_pb_test.cpp
b/be/test/vec/data_types/serde/data_type_serde_pb_test.cpp
index bf6ead9b21b..1414d6c78dc 100644
--- a/be/test/vec/data_types/serde/data_type_serde_pb_test.cpp
+++ b/be/test/vec/data_types/serde/data_type_serde_pb_test.cpp
@@ -668,12 +668,9 @@ TEST(DataTypeSerDePbTest, DataTypeScalaSerDeTestDateTime) {
uint8_t minute = i;
uint8_t second = 0;
uint32_t microsecond = 123000;
- auto value = ((uint64_t)(((uint64_t)year << 46) | ((uint64_t)month
<< 42) |
- ((uint64_t)day << 37) | ((uint64_t)hour
<< 32) |
- ((uint64_t)minute << 26) |
((uint64_t)second << 20) |
- (uint64_t)microsecond));
+
DateV2Value<DateTimeV2ValueType> datetime_v2;
- datetime_v2.from_datetime(value);
+ datetime_v2.unchecked_set_time(year, month, day, hour, minute,
second, microsecond);
auto datetime_val = binary_cast<DateV2Value<DateTimeV2ValueType>,
UInt64>(datetime_v2);
data.push_back(datetime_val);
}
diff --git a/be/test/vec/jsonb/serialize_test.cpp
b/be/test/vec/jsonb/serialize_test.cpp
index f3bfc4448fa..239d2a5f165 100644
--- a/be/test/vec/jsonb/serialize_test.cpp
+++ b/be/test/vec/jsonb/serialize_test.cpp
@@ -483,7 +483,7 @@ TEST(BlockSerializeTest, JsonbBlock) {
auto& date_v2_data = column_vector_date_v2->get_data();
for (int i = 0; i < 1024; ++i) {
DateV2Value<DateV2ValueType> value;
- value.from_date((uint32_t)((2022 << 9) | (6 << 5) | 6));
+ value.unchecked_set_time(2022, 6, 6, 0, 0, 0, 0);
date_v2_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value));
}
vectorized::DataTypePtr
date_v2_type(std::make_shared<vectorized::DataTypeDateV2>());
diff --git a/be/test/vec/runtime/vdatetime_value_test.cpp
b/be/test/vec/runtime/vdatetime_value_test.cpp
index 6c0bfad6b56..fd0b3a1d2e5 100644
--- a/be/test/vec/runtime/vdatetime_value_test.cpp
+++ b/be/test/vec/runtime/vdatetime_value_test.cpp
@@ -75,7 +75,7 @@ TEST(VDateTimeValueTest, date_v2_from_uint32_test) {
uint8_t day = 24;
DateV2Value<DateV2ValueType> date_v2;
- date_v2.from_date((uint32_t)((year << 9) | (month << 5) | day));
+ date_v2.unchecked_set_time(year, month, day, 0, 0, 0, 0);
EXPECT_TRUE(date_v2.year() == year);
EXPECT_TRUE(date_v2.month() == month);
@@ -114,10 +114,7 @@ TEST(VDateTimeValueTest, datetime_v2_from_uint64_test) {
uint32_t microsecond = 999999;
DateV2Value<DateTimeV2ValueType> datetime_v2;
- datetime_v2.from_datetime((uint64_t)(((uint64_t)year << 46) |
((uint64_t)month << 42) |
- ((uint64_t)day << 37) |
((uint64_t)hour << 32) |
- ((uint64_t)minute << 26) |
((uint64_t)second << 20) |
- (uint64_t)microsecond));
+ datetime_v2.unchecked_set_time(year, month, day, hour, minute, second,
microsecond);
EXPECT_TRUE(datetime_v2.year() == year);
EXPECT_TRUE(datetime_v2.month() == month);
@@ -142,10 +139,11 @@ TEST(VDateTimeValueTest, datetime_v2_from_uint64_test) {
uint32_t microsecond = 123000;
DateV2Value<DateTimeV2ValueType> datetime_v2;
- datetime_v2.from_datetime((uint64_t)(((uint64_t)year << 46) |
((uint64_t)month << 42) |
- ((uint64_t)day << 37) |
((uint64_t)hour << 32) |
- ((uint64_t)minute << 26) |
((uint64_t)second << 20) |
- (uint64_t)microsecond));
+ auto ui64 = (uint64_t)(((uint64_t)year << 46) | ((uint64_t)month <<
42) |
+ ((uint64_t)day << 37) | ((uint64_t)hour << 32) |
+ ((uint64_t)minute << 26) | ((uint64_t)second <<
20) |
+ (uint64_t)microsecond);
+ datetime_v2 = (DateV2Value<DateTimeV2ValueType>&)ui64;
EXPECT_TRUE(datetime_v2.year() == year);
EXPECT_TRUE(datetime_v2.month() == month);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]