This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new fe3b56ca7c4 [Refactor](exec) Remove unless code and add comment 
(#46503)
fe3b56ca7c4 is described below

commit fe3b56ca7c43fa3fb6a4439aa99a992ea561fc69
Author: HappenLee <[email protected]>
AuthorDate: Tue Jan 14 13:44:37 2025 +0800

    [Refactor](exec) Remove unless code and add comment (#46503)
    
    Remove unless code and add comment be/src/pipeline/pipeline_task.h/
    be/src/vec/runtime/vdatetime_value.h
---
 be/src/pipeline/pipeline.h                         |  1 -
 be/src/pipeline/pipeline_task.h                    | 20 +++-----
 be/src/pipeline/task_queue.cpp                     | 14 +++---
 be/src/pipeline/task_scheduler.cpp                 | 22 ++++-----
 be/src/vec/runtime/vdatetime_value.cpp             | 54 ----------------------
 be/src/vec/runtime/vdatetime_value.h               |  5 --
 be/test/vec/core/block_test.cpp                    |  2 +-
 .../serde/data_type_serde_mysql_test.cpp           |  2 +-
 .../data_types/serde/data_type_serde_pb_test.cpp   |  7 +--
 be/test/vec/jsonb/serialize_test.cpp               |  2 +-
 be/test/vec/runtime/vdatetime_value_test.cpp       | 16 +++----
 11 files changed, 33 insertions(+), 112 deletions(-)

diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index afbe6c77596..7bde9323e94 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -148,7 +148,6 @@ private:
     std::vector<std::shared_ptr<Pipeline>> _children;
 
     PipelineId _pipeline_id;
-    int _previous_schedule_id = -1;
 
     // pipline id + operator names. init when:
     //  build_operators(), if pipeline;
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 6143feb5a81..ed1848f0352 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -67,17 +67,14 @@ public:
 
     QueryContext* query_context();
 
-    int get_previous_core_id() const {
-        return _previous_schedule_id != -1 ? _previous_schedule_id
-                                           : _pipeline->_previous_schedule_id;
-    }
+    int get_core_id() const { return _core_id; }
 
-    void set_previous_core_id(int id) {
-        if (id != _previous_schedule_id) {
-            if (_previous_schedule_id != -1) {
+    void set_core_id(int id) {
+        if (id != _core_id) {
+            if (_core_id != -1) {
                 COUNTER_UPDATE(_core_change_times, 1);
             }
-            _previous_schedule_id = id;
+            _core_id = id;
         }
     }
 
@@ -175,10 +172,6 @@ public:
     void update_queue_level(int queue_level) { this->_queue_level = 
queue_level; }
     int get_queue_level() const { return this->_queue_level; }
 
-    // 1.3 priority queue's core id
-    void set_core_id(int core_id) { this->_core_id = core_id; }
-    int get_core_id() const { return this->_core_id; }
-
     /**
      * Return true if:
      * 1. `enable_force_spill` is true which forces this task to spill data.
@@ -254,7 +247,7 @@ private:
     bool _has_exceed_timeout = false;
     bool _opened;
     RuntimeState* _state = nullptr;
-    int _previous_schedule_id = -1;
+    int _core_id = -1;
     uint32_t _schedule_time = 0;
     std::unique_ptr<doris::vectorized::Block> _block;
     PipelineFragmentContext* _fragment_context = nullptr;
@@ -269,7 +262,6 @@ private:
     // 2 exe task
     // 3 update task statistics(update _queue_level/_core_id)
     int _queue_level = 0;
-    int _core_id = 0;
 
     RuntimeProfile* _parent_profile = nullptr;
     std::unique_ptr<RuntimeProfile> _task_profile;
diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index ea812ca9b12..390707766a4 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -153,7 +153,6 @@ PipelineTask* MultiCoreTaskQueue::take(int core_id) {
                 << " _core_size: " << _core_size << " _next_core: " << 
_next_core.load();
         task = _prio_task_queues[core_id].try_take(false);
         if (task) {
-            task->set_core_id(core_id);
             break;
         }
         task = _steal_take(core_id);
@@ -162,7 +161,6 @@ PipelineTask* MultiCoreTaskQueue::take(int core_id) {
         }
         task = _prio_task_queues[core_id].take(WAIT_CORE_TASK_TIMEOUT_MS /* 
timeout_ms */);
         if (task) {
-            task->set_core_id(core_id);
             break;
         }
     }
@@ -183,7 +181,6 @@ PipelineTask* MultiCoreTaskQueue::_steal_take(int core_id) {
         DCHECK(next_id < _core_size);
         auto task = _prio_task_queues[next_id].try_take(true);
         if (task) {
-            task->set_core_id(next_id);
             return task;
         }
     }
@@ -191,7 +188,7 @@ PipelineTask* MultiCoreTaskQueue::_steal_take(int core_id) {
 }
 
 Status MultiCoreTaskQueue::push_back(PipelineTask* task) {
-    int core_id = task->get_previous_core_id();
+    int core_id = task->get_core_id();
     if (core_id < 0) {
         core_id = _next_core.fetch_add(1) % _core_size;
     }
@@ -205,9 +202,12 @@ Status MultiCoreTaskQueue::push_back(PipelineTask* task, 
int core_id) {
 }
 
 void MultiCoreTaskQueue::update_statistics(PipelineTask* task, int64_t 
time_spent) {
-    task->inc_runtime_ns(time_spent);
-    
_prio_task_queues[task->get_core_id()].inc_sub_queue_runtime(task->get_queue_level(),
-                                                                 time_spent);
+    // if the task not execute but exception early close, core_id == -1
+    // should not do update_statistics
+    if (auto core_id = task->get_core_id(); core_id >= 0) {
+        task->inc_runtime_ns(time_spent);
+        
_prio_task_queues[core_id].inc_sub_queue_runtime(task->get_queue_level(), 
time_spent);
+    }
 }
 
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/task_scheduler.cpp 
b/be/src/pipeline/task_scheduler.cpp
index 45898e76417..60d9efa66ad 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -33,7 +33,6 @@
 
 #include "common/logging.h"
 #include "pipeline/pipeline_task.h"
-#include "pipeline/task_queue.h"
 #include "pipeline_fragment_context.h"
 #include "runtime/exec_env.h"
 #include "runtime/query_context.h"
@@ -103,6 +102,9 @@ void TaskScheduler::_do_work(int index) {
         if (!task) {
             continue;
         }
+        // The task is already running, maybe block in now dependency wake up 
by other thread
+        // but the block thread still hold the task, so put it back to the 
queue, until the hold
+        // thread set task->set_running(false)
         if (task->is_running()) {
             static_cast<void>(_task_queue.push_back(task, index));
             continue;
@@ -129,12 +131,8 @@ void TaskScheduler::_do_work(int index) {
         // task exec
         bool eos = false;
         auto status = Status::OK();
+        task->set_core_id(index);
 
-#ifdef __APPLE__
-        uint32_t core_id = 0;
-#else
-        uint32_t core_id = sched_getcpu();
-#endif
         ASSIGN_STATUS_IF_CATCH_EXCEPTION(
                 //TODO: use a better enclose to abstracting these
                 if 
(ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) {
@@ -149,12 +147,11 @@ void TaskScheduler::_do_work(int index) {
 
                     uint64_t end_time = MonotonicMicros();
                     ExecEnv::GetInstance()->pipeline_tracer_context()->record(
-                            {query_id, task_name, core_id, thread_id, 
start_time, end_time});
+                            {query_id, task_name, 
static_cast<uint32_t>(index), thread_id,
+                             start_time, end_time});
                 } else { status = task->execute(&eos); },
                 status);
 
-        task->set_previous_core_id(index);
-
         if (!status.ok()) {
             // Print detail informations below when you debugging here.
             //
@@ -173,14 +170,11 @@ void TaskScheduler::_do_work(int index) {
         if (eos) {
             // is pending finish will add the task to dependency's blocking 
queue, and then the task will be
             // added to running queue when dependency is ready.
-            if (task->is_pending_finish()) {
-                // Only meet eos, should set task to PENDING_FINISH state
-                task->set_running(false);
-            } else {
+            if (!task->is_pending_finish()) {
                 Status exec_status = 
fragment_ctx->get_query_ctx()->exec_status();
                 _close_task(task, exec_status);
+                continue;
             }
-            continue;
         }
 
         task->set_running(false);
diff --git a/be/src/vec/runtime/vdatetime_value.cpp 
b/be/src/vec/runtime/vdatetime_value.cpp
index 8e053c93447..07508ede5c0 100644
--- a/be/src/vec/runtime/vdatetime_value.cpp
+++ b/be/src/vec/runtime/vdatetime_value.cpp
@@ -2823,40 +2823,6 @@ int date_day_offset_dict::daynr(int year, int month, int 
day) const {
     return DATE_DAY_OFFSET_DICT[year - START_YEAR][month - 1][day - 1];
 }
 
-template <typename T>
-uint32_t DateV2Value<T>::set_date_uint32(uint32_t int_val) {
-    union DateV2UInt32Union {
-        DateV2Value<T> dt;
-        uint32_t ui32;
-        ~DateV2UInt32Union() {}
-    };
-    DateV2UInt32Union conv = {.ui32 = int_val};
-    if (is_invalid(conv.dt.year(), conv.dt.month(), conv.dt.day(), 0, 0, 0, 
0)) {
-        return 0;
-    }
-    this->unchecked_set_time(conv.dt.year(), conv.dt.month(), conv.dt.day(), 
0, 0, 0, 0);
-
-    return int_val;
-}
-
-template <typename T>
-uint64_t DateV2Value<T>::set_datetime_uint64(uint64_t int_val) {
-    union DateTimeV2UInt64Union {
-        DateV2Value<T> dt;
-        uint64_t ui64;
-        ~DateTimeV2UInt64Union() {}
-    };
-    DateTimeV2UInt64Union conv = {.ui64 = int_val};
-    if (is_invalid(conv.dt.year(), conv.dt.month(), conv.dt.day(), 
conv.dt.hour(), conv.dt.minute(),
-                   conv.dt.second(), conv.dt.microsecond())) {
-        return 0;
-    }
-    this->unchecked_set_time(conv.dt.year(), conv.dt.month(), conv.dt.day(), 
conv.dt.hour(),
-                             conv.dt.minute(), conv.dt.second(), 
conv.dt.microsecond());
-
-    return int_val;
-}
-
 template <typename T>
 uint8_t DateV2Value<T>::week(uint8_t mode) const {
     uint16_t year = 0;
@@ -3685,26 +3651,6 @@ bool DateV2Value<T>::to_format_string_conservative(const 
char* format, size_t le
     return true;
 }
 
-template <typename T>
-bool DateV2Value<T>::from_date(uint32_t value) {
-    DCHECK(!is_datetime);
-    if (value < MIN_DATE_V2 || value > MAX_DATE_V2) {
-        return false;
-    }
-
-    return set_date_uint32(value);
-}
-
-template <typename T>
-bool DateV2Value<T>::from_datetime(uint64_t value) {
-    DCHECK(is_datetime);
-    if (value < MIN_DATETIME_V2 || value > MAX_DATETIME_V2) {
-        return false;
-    }
-
-    return set_datetime_uint64(value);
-}
-
 template <typename T>
 int64_t DateV2Value<T>::standardize_timevalue(int64_t value) {
     if (value <= 0) {
diff --git a/be/src/vec/runtime/vdatetime_value.h 
b/be/src/vec/runtime/vdatetime_value.h
index f724f918828..3f6980e8922 100644
--- a/be/src/vec/runtime/vdatetime_value.h
+++ b/be/src/vec/runtime/vdatetime_value.h
@@ -1181,12 +1181,7 @@ public:
 
     underlying_value to_date_int_val() const { return int_val_; }
 
-    bool from_date(uint32_t value);
-    bool from_datetime(uint64_t value);
-
     bool from_date_int64(int64_t value);
-    uint32_t set_date_uint32(uint32_t int_val);
-    uint64_t set_datetime_uint64(uint64_t int_val);
 
     bool get_date_from_daynr(uint64_t);
 
diff --git a/be/test/vec/core/block_test.cpp b/be/test/vec/core/block_test.cpp
index 134264f3e80..7c876b00aa4 100644
--- a/be/test/vec/core/block_test.cpp
+++ b/be/test/vec/core/block_test.cpp
@@ -771,7 +771,7 @@ TEST(BlockTest, dump_data) {
     auto& date_v2_data = column_vector_date_v2->get_data();
     for (int i = 0; i < 1024; ++i) {
         DateV2Value<DateV2ValueType> value;
-        value.from_date((uint32_t)((2022 << 9) | (6 << 5) | 6));
+        value.unchecked_set_time(2022, 6, 6, 0, 0, 0, 0);
         date_v2_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value));
     }
     vectorized::DataTypePtr 
date_v2_type(std::make_shared<vectorized::DataTypeDateV2>());
diff --git a/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp 
b/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp
index f05919e4a8f..3f37bf93c52 100644
--- a/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp
+++ b/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp
@@ -204,7 +204,7 @@ void serialize_and_deserialize_mysql_test() {
                 auto& date_v2_data = column_vector_date_v2->get_data();
                 for (int i = 0; i < row_num; ++i) {
                     DateV2Value<DateV2ValueType> value;
-                    value.from_date((uint32_t)((2022 << 9) | (6 << 5) | 6));
+                    value.unchecked_set_time(2022, 6, 6, 0, 0, 0, 0);
                     
date_v2_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value));
                 }
                 vectorized::DataTypePtr date_v2_type(
diff --git a/be/test/vec/data_types/serde/data_type_serde_pb_test.cpp 
b/be/test/vec/data_types/serde/data_type_serde_pb_test.cpp
index bf6ead9b21b..1414d6c78dc 100644
--- a/be/test/vec/data_types/serde/data_type_serde_pb_test.cpp
+++ b/be/test/vec/data_types/serde/data_type_serde_pb_test.cpp
@@ -668,12 +668,9 @@ TEST(DataTypeSerDePbTest, DataTypeScalaSerDeTestDateTime) {
             uint8_t minute = i;
             uint8_t second = 0;
             uint32_t microsecond = 123000;
-            auto value = ((uint64_t)(((uint64_t)year << 46) | ((uint64_t)month 
<< 42) |
-                                     ((uint64_t)day << 37) | ((uint64_t)hour 
<< 32) |
-                                     ((uint64_t)minute << 26) | 
((uint64_t)second << 20) |
-                                     (uint64_t)microsecond));
+
             DateV2Value<DateTimeV2ValueType> datetime_v2;
-            datetime_v2.from_datetime(value);
+            datetime_v2.unchecked_set_time(year, month, day, hour, minute, 
second, microsecond);
             auto datetime_val = binary_cast<DateV2Value<DateTimeV2ValueType>, 
UInt64>(datetime_v2);
             data.push_back(datetime_val);
         }
diff --git a/be/test/vec/jsonb/serialize_test.cpp 
b/be/test/vec/jsonb/serialize_test.cpp
index f3bfc4448fa..239d2a5f165 100644
--- a/be/test/vec/jsonb/serialize_test.cpp
+++ b/be/test/vec/jsonb/serialize_test.cpp
@@ -483,7 +483,7 @@ TEST(BlockSerializeTest, JsonbBlock) {
         auto& date_v2_data = column_vector_date_v2->get_data();
         for (int i = 0; i < 1024; ++i) {
             DateV2Value<DateV2ValueType> value;
-            value.from_date((uint32_t)((2022 << 9) | (6 << 5) | 6));
+            value.unchecked_set_time(2022, 6, 6, 0, 0, 0, 0);
             
date_v2_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value));
         }
         vectorized::DataTypePtr 
date_v2_type(std::make_shared<vectorized::DataTypeDateV2>());
diff --git a/be/test/vec/runtime/vdatetime_value_test.cpp 
b/be/test/vec/runtime/vdatetime_value_test.cpp
index 6c0bfad6b56..fd0b3a1d2e5 100644
--- a/be/test/vec/runtime/vdatetime_value_test.cpp
+++ b/be/test/vec/runtime/vdatetime_value_test.cpp
@@ -75,7 +75,7 @@ TEST(VDateTimeValueTest, date_v2_from_uint32_test) {
         uint8_t day = 24;
 
         DateV2Value<DateV2ValueType> date_v2;
-        date_v2.from_date((uint32_t)((year << 9) | (month << 5) | day));
+        date_v2.unchecked_set_time(year, month, day, 0, 0, 0, 0);
 
         EXPECT_TRUE(date_v2.year() == year);
         EXPECT_TRUE(date_v2.month() == month);
@@ -114,10 +114,7 @@ TEST(VDateTimeValueTest, datetime_v2_from_uint64_test) {
         uint32_t microsecond = 999999;
 
         DateV2Value<DateTimeV2ValueType> datetime_v2;
-        datetime_v2.from_datetime((uint64_t)(((uint64_t)year << 46) | 
((uint64_t)month << 42) |
-                                             ((uint64_t)day << 37) | 
((uint64_t)hour << 32) |
-                                             ((uint64_t)minute << 26) | 
((uint64_t)second << 20) |
-                                             (uint64_t)microsecond));
+        datetime_v2.unchecked_set_time(year, month, day, hour, minute, second, 
microsecond);
 
         EXPECT_TRUE(datetime_v2.year() == year);
         EXPECT_TRUE(datetime_v2.month() == month);
@@ -142,10 +139,11 @@ TEST(VDateTimeValueTest, datetime_v2_from_uint64_test) {
         uint32_t microsecond = 123000;
 
         DateV2Value<DateTimeV2ValueType> datetime_v2;
-        datetime_v2.from_datetime((uint64_t)(((uint64_t)year << 46) | 
((uint64_t)month << 42) |
-                                             ((uint64_t)day << 37) | 
((uint64_t)hour << 32) |
-                                             ((uint64_t)minute << 26) | 
((uint64_t)second << 20) |
-                                             (uint64_t)microsecond));
+        auto ui64 = (uint64_t)(((uint64_t)year << 46) | ((uint64_t)month << 
42) |
+                               ((uint64_t)day << 37) | ((uint64_t)hour << 32) |
+                               ((uint64_t)minute << 26) | ((uint64_t)second << 
20) |
+                               (uint64_t)microsecond);
+        datetime_v2 = (DateV2Value<DateTimeV2ValueType>&)ui64;
 
         EXPECT_TRUE(datetime_v2.year() == year);
         EXPECT_TRUE(datetime_v2.month() == month);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to