This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 246da2de99b branch-3.0: [enhance](metrics)add metrics to show 
compaction task num #50706 (#50883)
246da2de99b is described below

commit 246da2de99b05dd23cf2c8cde1512da363ac0531
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed May 14 20:59:42 2025 +0800

    branch-3.0: [enhance](metrics) add metrics to show compaction task num #50706 (#50883)
    
    Cherry-picked from #50706
    
    Co-authored-by: koarz <[email protected]>
---
 be/src/cloud/cloud_storage_engine.cpp    |  16 ++++
 be/src/olap/olap_server.cpp              |  25 ++++++
 be/src/util/doris_metrics.cpp            |  16 ++++
 be/src/util/doris_metrics.h              |   5 ++
 be/test/olap/compaction_metrics_test.cpp | 141 +++++++++++++++++++++++++++++++
 5 files changed, 203 insertions(+)

diff --git a/be/src/cloud/cloud_storage_engine.cpp 
b/be/src/cloud/cloud_storage_engine.cpp
index 929f5480538..01d9205a33a 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -681,11 +681,17 @@ Status 
CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& t
         _submitted_base_compactions[tablet->tablet_id()] = compaction;
     }
     st = _base_compaction_thread_pool->submit_func([=, this, compaction = 
std::move(compaction)]() {
+        
DorisMetrics::instance()->base_compaction_task_running_total->increment(1);
+        
DorisMetrics::instance()->base_compaction_task_pending_total->set_value(
+                _base_compaction_thread_pool->get_queue_size());
         g_base_compaction_running_task_count << 1;
         signal::tablet_id = tablet->tablet_id();
         Defer defer {[&]() {
             g_base_compaction_running_task_count << -1;
             _submitted_base_compactions.erase(tablet->tablet_id());
+            
DorisMetrics::instance()->base_compaction_task_running_total->increment(-1);
+            
DorisMetrics::instance()->base_compaction_task_pending_total->set_value(
+                    _base_compaction_thread_pool->get_queue_size());
         }};
         auto st = 
_request_tablet_global_compaction_lock(ReaderType::READER_BASE_COMPACTION, 
tablet,
                                                          compaction);
@@ -699,6 +705,8 @@ Status 
CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& t
         std::lock_guard lock(_compaction_mtx);
         _executing_base_compactions.erase(tablet->tablet_id());
     });
+    DorisMetrics::instance()->base_compaction_task_pending_total->set_value(
+            _base_compaction_thread_pool->get_queue_size());
     if (!st.ok()) {
         std::lock_guard lock(_compaction_mtx);
         _submitted_base_compactions.erase(tablet->tablet_id());
@@ -777,6 +785,9 @@ Status 
CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS
         }
     };
     st = _cumu_compaction_thread_pool->submit_func([=, this, compaction = 
std::move(compaction)]() {
+        
DorisMetrics::instance()->cumulative_compaction_task_running_total->increment(1);
+        
DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value(
+                _cumu_compaction_thread_pool->get_queue_size());
         
DBUG_EXECUTE_IF("CloudStorageEngine._submit_cumulative_compaction_task.wait_in_line",
                         { sleep(5); })
         signal::tablet_id = tablet->tablet_id();
@@ -792,6 +803,9 @@ Status 
CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS
             }
             g_cumu_compaction_running_task_count << -1;
             erase_submitted_cumu_compaction();
+            
DorisMetrics::instance()->cumulative_compaction_task_running_total->increment(-1);
+            
DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value(
+                    _cumu_compaction_thread_pool->get_queue_size());
         }};
         auto st = 
_request_tablet_global_compaction_lock(ReaderType::READER_CUMULATIVE_COMPACTION,
                                                          tablet, compaction);
@@ -845,6 +859,8 @@ Status 
CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS
         }
         erase_executing_cumu_compaction();
     });
+    
DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value(
+            _cumu_compaction_thread_pool->get_queue_size());
     if (!st.ok()) {
         erase_submitted_cumu_compaction();
         return Status::InternalError("failed to submit cumu compaction, 
tablet_id={}",
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 41cc66f0c13..6410fd421f1 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -82,6 +82,7 @@
 #include "util/debug_points.h"
 #include "util/doris_metrics.h"
 #include "util/mem_info.h"
+#include "util/metrics.h"
 #include "util/thread.h"
 #include "util/threadpool.h"
 #include "util/thrift_rpc_helper.h"
@@ -1049,6 +1050,15 @@ Status 
StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
                       << ", num_total_queued_tasks: " << 
thread_pool->get_queue_size();
         auto st = thread_pool->submit_func([tablet, compaction = 
std::move(compaction),
                                             compaction_type, permits, force, 
this]() {
+            if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) 
[[likely]] {
+                
DorisMetrics::instance()->cumulative_compaction_task_running_total->increment(1);
+                
DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value(
+                        _cumu_compaction_thread_pool->get_queue_size());
+            } else if (compaction_type == CompactionType::BASE_COMPACTION) {
+                
DorisMetrics::instance()->base_compaction_task_running_total->increment(1);
+                
DorisMetrics::instance()->base_compaction_task_pending_total->set_value(
+                        _base_compaction_thread_pool->get_queue_size());
+            }
             bool is_large_task = true;
             Defer defer {[&]() {
                 DBUG_EXECUTE_IF("StorageEngine._submit_compaction_task.sleep", 
{ sleep(5); })
@@ -1063,6 +1073,14 @@ Status 
StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
                     if (!is_large_task) {
                         _cumu_compaction_thread_pool_small_tasks_running--;
                     }
+                    
DorisMetrics::instance()->cumulative_compaction_task_running_total->increment(
+                            -1);
+                    
DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value(
+                            _cumu_compaction_thread_pool->get_queue_size());
+                } else if (compaction_type == CompactionType::BASE_COMPACTION) 
{
+                    
DorisMetrics::instance()->base_compaction_task_running_total->increment(-1);
+                    
DorisMetrics::instance()->base_compaction_task_pending_total->set_value(
+                            _base_compaction_thread_pool->get_queue_size());
                 }
             }};
             do {
@@ -1121,6 +1139,13 @@ Status 
StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
             
TEST_SYNC_POINT_RETURN_WITH_VOID("olap_server::execute_compaction");
             tablet->execute_compaction(*compaction);
         });
+        if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) 
[[likely]] {
+            
DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value(
+                    _cumu_compaction_thread_pool->get_queue_size());
+        } else if (compaction_type == CompactionType::BASE_COMPACTION) {
+            
DorisMetrics::instance()->base_compaction_task_pending_total->set_value(
+                    _base_compaction_thread_pool->get_queue_size());
+        }
         if (!st.ok()) {
             if (!force) {
                 _permit_limiter.release(permits);
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index 48b88cd5727..362efa86809 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -99,6 +99,17 @@ 
DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(cumulative_compaction_bytes_total, MetricUn
 DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(full_compaction_bytes_total, 
MetricUnit::BYTES, "",
                                      compaction_bytes_total, Labels({{"type", 
"full"}}));
 
+// Number of compaction tasks per pool and state. These values go up and down (the
+// running count is incremented/decremented around each task, the pending count is
+// overwritten with the pool's queue size), so the prototypes are gauges, not counters.
+// running and pending share the group name compaction_task_state_total, so each one
+// carries a distinct "state" label; otherwise the two series of a given "type" would be
+// indistinguishable.
+DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(base_compaction_task_running_total, MetricUnit::NOUNIT, "",
+                                   compaction_task_state_total,
+                                   Labels({{"type", "base"}, {"state", "running"}}));
+DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(base_compaction_task_pending_total, MetricUnit::NOUNIT, "",
+                                   compaction_task_state_total,
+                                   Labels({{"type", "base"}, {"state", "pending"}}));
+DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(cumulative_compaction_task_running_total, MetricUnit::NOUNIT,
+                                   "", compaction_task_state_total,
+                                   Labels({{"type", "cumulative"}, {"state", "running"}}));
+DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(cumulative_compaction_task_pending_total, MetricUnit::NOUNIT,
+                                   "", compaction_task_state_total,
+                                   Labels({{"type", "cumulative"}, {"state", "pending"}}));
+
 DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(segment_read_total, 
MetricUnit::OPERATIONS,
                                      "(segment_v2) total number of segments 
read", segment_read,
                                      Labels({{"type", "segment_read_total"}}));
@@ -251,6 +262,11 @@ DorisMetrics::DorisMetrics() : 
_metric_registry(_s_registry_name) {
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
full_compaction_deltas_total);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
full_compaction_bytes_total);
 
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
base_compaction_task_running_total);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
base_compaction_task_pending_total);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
cumulative_compaction_task_running_total);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
cumulative_compaction_task_pending_total);
+
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, segment_read_total);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, segment_row_total);
 
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 6fbc24d6922..0d9c060bfb8 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -95,6 +95,11 @@ public:
     IntCounter* full_compaction_deltas_total = nullptr;
     IntCounter* full_compaction_bytes_total = nullptr;
 
+    IntCounter* base_compaction_task_running_total = nullptr;
+    IntCounter* base_compaction_task_pending_total = nullptr;
+    IntCounter* cumulative_compaction_task_running_total = nullptr;
+    IntCounter* cumulative_compaction_task_pending_total = nullptr;
+
     IntCounter* publish_task_request_total = nullptr;
     IntCounter* publish_task_failed_total = nullptr;
 
diff --git a/be/test/olap/compaction_metrics_test.cpp 
b/be/test/olap/compaction_metrics_test.cpp
new file mode 100644
index 00000000000..a556384ecc9
--- /dev/null
+++ b/be/test/olap/compaction_metrics_test.cpp
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock-actions.h>
+#include <gmock/gmock-matchers.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <gtest/gtest.h>
+#include <unistd.h>
+
+#include <chrono>
+#include <filesystem>
+#include <memory>
+#include <thread>
+
+#include "common/logging.h"
+#include "common/status.h"
+#include "cpp/sync_point.h"
+#include "gtest/gtest_pred_impl.h"
+#include "io/fs/local_file_system.h"
+#include "olap/cumulative_compaction_policy.h"
+#include "olap/data_dir.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet_manager.h"
+#include "util/defer_op.h"
+#include "util/doris_metrics.h"
+#include "util/threadpool.h"
+
+namespace doris {
+using namespace config;
+
+// Test fixture: each test gets a fresh StorageEngine plus one DataDir rooted at a
+// scratch directory that is recreated in SetUp() and deleted in TearDown().
+class CompactionMetricsTest : public testing::Test {
+public:
+    void SetUp() override {
+        _engine_data_path = "./be/test/olap/test_data/converter_test_data/tmp";
+        const auto& local_fs = io::global_local_filesystem();
+
+        // Recreate <root> from scratch, then add <root>/meta.
+        Status st = local_fs->delete_directory(_engine_data_path);
+        ASSERT_TRUE(st.ok()) << st;
+        st = local_fs->create_directory(_engine_data_path);
+        ASSERT_TRUE(st.ok()) << st;
+        EXPECT_TRUE(
+                io::global_local_filesystem()->create_directory(_engine_data_path + "/meta").ok());
+
+        EngineOptions engine_opts;
+        engine_opts.backend_uid = UniqueId::gen_uid();
+        _storage_engine = std::make_unique<StorageEngine>(engine_opts);
+        _data_dir = std::make_unique<DataDir>(*_storage_engine, _engine_data_path, 100000000);
+        // The Status returned by init() is intentionally discarded.
+        static_cast<void>(_data_dir->init());
+    }
+
+    void TearDown() override {
+        EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_engine_data_path).ok());
+        ExecEnv::GetInstance()->set_storage_engine(nullptr);
+    }
+
+    // Declaration order matters: _data_dir is destroyed before _storage_engine.
+    std::unique_ptr<StorageEngine> _storage_engine;
+    std::string _engine_data_path;
+    std::unique_ptr<DataDir> _data_dir;
+};
+
+// Builds a BETA rowset whose meta covers `version`, with `num_segments` segments, the
+// given overlap flag and `data_size` as total disk size. Returns nullptr when
+// RowsetFactory::create_rowset() reports an error.
+static RowsetSharedPtr create_rowset(Version version, int num_segments, bool overlapping,
+                                     int data_size) {
+    auto meta = std::make_shared<RowsetMeta>();
+    // important: RowsetFactory presumably selects the rowset implementation from this type
+    meta->set_rowset_type(BETA_ROWSET);
+    meta->_rowset_meta_pb.set_start_version(version.first);
+    meta->_rowset_meta_pb.set_end_version(version.second);
+    meta->set_num_segments(num_segments);
+    meta->set_segments_overlap(overlapping ? OVERLAPPING : NONOVERLAPPING);
+    meta->set_total_disk_size(data_size);
+
+    RowsetSharedPtr rowset;
+    const Status st = RowsetFactory::create_rowset(nullptr, "", meta, &rowset);
+    return st.ok() ? rowset : nullptr;
+}
+
+// Submits ten cumulative compactions to two-worker pools and, after each submission,
+// checks that the running/pending task metrics match the pools' own counters.
+TEST_F(CompactionMetricsTest, TestCompactionTaskNumWithDiffStatus) {
+    auto st = ThreadPoolBuilder("BaseCompactionTaskThreadPool")
+                      .set_min_threads(2)
+                      .set_max_threads(2)
+                      .build(&_storage_engine->_base_compaction_thread_pool);
+    // Fatal on failure: every later step dereferences the pools.
+    ASSERT_TRUE(st.ok()) << st;
+    st = ThreadPoolBuilder("CumuCompactionTaskThreadPool")
+                 .set_min_threads(2)
+                 .set_max_threads(2)
+                 .build(&_storage_engine->_cumu_compaction_thread_pool);
+    ASSERT_TRUE(st.ok()) << st;
+
+    auto* sp = SyncPoint::get_instance();
+    // SyncPoint is a process-wide singleton: undo what this test installs on every exit
+    // path (including a failed ASSERT below), otherwise later tests in the same binary
+    // would still have execute_compaction() short-circuited by the callback.
+    Defer cleanup {[&]() {
+        // Drain the pools before removing the callback: ThreadPool::shutdown() waits for
+        // the running tasks (still inside the callback) and removes the queued ones, so no
+        // task reaches the real compaction path after the callback is gone.
+        _storage_engine->_base_compaction_thread_pool->shutdown();
+        _storage_engine->_cumu_compaction_thread_pool->shutdown();
+        sp->disable_processing();
+        sp->clear_all_call_backs();
+    }};
+    sp->enable_processing();
+    // Keep each task busy for one second so tasks accumulate as running/pending, then
+    // set the predicate so the task returns before tablet->execute_compaction().
+    sp->set_call_back("olap_server::execute_compaction", [](auto&& values) {
+        std::this_thread::sleep_for(std::chrono::seconds(1));
+        bool* pred = try_any_cast<bool*>(values.back());
+        *pred = true;
+    });
+
+    for (int tablet_cnt = 0; tablet_cnt < 10; ++tablet_cnt) {
+        TabletMetaSharedPtr tablet_meta;
+        tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}},
+                                         UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK,
+                                         TCompressionType::LZ4F));
+        TabletSharedPtr tablet(new Tablet(*(_storage_engine.get()), tablet_meta, _data_dir.get(),
+                                          CUMULATIVE_SIZE_BASED_POLICY));
+        st = tablet->init();
+        EXPECT_TRUE(st.ok()) << st;
+
+        // Singleton versions [2,2] .. [29,29], one non-overlapping segment each.
+        for (int i = 2; i < 30; ++i) {
+            RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024);
+            // create_rowset() returns nullptr on failure; fail the test instead of crashing.
+            ASSERT_TRUE(rs != nullptr) << "failed to create rowset for version " << i;
+            tablet->_rs_version_map.emplace(rs->version(), rs);
+        }
+        tablet->_cumulative_point = 2;
+
+        st = _storage_engine->_submit_compaction_task(tablet, CompactionType::CUMULATIVE_COMPACTION,
+                                                      false);
+        EXPECT_TRUE(st.ok()) << st;
+        // Give a worker time to pick up the submitted task before sampling the metrics.
+        std::this_thread::sleep_for(std::chrono::milliseconds(150));
+        EXPECT_EQ(_storage_engine->_cumu_compaction_thread_pool->num_active_threads(),
+                  DorisMetrics::instance()->cumulative_compaction_task_running_total->value());
+        EXPECT_EQ(_storage_engine->_cumu_compaction_thread_pool->get_queue_size(),
+                  DorisMetrics::instance()->cumulative_compaction_task_pending_total->value());
+        EXPECT_EQ(_storage_engine->_base_compaction_thread_pool->num_active_threads(),
+                  DorisMetrics::instance()->base_compaction_task_running_total->value());
+        EXPECT_EQ(_storage_engine->_base_compaction_thread_pool->get_queue_size(),
+                  DorisMetrics::instance()->base_compaction_task_pending_total->value());
+    }
+}
+
+} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to