This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 06d751451a7 branch-3.0: [enhance](metrics)add some compaction metrics
#50910 (#51487)
06d751451a7 is described below
commit 06d751451a75b08f4cd3ca059e105de425e8524d
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jun 11 11:00:29 2025 +0800
branch-3.0: [enhance](metrics)add some compaction metrics #50910 (#51487)
Cherry-picked from #50910
Co-authored-by: koarz <[email protected]>
---
be/src/cloud/cloud_base_compaction.cpp | 4 +-
be/src/cloud/cloud_cumulative_compaction.cpp | 4 +-
be/src/cloud/cloud_full_compaction.cpp | 4 +-
be/src/cloud/cloud_storage_engine.cpp | 6 +-
be/src/olap/compaction.cpp | 41 +++++++++-
be/src/olap/compaction.h | 3 +
be/src/olap/cumulative_compaction.cpp | 5 ++
be/src/olap/merger.cpp | 14 ++++
be/src/olap/merger.h | 4 +
be/src/olap/olap_server.cpp | 12 +++
be/src/olap/rowset/beta_rowset_reader.h | 2 +
be/src/util/doris_metrics.cpp | 31 ++++++++
be/src/util/doris_metrics.h | 11 +++
be/test/olap/compaction_metrics_test.cpp | 114 ++++++++++++++++++++++++++-
14 files changed, 246 insertions(+), 9 deletions(-)
diff --git a/be/src/cloud/cloud_base_compaction.cpp
b/be/src/cloud/cloud_base_compaction.cpp
index 7cd97d42fcf..2b9fd1c2e56 100644
--- a/be/src/cloud/cloud_base_compaction.cpp
+++ b/be/src/cloud/cloud_base_compaction.cpp
@@ -300,7 +300,9 @@ Status CloudBaseCompaction::execute_compact() {
.tag("output_segments", _output_rowset->num_segments())
.tag("output_rowset_data_size", _output_rowset->data_disk_size())
.tag("output_rowset_index_size", _output_rowset->index_disk_size())
- .tag("output_rowset_total_size",
_output_rowset->total_disk_size());
+ .tag("output_rowset_total_size", _output_rowset->total_disk_size())
+ .tag("local_read_bytes", _local_read_bytes_total)
+ .tag("remote_read_bytes", _remote_read_bytes_total);
//_compaction_succeed = true;
_state = CompactionState::SUCCESS;
diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp
b/be/src/cloud/cloud_cumulative_compaction.cpp
index ae66ddb9433..097bdc9bf4a 100644
--- a/be/src/cloud/cloud_cumulative_compaction.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction.cpp
@@ -223,7 +223,9 @@ Status CloudCumulativeCompaction::execute_compact() {
.tag("tablet_max_version", _tablet->max_version_unlocked())
.tag("cumulative_point", cloud_tablet()->cumulative_layer_point())
.tag("num_rowsets",
cloud_tablet()->fetch_add_approximate_num_rowsets(0))
- .tag("cumu_num_rowsets",
cloud_tablet()->fetch_add_approximate_cumu_num_rowsets(0));
+ .tag("cumu_num_rowsets",
cloud_tablet()->fetch_add_approximate_cumu_num_rowsets(0))
+ .tag("local_read_bytes", _local_read_bytes_total)
+ .tag("remote_read_bytes", _remote_read_bytes_total);
_state = CompactionState::SUCCESS;
diff --git a/be/src/cloud/cloud_full_compaction.cpp
b/be/src/cloud/cloud_full_compaction.cpp
index 470be500185..f04b7d1d4a7 100644
--- a/be/src/cloud/cloud_full_compaction.cpp
+++ b/be/src/cloud/cloud_full_compaction.cpp
@@ -182,7 +182,9 @@ Status CloudFullCompaction::execute_compact() {
.tag("output_segments", _output_rowset->num_segments())
.tag("output_rowset_data_size", _output_rowset->data_disk_size())
.tag("output_rowset_index_size", _output_rowset->index_disk_size())
- .tag("output_rowset_total_size",
_output_rowset->total_disk_size());
+ .tag("output_rowset_total_size", _output_rowset->total_disk_size())
+ .tag("local_read_bytes", _local_read_bytes_total)
+ .tag("remote_read_bytes", _remote_read_bytes_total);
_state = CompactionState::SUCCESS;
diff --git a/be/src/cloud/cloud_storage_engine.cpp
b/be/src/cloud/cloud_storage_engine.cpp
index d1481face47..9b8528c1a21 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -58,6 +58,7 @@
#include "olap/storage_policy.h"
#include "runtime/memory/cache_manager.h"
#include "util/parse_util.h"
+#include "util/time.h"
#include "vec/common/assert_cast.h"
namespace doris {
@@ -444,6 +445,7 @@ void
CloudStorageEngine::_compaction_tasks_producer_callback() {
int64_t interval = config::generate_compaction_tasks_interval_ms;
do {
+ int64_t cur_time = UnixMillis();
if (!config::disable_auto_compaction) {
Status st = _adjust_compaction_thread_num();
if (!st.ok()) {
@@ -451,7 +453,6 @@ void
CloudStorageEngine::_compaction_tasks_producer_callback() {
}
bool check_score = false;
- int64_t cur_time = UnixMillis();
if (round <
config::cumulative_compaction_rounds_for_each_base_compaction_round) {
compaction_type = CompactionType::CUMULATIVE_COMPACTION;
round++;
@@ -503,6 +504,9 @@ void
CloudStorageEngine::_compaction_tasks_producer_callback() {
} else {
interval = config::check_auto_compaction_interval_seconds * 1000;
}
+ int64_t end_time = UnixMillis();
+
DorisMetrics::instance()->compaction_producer_callback_a_round_time->set_value(end_time
-
+
cur_time);
} while
(!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval)));
}
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index e5012a36eb4..3d6d6cad558 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -53,6 +53,7 @@
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
+#include "olap/rowset/beta_rowset_reader.h"
#include "olap/rowset/beta_rowset_writer.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_fwd.h"
@@ -74,6 +75,7 @@
#include "olap/utils.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/thread_context.h"
+#include "util/doris_metrics.h"
#include "util/time.h"
#include "util/trace.h"
@@ -241,6 +243,14 @@ Status Compaction::merge_input_rowsets() {
}
}
+ _local_read_bytes_total = _stats.bytes_read_from_local;
+ _remote_read_bytes_total = _stats.bytes_read_from_remote;
+
DorisMetrics::instance()->local_compaction_read_bytes_total->increment(_local_read_bytes_total);
+ DorisMetrics::instance()->remote_compaction_read_bytes_total->increment(
+ _remote_read_bytes_total);
+ DorisMetrics::instance()->local_compaction_write_bytes_total->increment(
+ _stats.cached_bytes_total);
+
COUNTER_UPDATE(_output_rowset_data_size_counter,
_output_rowset->data_disk_size());
COUNTER_UPDATE(_output_row_num_counter, _output_rowset->num_rows());
COUNTER_UPDATE(_output_segments_num_counter,
_output_rowset->num_segments());
@@ -346,6 +356,8 @@ void CompactionMixin::build_basic_info() {
COUNTER_UPDATE(_input_row_num_counter, _input_row_num);
COUNTER_UPDATE(_input_segments_num_counter, _input_num_segments);
+
TEST_SYNC_POINT_RETURN_WITH_VOID("compaction::CompactionMixin::build_basic_info");
+
_output_version =
Version(_input_rowsets.front()->start_version(),
_input_rowsets.back()->end_version());
@@ -448,6 +460,17 @@ Status CompactionMixin::execute_compact() {
}
}
+
DorisMetrics::instance()->local_compaction_read_rows_total->increment(_input_row_num);
+ DorisMetrics::instance()->local_compaction_read_bytes_total->increment(
+ _input_rowsets_total_size);
+
+
TEST_SYNC_POINT_RETURN_WITH_VALUE("compaction::CompactionMixin::execute_compact",
Status::OK());
+
+ DorisMetrics::instance()->local_compaction_write_rows_total->increment(
+ _output_rowset->num_rows());
+ DorisMetrics::instance()->local_compaction_write_bytes_total->increment(
+ _output_rowset->total_disk_size());
+
_load_segment_to_cache();
return Status::OK();
}
@@ -474,6 +497,9 @@ Status CompactionMixin::execute_compact_impl(int64_t
permits) {
}
build_basic_info();
+
TEST_SYNC_POINT_RETURN_WITH_VALUE("compaction::CompactionMixin::execute_compact_impl",
+ Status::OK());
+
VLOG_DEBUG << "dump tablet schema: " <<
_cur_tablet_schema->dump_structure();
LOG(INFO) << "start " << compaction_name() << ". tablet=" <<
_tablet->tablet_id()
@@ -489,8 +515,12 @@ Status CompactionMixin::execute_compact_impl(int64_t
permits) {
<< ". tablet=" << _tablet->tablet_id() << ", output_version=" <<
_output_version
<< ", current_max_version=" << tablet()->max_version().second
<< ", disk=" << tablet()->data_dir()->path() << ", segments=" <<
_input_num_segments
- << ", input_data_size=" << _input_rowsets_data_size
- << ", output_rowset_size=" << _output_rowset->total_disk_size()
+ << ", input_rowsets_data_size=" << _input_rowsets_data_size
+ << ", input_rowsets_index_size=" << _input_rowsets_index_size
+ << ", input_rowsets_total_size=" << _input_rowsets_total_size
+ << ", output_rowset_data_size=" <<
_output_rowset->data_disk_size()
+ << ", output_rowset_index_size=" <<
_output_rowset->index_disk_size()
+ << ", output_rowset_total_size=" <<
_output_rowset->total_disk_size()
<< ", input_row_num=" << _input_row_num
<< ", output_row_num=" << _output_rowset->num_rows()
<< ", filtered_row_num=" << _stats.filtered_rows
@@ -1346,6 +1376,13 @@ Status CloudCompactionMixin::execute_compact() {
_tablet->tablet_id());
}
});
+
+
DorisMetrics::instance()->remote_compaction_read_rows_total->increment(_input_row_num);
+ DorisMetrics::instance()->remote_compaction_write_rows_total->increment(
+ _output_rowset->num_rows());
+ DorisMetrics::instance()->remote_compaction_write_bytes_total->increment(
+ _output_rowset->total_disk_size());
+
_load_segment_to_cache();
return Status::OK();
}
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index 2627b136ea9..e4e74f333d0 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -102,6 +102,9 @@ protected:
int64_t _input_row_num {0};
int64_t _input_num_segments {0};
+ int64_t _local_read_bytes_total {};
+ int64_t _remote_read_bytes_total {};
+
Merger::Statistics _stats;
RowsetSharedPtr _output_rowset;
diff --git a/be/src/olap/cumulative_compaction.cpp
b/be/src/olap/cumulative_compaction.cpp
index 586efb9344f..73fe179c2ce 100644
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -17,6 +17,8 @@
#include "olap/cumulative_compaction.h"
+#include <cpp/sync_point.h>
+
#include <memory>
#include <mutex>
#include <ostream>
@@ -151,6 +153,9 @@ Status CumulativeCompaction::execute_compact() {
st = CompactionMixin::execute_compact();
RETURN_IF_ERROR(st);
+ TEST_SYNC_POINT_RETURN_WITH_VALUE(
+ "cumulative_compaction::CumulativeCompaction::execute_compact",
Status::OK());
+
DCHECK_EQ(_state, CompactionState::SUCCESS);
if (tablet()->tablet_meta()->time_series_compaction_level_threshold() >=
2) {
tablet()->cumulative_compaction_policy()->update_compaction_level(tablet(),
_input_rowsets,
diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index bd7a1818051..b213808af24 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -139,6 +139,10 @@ Status Merger::vmerge_rowsets(BaseTabletSPtr tablet,
ReaderType reader_type,
stats_output->output_rows = output_rows;
stats_output->merged_rows = reader.merged_rows();
stats_output->filtered_rows = reader.filtered_rows();
+ stats_output->bytes_read_from_local =
reader.stats().file_cache_stats.bytes_read_from_local;
+ stats_output->bytes_read_from_remote =
+ reader.stats().file_cache_stats.bytes_read_from_remote;
+ stats_output->cached_bytes_total =
reader.stats().file_cache_stats.bytes_write_into_cache;
}
RETURN_NOT_OK_STATUS_WITH_WARN(dst_rowset_writer->flush(),
@@ -312,6 +316,10 @@ Status Merger::vertical_compact_one_group(
stats_output->output_rows = output_rows;
stats_output->merged_rows = reader.merged_rows();
stats_output->filtered_rows = reader.filtered_rows();
+ stats_output->bytes_read_from_local =
reader.stats().file_cache_stats.bytes_read_from_local;
+ stats_output->bytes_read_from_remote =
+ reader.stats().file_cache_stats.bytes_read_from_remote;
+ stats_output->cached_bytes_total =
reader.stats().file_cache_stats.bytes_write_into_cache;
}
RETURN_IF_ERROR(dst_rowset_writer->flush_columns(is_key));
@@ -356,6 +364,12 @@ Status Merger::vertical_compact_one_group(
stats_output->output_rows = output_rows;
stats_output->merged_rows = src_block_reader.merged_rows();
stats_output->filtered_rows = src_block_reader.filtered_rows();
+ stats_output->bytes_read_from_local =
+
src_block_reader.stats().file_cache_stats.bytes_read_from_local;
+ stats_output->bytes_read_from_remote =
+
src_block_reader.stats().file_cache_stats.bytes_read_from_remote;
+ stats_output->cached_bytes_total =
+
src_block_reader.stats().file_cache_stats.bytes_write_into_cache;
}
// segcompaction produce only one segment at once
diff --git a/be/src/olap/merger.h b/be/src/olap/merger.h
index 7d430cde7f3..76d053a7a79 100644
--- a/be/src/olap/merger.h
+++ b/be/src/olap/merger.h
@@ -48,6 +48,10 @@ public:
int64_t merged_rows = 0;
int64_t filtered_rows = 0;
RowIdConversion* rowid_conversion = nullptr;
+ // these data for trans
+ int64_t cached_bytes_total = 0;
+ int64_t bytes_read_from_local = 0;
+ int64_t bytes_read_from_remote = 0;
};
// merge rows from `src_rowset_readers` and write into `dst_rowset_writer`.
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index b9c07d10dd0..8469503b8ed 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -663,6 +663,7 @@ void StorageEngine::_compaction_tasks_producer_callback() {
int64_t interval = config::generate_compaction_tasks_interval_ms;
do {
+ int64_t cur_time = UnixMillis();
if (!config::disable_auto_compaction &&
(!config::enable_compaction_pause_on_high_memory ||
!GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE))) {
@@ -716,6 +717,17 @@ void StorageEngine::_compaction_tasks_producer_callback() {
} else {
interval = 5000; // 5s to check disable_auto_compaction
}
+
+ // wait some seconds for ut test
+ {
+ std ::vector<std ::any> args {};
+ args.emplace_back(1);
+ doris ::SyncPoint ::get_instance()->process(
+ "StorageEngine::_compaction_tasks_producer_callback", std
::move(args));
+ }
+ int64_t end_time = UnixMillis();
+
DorisMetrics::instance()->compaction_producer_callback_a_round_time->set_value(end_time
-
+
cur_time);
} while
(!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval)));
}
diff --git a/be/src/olap/rowset/beta_rowset_reader.h
b/be/src/olap/rowset/beta_rowset_reader.h
index 33b2fb6a58c..d8124ad6124 100644
--- a/be/src/olap/rowset/beta_rowset_reader.h
+++ b/be/src/olap/rowset/beta_rowset_reader.h
@@ -88,6 +88,8 @@ public:
void set_topn_limit(size_t topn_limit) override { _topn_limit =
topn_limit; }
+ OlapReaderStatistics* get_stats() { return _stats; }
+
private:
[[nodiscard]] Status _init_iterator_once();
[[nodiscard]] Status _init_iterator();
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index 362efa86809..8ec906cb630 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -130,6 +130,26 @@
DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(stream_receive_bytes_total, MetricUnit::BYT
DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(stream_load_rows_total, MetricUnit::ROWS,
"", stream_load,
Labels({{"type", "load_rows"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(compaction_producer_callback_a_round_time,
+ MetricUnit::ROWSETS);
+
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(local_compaction_read_rows_total,
MetricUnit::ROWS, "",
+ compaction_rows_total, Labels({{"type",
"read"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(local_compaction_read_bytes_total,
MetricUnit::BYTES, "",
+ compaction_bytes_total, Labels({{"type",
"read"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(local_compaction_write_rows_total,
MetricUnit::ROWS, "",
+ compaction_rows_total, Labels({{"type",
"write"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(local_compaction_write_bytes_total,
MetricUnit::BYTES, "",
+ compaction_bytes_total, Labels({{"type",
"write"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(remote_compaction_read_rows_total,
MetricUnit::ROWS, "",
+ compaction_rows_total, Labels({{"type",
"read"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(remote_compaction_read_bytes_total,
MetricUnit::BYTES, "",
+ compaction_bytes_total, Labels({{"type",
"read"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(remote_compaction_write_rows_total,
MetricUnit::ROWS, "",
+ compaction_rows_total, Labels({{"type",
"write"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(remote_compaction_write_bytes_total,
MetricUnit::BYTES, "",
+ compaction_bytes_total, Labels({{"type",
"write"}}));
+
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(load_rows, MetricUnit::ROWS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(load_bytes, MetricUnit::BYTES);
@@ -225,6 +245,8 @@ DorisMetrics::DorisMetrics() :
_metric_registry(_s_registry_name) {
INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
push_request_write_bytes);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
push_request_write_rows);
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
compaction_producer_callback_a_round_time);
+
// engine_requests_total
INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
create_tablet_requests_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
create_tablet_requests_failed);
@@ -255,6 +277,15 @@ DorisMetrics::DorisMetrics() :
_metric_registry(_s_registry_name) {
INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
alter_inverted_index_requests_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
alter_inverted_index_requests_failed);
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
local_compaction_read_rows_total);
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
local_compaction_read_bytes_total);
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
local_compaction_write_rows_total);
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
local_compaction_write_bytes_total);
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
remote_compaction_read_rows_total);
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
remote_compaction_read_bytes_total);
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
remote_compaction_write_rows_total);
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
remote_compaction_write_bytes_total);
+
INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
base_compaction_deltas_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
base_compaction_bytes_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
cumulative_compaction_deltas_total);
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 0d9c060bfb8..5a4bef95a85 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -80,6 +80,8 @@ public:
IntCounter* finish_task_requests_total = nullptr;
IntCounter* finish_task_requests_failed = nullptr;
+ IntCounter* compaction_producer_callback_a_round_time = nullptr;
+
IntCounter* base_compaction_request_total = nullptr;
IntCounter* base_compaction_request_failed = nullptr;
IntCounter* cumulative_compaction_request_total = nullptr;
@@ -88,6 +90,15 @@ public:
IntCounter* single_compaction_request_failed = nullptr;
IntCounter* single_compaction_request_cancelled = nullptr;
+ IntCounter* local_compaction_read_rows_total = nullptr;
+ IntCounter* local_compaction_read_bytes_total = nullptr;
+ IntCounter* local_compaction_write_rows_total = nullptr;
+ IntCounter* local_compaction_write_bytes_total = nullptr;
+ IntCounter* remote_compaction_read_rows_total = nullptr;
+ IntCounter* remote_compaction_read_bytes_total = nullptr;
+ IntCounter* remote_compaction_write_rows_total = nullptr;
+ IntCounter* remote_compaction_write_bytes_total = nullptr;
+
IntCounter* base_compaction_deltas_total = nullptr;
IntCounter* base_compaction_bytes_total = nullptr;
IntCounter* cumulative_compaction_deltas_total = nullptr;
diff --git a/be/test/olap/compaction_metrics_test.cpp
b/be/test/olap/compaction_metrics_test.cpp
index a556384ecc9..47f9aa65b6b 100644
--- a/be/test/olap/compaction_metrics_test.cpp
+++ b/be/test/olap/compaction_metrics_test.cpp
@@ -23,14 +23,15 @@
#include <unistd.h>
#include <chrono>
-#include <filesystem>
#include <memory>
+#include <thread>
-#include "common/logging.h"
+#include "common/config.h"
#include "common/status.h"
#include "cpp/sync_point.h"
-#include "gtest/gtest_pred_impl.h"
#include "io/fs/local_file_system.h"
+#include "olap/compaction.h"
+#include "olap/cumulative_compaction.h"
#include "olap/cumulative_compaction_policy.h"
#include "olap/data_dir.h"
#include "olap/rowset/rowset_factory.h"
@@ -79,6 +80,7 @@ static RowsetSharedPtr create_rowset(Version version, int
num_segments, bool ove
rs_meta->set_num_segments(num_segments);
rs_meta->set_segments_overlap(overlapping ? OVERLAPPING : NONOVERLAPPING);
rs_meta->set_total_disk_size(data_size);
+ rs_meta->set_num_rows(50);
RowsetSharedPtr rowset;
Status st = RowsetFactory::create_rowset(nullptr, "", rs_meta, &rowset);
if (!st.ok()) {
@@ -106,6 +108,7 @@ TEST_F(CompactionMetricsTest,
TestCompactionTaskNumWithDiffStatus) {
bool* pred = try_any_cast<bool*>(values.back());
*pred = true;
});
+ Defer defer {[&]() {
sp->clear_call_back("olap_server::execute_compaction"); }};
for (int tablet_cnt = 0; tablet_cnt < 10; ++tablet_cnt) {
TabletMetaSharedPtr tablet_meta;
@@ -138,4 +141,109 @@ TEST_F(CompactionMetricsTest,
TestCompactionTaskNumWithDiffStatus) {
}
}
+TEST_F(CompactionMetricsTest, TestCompactionProducerSpendTime) {
+ auto st = ThreadPoolBuilder("BaseCompactionTaskThreadPool")
+ .set_min_threads(2)
+ .set_max_threads(2)
+ .build(&_storage_engine->_base_compaction_thread_pool);
+ EXPECT_TRUE(st.ok());
+ st = ThreadPoolBuilder("CumuCompactionTaskThreadPool")
+ .set_min_threads(2)
+ .set_max_threads(2)
+ .build(&_storage_engine->_cumu_compaction_thread_pool);
+ EXPECT_TRUE(st.ok());
+ bool disable_auto_compaction = config::disable_auto_compaction;
+ config::disable_auto_compaction = true;
+
+ auto* sp = SyncPoint::get_instance();
+ sp->enable_processing();
+ sp->set_call_back("StorageEngine::_compaction_tasks_producer_callback",
+ [](auto&& values) {
std::this_thread::sleep_for(std::chrono::seconds(1)); });
+
+ Defer defer {[&]() {
+ _storage_engine->_stop_background_threads_latch.count_down();
+ config::disable_auto_compaction = disable_auto_compaction;
+
sp->clear_call_back("StorageEngine::_compaction_tasks_producer_callback");
+ }};
+
+ // compaction tasks producer thread
+ st = Thread::create(
+ "StorageEngine", "compaction_tasks_producer_thread",
+ [this]() {
this->_storage_engine->_compaction_tasks_producer_callback(); },
+ &_storage_engine->_compaction_tasks_producer_thread);
+ EXPECT_TRUE(st.ok());
+ std::this_thread::sleep_for(std::chrono::seconds(2));
+ // the compaction_producer_callback_a_round_time can't get an accurate
value
+ // just judge it great than 0
+
EXPECT_GT(DorisMetrics::instance()->compaction_producer_callback_a_round_time->value(),
0);
+}
+
+TEST_F(CompactionMetricsTest, TestCompactionReadWriteThroughput) {
+ DorisMetrics::instance()->local_compaction_read_bytes_total->set_value(0);
+ DorisMetrics::instance()->local_compaction_read_rows_total->set_value(0);
+ DorisMetrics::instance()->local_compaction_write_bytes_total->set_value(0);
+ DorisMetrics::instance()->local_compaction_write_rows_total->set_value(0);
+
+ auto st = ThreadPoolBuilder("BaseCompactionTaskThreadPool")
+ .set_min_threads(2)
+ .set_max_threads(2)
+ .build(&_storage_engine->_base_compaction_thread_pool);
+ EXPECT_TRUE(st.ok());
+ st = ThreadPoolBuilder("CumuCompactionTaskThreadPool")
+ .set_min_threads(2)
+ .set_max_threads(2)
+ .build(&_storage_engine->_cumu_compaction_thread_pool);
+ EXPECT_TRUE(st.ok());
+
+ auto* sp = SyncPoint::get_instance();
+ sp->enable_processing();
+ sp->set_call_back("compaction::CompactionMixin::build_basic_info",
[](auto&& values) {
+ bool* pred = try_any_cast<bool*>(values.back());
+ *pred = true;
+ });
+ sp->set_call_back("compaction::CompactionMixin::execute_compact_impl",
[&](auto&& values) {
+ auto* pairs = try_any_cast<std::pair<Status, bool>*>(values.back());
+ pairs->second = true;
+ });
+ sp->set_call_back("compaction::CompactionMixin::execute_compact",
[&](auto&& values) {
+ auto* pairs = try_any_cast<std::pair<Status, bool>*>(values.back());
+ pairs->second = true;
+ });
+
sp->set_call_back("cumulative_compaction::CumulativeCompaction::execute_compact",
+ [&](auto&& values) {
+ auto* pairs = try_any_cast<std::pair<Status,
bool>*>(values.back());
+ pairs->second = true;
+ });
+ Defer defer {[&]() {
+ sp->clear_call_back("compaction::CompactionMixin::build_basic_info");
+
sp->clear_call_back("compaction::CompactionMixin::execute_compact_impl");
+ sp->clear_call_back("compaction::CompactionMixin::execute_compact");
+
sp->clear_call_back("cumulative_compaction::CumulativeCompaction::execute_compact");
+ }};
+
+ TabletMetaSharedPtr tablet_meta;
+ tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5,
TTabletSchema(), 6, {{7, 8}},
+ UniqueId(9, 10),
TTabletType::TABLET_TYPE_DISK,
+ TCompressionType::LZ4F));
+ TabletSharedPtr tablet(new Tablet(*(_storage_engine.get()), tablet_meta,
_data_dir.get(),
+ CUMULATIVE_SIZE_BASED_POLICY));
+ st = tablet->init();
+ EXPECT_TRUE(st.ok());
+
+ for (int i = 2; i < 30; ++i) {
+ RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024);
+ tablet->_rs_version_map.emplace(rs->version(), rs);
+ }
+ tablet->_cumulative_point = 2;
+
+ st = _storage_engine->_submit_compaction_task(tablet,
CompactionType::CUMULATIVE_COMPACTION,
+ false);
+ EXPECT_TRUE(st.ok());
+
+ std::this_thread::sleep_for(std::chrono::seconds(5));
+
+
EXPECT_EQ(DorisMetrics::instance()->local_compaction_read_bytes_total->value(),
1024 * 28);
+
EXPECT_EQ(DorisMetrics::instance()->local_compaction_read_rows_total->value(),
50 * 28);
+}
+
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]