This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 22cb7b8fcbb [improvement](compaction) be do not compact invisible
version to avoid query error -230 #28082 (#36222)
22cb7b8fcbb is described below
commit 22cb7b8fcbb938e9342361c2d5bc1ec1fe23a579
Author: yujun <[email protected]>
AuthorDate: Thu Jun 27 13:45:21 2024 +0800
[improvement](compaction) be do not compact invisible version to avoid
query error -230 #28082 (#36222)
cherry pick from #28082
---
be/src/agent/agent_server.cpp | 13 +
be/src/agent/agent_server.h | 1 +
be/src/agent/task_worker_pool.cpp | 13 +
be/src/agent/task_worker_pool.h | 2 +
be/src/common/config.cpp | 7 +
be/src/common/config.h | 7 +
be/src/olap/full_compaction.cpp | 2 +-
be/src/olap/olap_common.h | 21 +
be/src/olap/tablet.cpp | 74 +++-
be/src/olap/tablet.h | 13 +
be/src/olap/tablet_manager.cpp | 72 +++-
be/src/olap/tablet_manager.h | 19 +-
be/src/olap/tablet_meta.cpp | 10 +
be/src/olap/tablet_meta.h | 1 +
be/src/olap/task/engine_clone_task.cpp | 2 +
.../main/java/org/apache/doris/common/Config.java | 6 +-
.../java/org/apache/doris/alter/AlterHandler.java | 3 +-
.../java/org/apache/doris/backup/RestoreJob.java | 3 +-
.../main/java/org/apache/doris/catalog/Env.java | 8 +-
.../org/apache/doris/catalog/MetadataViewer.java | 2 +-
.../java/org/apache/doris/catalog/Replica.java | 75 ++--
.../main/java/org/apache/doris/catalog/Tablet.java | 22 +-
.../apache/doris/catalog/TabletInvertedIndex.java | 50 ++-
.../org/apache/doris/catalog/TabletStatMgr.java | 11 +-
.../org/apache/doris/clone/TabletSchedCtx.java | 13 +-
.../org/apache/doris/clone/TabletScheduler.java | 6 +-
.../apache/doris/common/proc/ReplicasProcNode.java | 5 +-
.../doris/common/proc/TabletHealthProcDir.java | 3 +-
.../apache/doris/common/proc/TabletsProcDir.java | 11 +-
.../apache/doris/datasource/InternalCatalog.java | 5 +-
...oCollector.java => PartitionInfoCollector.java} | 47 ++-
.../org/apache/doris/master/ReportHandler.java | 67 +--
.../java/org/apache/doris/system/Diagnoser.java | 5 +-
.../java/org/apache/doris/task/AgentBatchTask.java | 10 +
.../doris/task/UpdateVisibleVersionTask.java | 40 ++
.../doris/transaction/DatabaseTransactionMgr.java | 24 +-
.../doris/transaction/GlobalTransactionMgr.java | 5 +-
.../doris/transaction/PublishVersionDaemon.java | 30 +-
.../org/apache/doris/alter/RollupJobV2Test.java | 10 +-
.../apache/doris/alter/SchemaChangeJobV2Test.java | 6 +-
.../org/apache/doris/analysis/ShowReplicaTest.java | 3 +-
.../java/org/apache/doris/catalog/ReplicaTest.java | 21 +-
.../doris/clone/DiskReblanceWhenSchedulerIdle.java | 3 +-
.../org/apache/doris/clone/RebalancerTestUtil.java | 5 +-
.../org/apache/doris/clone/RepairVersionTest.java | 8 +-
.../doris/clone/TabletReplicaTooSlowTest.java | 4 +-
.../org/apache/doris/clone/TabletSchedCtxTest.java | 16 +-
.../org/apache/doris/planner/QueryPlanTest.java | 12 +-
.../transaction/DatabaseTransactionMgrTest.java | 6 +-
.../transaction/GlobalTransactionMgrTest.java | 29 +-
gensrc/thrift/AgentService.thrift | 5 +
gensrc/thrift/BackendService.thrift | 5 +-
gensrc/thrift/MasterService.thrift | 4 +-
gensrc/thrift/Types.thrift | 3 +-
.../test_compaction_with_visible_version.out | 448 +++++++++++++++++++++
.../doris/regression/suite/SuiteCluster.groovy | 3 +
.../test_compaction_with_visible_version.groovy | 275 +++++++++++++
57 files changed, 1333 insertions(+), 241 deletions(-)
diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index e7217cdcff0..5355c037b19 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -163,6 +163,10 @@ void AgentServer::start_workers(ExecEnv* exec_env) {
_clean_trash_workers = std::make_unique<TaskWorkerPool>(
"CLEAN_TRASH", 1, [&engine](auto&& task) {return
clean_trash_callback(engine, task); });
+
+ _update_visible_version_workers = std::make_unique<TaskWorkerPool>(
+ "UPDATE_VISIBLE_VERSION", 1, [&engine](auto&& task) { return
visible_version_callback(engine, task); });
+
// clang-format on
}
@@ -278,6 +282,15 @@ void AgentServer::submit_tasks(TAgentResult& agent_result,
"task(signature={}) has wrong request member =
clean_trash_req", signature);
}
break;
+ case TTaskType::UPDATE_VISIBLE_VERSION:
+ if (task.__isset.visible_version_req) {
+ _update_visible_version_workers->submit_task(task);
+ } else {
+ ret_st = Status::InvalidArgument(
+ "task(signature={}) has wrong request member =
visible_version_req",
+ signature);
+ }
+ break;
default:
ret_st = Status::InvalidArgument("task(signature={}, type={}) has
wrong task type",
signature, task_type);
diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h
index b789bbe98de..9f3d91d5621 100644
--- a/be/src/agent/agent_server.h
+++ b/be/src/agent/agent_server.h
@@ -97,6 +97,7 @@ private:
std::unique_ptr<TopicSubscriber> _topic_subscriber;
std::unique_ptr<TaskWorkerPool> _gc_binlog_workers;
std::unique_ptr<TaskWorkerPool> _clean_trash_workers;
+ std::unique_ptr<TaskWorkerPool> _update_visible_version_workers;
};
} // end namespace doris
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index 686de72e1b9..c9d222114e0 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -355,6 +355,7 @@ bvar::Adder<uint64_t> ALTER_count("task", "ALTER_TABLE");
bvar::Adder<uint64_t> CLONE_count("task", "CLONE");
bvar::Adder<uint64_t> STORAGE_MEDIUM_MIGRATE_count("task",
"STORAGE_MEDIUM_MIGRATE");
bvar::Adder<uint64_t> GC_BINLOG_count("task", "GC_BINLOG");
+bvar::Adder<uint64_t> UPDATE_VISIBLE_VERSION_count("task",
"UPDATE_VISIBLE_VERSION");
void add_task_count(const TAgentTaskRequest& task, int n) {
// clang-format off
@@ -382,6 +383,7 @@ void add_task_count(const TAgentTaskRequest& task, int n) {
ADD_TASK_COUNT(CLONE)
ADD_TASK_COUNT(STORAGE_MEDIUM_MIGRATE)
ADD_TASK_COUNT(GC_BINLOG)
+ ADD_TASK_COUNT(UPDATE_VISIBLE_VERSION)
#undef ADD_TASK_COUNT
case TTaskType::REALTIME_PUSH:
case TTaskType::PUSH:
@@ -970,6 +972,11 @@ void report_tablet_callback(StorageEngine& engine, const
TMasterInfo& master_inf
DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1);
return;
}
+
+ std::map<int64_t, int64_t> partitions_version;
+
engine.tablet_manager()->get_partitions_visible_version(&partitions_version);
+ request.__set_partitions_version(std::move(partitions_version));
+
int64_t max_compaction_score =
std::max(DorisMetrics::instance()->tablet_cumulative_max_compaction_score->value(),
DorisMetrics::instance()->tablet_base_max_compaction_score->value());
@@ -1699,6 +1706,12 @@ void gc_binlog_callback(StorageEngine& engine, const
TAgentTaskRequest& req) {
engine.gc_binlogs(gc_tablet_infos);
}
+void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest&
req) { // handler for UPDATE_VISIBLE_VERSION agent tasks
+ const TVisibleVersionReq& visible_version_req = req.visible_version_req;
+ engine.tablet_manager()->update_partitions_visible_version( // forwards the request's partition_version map to the tablet manager
+ visible_version_req.partition_version);
+}
+
void clone_callback(StorageEngine& engine, const TMasterInfo& master_info,
const TAgentTaskRequest& req) {
const auto& clone_req = req.clone_req;
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index f95a866a57a..14d9ff32686 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -163,6 +163,8 @@ void gc_binlog_callback(StorageEngine& engine, const
TAgentTaskRequest& req);
void clean_trash_callback(StorageEngine& engine, const TAgentTaskRequest& req);
+void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest&
req);
+
void report_task_callback(const TMasterInfo& master_info);
void report_disk_callback(StorageEngine& engine, const TMasterInfo&
master_info);
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index ba173b0d03f..493ad699aac 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -430,6 +430,13 @@ DEFINE_Validator(compaction_task_num_per_fast_disk,
// How many rounds of cumulative compaction for each round of base compaction
when compaction tasks generation.
DEFINE_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round,
"9");
+// Do not compact invisible versions, subject to some limits:
+// if not timed out, keep no more than compaction_keep_invisible_version_max_count versions;
+// if timed out, keep no more than compaction_keep_invisible_version_min_count versions.
+DEFINE_mInt32(compaction_keep_invisible_version_timeout_sec, "1800");
+DEFINE_mInt32(compaction_keep_invisible_version_min_count, "50");
+DEFINE_mInt32(compaction_keep_invisible_version_max_count, "500");
+
// Threshold to logging compaction trace, in seconds.
DEFINE_mInt32(base_compaction_trace_threshold, "60");
DEFINE_mInt32(cumulative_compaction_trace_threshold, "10");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 5c60ffae258..177bb03e02b 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -477,6 +477,13 @@ DECLARE_mInt32(compaction_task_num_per_fast_disk);
// How many rounds of cumulative compaction for each round of base compaction
when compaction tasks generation.
DECLARE_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round);
+// Do not compact invisible versions, subject to some limits:
+// if not timed out, keep no more than compaction_keep_invisible_version_max_count versions;
+// if timed out, keep no more than compaction_keep_invisible_version_min_count versions.
+DECLARE_mInt32(compaction_keep_invisible_version_timeout_sec);
+DECLARE_mInt32(compaction_keep_invisible_version_min_count);
+DECLARE_mInt32(compaction_keep_invisible_version_max_count);
+
// Threshold to logging compaction trace, in seconds.
DECLARE_mInt32(base_compaction_trace_threshold);
DECLARE_mInt32(cumulative_compaction_trace_threshold);
diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp
index ba45eacb2c4..c11b87c06ad 100644
--- a/be/src/olap/full_compaction.cpp
+++ b/be/src/olap/full_compaction.cpp
@@ -136,7 +136,7 @@ Status FullCompaction::_check_all_version(const
std::vector<RowsetSharedPtr>& ro
"Full compaction rowsets' versions not equal to all exist
rowsets' versions. "
"full compaction rowsets max version={}-{}"
", current rowsets max version={}-{}"
- "full compaction rowsets min version={}-{}, current rowsets
min version=0-1",
+ ", full compaction rowsets min version={}-{}, current rowsets
min version=0-1",
last_rowset->start_version(), last_rowset->end_version(),
_tablet->max_version().first, _tablet->max_version().second,
first_rowset->start_version(), first_rowset->end_version());
diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h
index 80bfab6f4b9..c1a2e3c18b5 100644
--- a/be/src/olap/olap_common.h
+++ b/be/src/olap/olap_common.h
@@ -20,6 +20,7 @@
#include <gen_cpp/Types_types.h>
#include <netinet/in.h>
+#include <atomic>
#include <charconv>
#include <cstdint>
#include <functional>
@@ -37,6 +38,7 @@
#include "olap/olap_define.h"
#include "olap/rowset/rowset_fwd.h"
#include "util/hash_util.hpp"
+#include "util/time.h"
#include "util/uid_util.h"
namespace doris {
@@ -517,6 +519,25 @@ struct RidAndPos {
using PartialUpdateReadPlan = std::map<RowsetId, std::map<uint32_t,
std::vector<RidAndPos>>>;
+// Used to control compaction: a version number paired with the MonotonicMillis() timestamp of its last update.
+struct VersionWithTime {
+ std::atomic<int64_t> version;
+ int64_t update_ts; // NOTE(review): plain (non-atomic) field, written in update_version_monoto() without a lock -- confirm concurrent readers/writers are acceptable
+
+ VersionWithTime() : version(0), update_ts(MonotonicMillis()) {} // starts at version 0, stamped with the construction time
+
+ void update_version_monoto(int64_t new_version) { // raises version to new_version; never lowers it
+ int64_t cur_version = version.load(std::memory_order_relaxed);
+ while (cur_version < new_version) {
+ if (version.compare_exchange_strong(cur_version, new_version,
std::memory_order_relaxed,
+ std::memory_order_relaxed)) {
+ update_ts = MonotonicMillis(); // timestamp is refreshed only when this call actually advanced the version
+ break;
+ }
+ } // a failed compare-exchange reloads cur_version, so the loop condition is re-checked against the current value
+ }
+};
+
} // namespace doris
// This intended to be a "good" hash function. It may change from time to
time.
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 27b1f94530d..4cf8fae0ded 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1349,25 +1349,41 @@ std::vector<RowsetSharedPtr>
Tablet::pick_candidate_rowsets_to_cumulative_compac
if (_cumulative_point == K_INVALID_CUMULATIVE_POINT) {
return candidate_rowsets;
}
- {
- std::shared_lock rlock(_meta_lock);
- for (const auto& [version, rs] : _rs_version_map) {
- if (version.first >= _cumulative_point && rs->is_local()) {
- candidate_rowsets.push_back(rs);
- }
- }
- }
- std::sort(candidate_rowsets.begin(), candidate_rowsets.end(),
Rowset::comparator);
- return candidate_rowsets;
+ return _pick_visible_rowsets_to_compaction(_cumulative_point,
+
std::numeric_limits<int64_t>::max());
}
std::vector<RowsetSharedPtr>
Tablet::pick_candidate_rowsets_to_base_compaction() {
+ return
_pick_visible_rowsets_to_compaction(std::numeric_limits<int64_t>::min(),
+ _cumulative_point - 1);
+}
+
+std::vector<RowsetSharedPtr> Tablet::_pick_visible_rowsets_to_compaction(
+ int64_t min_start_version, int64_t max_start_version) {
+ auto [visible_version, update_ts] = get_visible_version_and_time();
+ bool update_time_long = MonotonicMillis() - update_ts >
+
config::compaction_keep_invisible_version_timeout_sec * 1000L;
+ int32_t keep_invisible_version_limit =
+ update_time_long ?
config::compaction_keep_invisible_version_min_count
+ :
config::compaction_keep_invisible_version_max_count;
+
std::vector<RowsetSharedPtr> candidate_rowsets;
{
std::shared_lock rlock(_meta_lock);
for (const auto& [version, rs] : _rs_version_map) {
- // Do compaction on local rowsets only.
- if (version.first < _cumulative_point && rs->is_local()) {
+ int64_t version_start = version.first;
+ // rowset is remote or rowset is not in given range
+ if (!rs->is_local() || version_start < min_start_version ||
+ version_start > max_start_version) {
+ continue;
+ }
+
+ // can compact if it meets one of the conditions:
+ // 1. it is already visible;
+ // 2. it exceeds the limit of invisible versions to keep.
+ int64_t version_end = version.second;
+ if (version_end <= visible_version ||
+ version_end > visible_version + keep_invisible_version_limit) {
candidate_rowsets.push_back(rs);
}
}
@@ -1390,13 +1406,8 @@ std::vector<RowsetSharedPtr>
Tablet::pick_candidate_rowsets_to_full_compaction()
std::vector<RowsetSharedPtr> Tablet::pick_first_consecutive_empty_rowsets(int
limit) {
std::vector<RowsetSharedPtr> consecutive_empty_rowsets;
- std::vector<RowsetSharedPtr> candidate_rowsets;
- traverse_rowsets([&candidate_rowsets, this](const auto& rs) {
- if (rs->is_local() && rs->start_version() >= _cumulative_point) {
- candidate_rowsets.emplace_back(rs);
- }
- });
- std::sort(candidate_rowsets.begin(), candidate_rowsets.end(),
Rowset::comparator);
+ std::vector<RowsetSharedPtr> candidate_rowsets =
+ pick_candidate_rowsets_to_cumulative_compaction();
int len = candidate_rowsets.size();
for (int i = 0; i < len - 1; ++i) {
auto rowset = candidate_rowsets[i];
@@ -1475,6 +1486,19 @@ std::string Tablet::_get_rowset_info_str(RowsetSharedPtr
rowset, bool delete_fla
rowset->rowset_id().to_string(), disk_size);
}
+std::tuple<int64_t, int64_t> Tablet::get_visible_version_and_time() const {
+ // Some old tablets have a bug: their partition_id is 0, so FE cannot update their visible version.
+ // So let such a tablet's visible version become int64 max.
+ auto version_info = std::atomic_load_explicit(&_visible_version,
std::memory_order_relaxed);
+ if (version_info != nullptr && partition_id() != 0) {
+ return
std::make_tuple(version_info->version.load(std::memory_order_relaxed),
+ version_info->update_ts);
+ } else {
+ return std::make_tuple(std::numeric_limits<int64_t>::max(),
+ std::numeric_limits<int64_t>::max());
+ }
+}
+
// For http compaction action
void Tablet::get_compaction_status(std::string* json_result) {
rapidjson::Document root;
@@ -1837,13 +1861,23 @@ void Tablet::build_tablet_report_info(TTabletInfo*
tablet_info,
}
});
+ int64_t total_version_count = _tablet_meta->version_count();
+
+ // For compatibility.
+ // An old FE does not send visible version requests to BE, so BE's visible version is always 0.
+ // In that case, set visible_version_count to total_version_count in BE's report.
+ int64_t visible_version_count = total_version_count;
+ if (auto [visible_version, _] = get_visible_version_and_time();
visible_version > 0) {
+ visible_version_count =
_tablet_meta->version_count_cross_with_range({0, visible_version});
+ }
// the report version is the largest continuous version, same logic as in
FE side
tablet_info->__set_version(cversion.second);
// Useless but it is a required filed in TTabletInfo
tablet_info->__set_version_hash(0);
tablet_info->__set_partition_id(_tablet_meta->partition_id());
tablet_info->__set_storage_medium(_data_dir->storage_medium());
- tablet_info->__set_version_count(_tablet_meta->version_count());
+ tablet_info->__set_total_version_count(total_version_count);
+ tablet_info->__set_visible_version_count(visible_version_count);
tablet_info->__set_path_hash(_data_dir->path_hash());
tablet_info->__set_is_in_memory(_tablet_meta->tablet_schema()->is_in_memory());
tablet_info->__set_replica_id(replica_id());
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 46bd6802495..4de6a05d745 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -347,6 +347,12 @@ public:
bool should_fetch_from_peer();
+ std::tuple<int64_t, int64_t> get_visible_version_and_time() const;
+
+ void set_visible_version(const std::shared_ptr<const VersionWithTime>&
visible_version) { // replaces the tablet's pointer to the visible-version info; may be nullptr
+ std::atomic_store_explicit(&_visible_version, visible_version,
std::memory_order_relaxed); // atomic shared_ptr store, paired with the atomic load in get_visible_version_and_time()
+ }
+
inline bool all_beta() const {
std::shared_lock rdlock(_meta_lock);
return _tablet_meta->all_beta();
@@ -625,6 +631,10 @@ private:
// When the proportion of empty edges in the adjacency matrix used to
represent the version graph
// in the version tracker is greater than the threshold, rebuild the
version tracker
bool _reconstruct_version_tracker_if_necessary();
+
+ std::vector<RowsetSharedPtr> _pick_visible_rowsets_to_compaction(int64_t
min_start_version,
+ int64_t
max_start_version);
+
void _init_context_common_fields(RowsetWriterContext& context);
void _rowset_ids_difference(const RowsetIdUnorderedSet& cur, const
RowsetIdUnorderedSet& pre,
@@ -740,6 +750,9 @@ private:
int64_t _io_error_times = 0;
+ // The partition's visible version. It is synced from FE, but not in real time.
+ std::shared_ptr<const VersionWithTime> _visible_version;
+
std::atomic_bool _is_full_compaction_running = false;
};
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 196b6dc9543..43cbba99087 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -1024,13 +1024,14 @@ void
TabletManager::build_all_report_tablets_info(std::map<TTabletId, TTablet>*
tablet_info.__set_transaction_ids(find->second);
expire_txn_map.erase(find);
}
- tablet_version_num_hist.add(tablet->version_count());
+ tablet_version_num_hist.add(tablet_info.total_version_count);
auto& t_tablet_stat = local_cache->emplace_back();
t_tablet_stat.__set_tablet_id(tablet_info.tablet_id);
t_tablet_stat.__set_data_size(tablet_info.data_size);
t_tablet_stat.__set_remote_data_size(tablet_info.remote_data_size);
- t_tablet_stat.__set_row_num(tablet_info.row_count);
- t_tablet_stat.__set_version_count(tablet_info.version_count);
+ t_tablet_stat.__set_row_count(tablet_info.row_count);
+
t_tablet_stat.__set_total_version_count(tablet_info.total_version_count);
+
t_tablet_stat.__set_visible_version_count(tablet_info.visible_version_count);
};
for_each_tablet(handler, filter_all_tablets);
@@ -1265,9 +1266,29 @@ void
TabletManager::update_root_path_info(std::map<string, DataDirInfo>* path_ma
void TabletManager::get_partition_related_tablets(int64_t partition_id,
std::set<TabletInfo>*
tablet_infos) {
- std::shared_lock rdlock(_partition_tablet_map_lock);
- if (_partition_tablet_map.find(partition_id) !=
_partition_tablet_map.end()) {
- *tablet_infos = _partition_tablet_map[partition_id];
+ std::shared_lock rdlock(_partitions_lock);
+ auto it = _partitions.find(partition_id);
+ if (it != _partitions.end()) {
+ *tablet_infos = it->second.tablets;
+ }
+}
+
+void TabletManager::get_partitions_visible_version(std::map<int64_t, int64_t>*
partitions_version) { // fills *partitions_version with partition_id => visible version
+ std::shared_lock rdlock(_partitions_lock); // read-only walk of _partitions
+ for (const auto& [partition_id, partition] : _partitions) {
+ partitions_version->insert( // insert() leaves an entry already present for this id unchanged
+ {partition_id,
partition.visible_version->version.load(std::memory_order_relaxed)});
+ }
+}
+
+void TabletManager::update_partitions_visible_version(
+ const std::map<int64_t, int64_t>& partitions_version) { // partition_id => new visible version
+ std::shared_lock rdlock(_partitions_lock); // shared lock: the _partitions map itself is not modified here
+ for (auto [partition_id, version] : partitions_version) {
+ auto it = _partitions.find(partition_id);
+ if (it != _partitions.end()) { // ids not present in _partitions are skipped
+ it->second.visible_version->update_version_monoto(version); // only ever moves the stored version forward
+ }
}
}
@@ -1356,15 +1377,25 @@ TabletSharedPtr
TabletManager::_get_tablet_unlocked(TTabletId tablet_id) {
}
void TabletManager::_add_tablet_to_partition(const TabletSharedPtr& tablet) {
- std::lock_guard<std::shared_mutex> wrlock(_partition_tablet_map_lock);
-
_partition_tablet_map[tablet->partition_id()].insert(tablet->get_tablet_info());
+ std::lock_guard<std::shared_mutex> wrlock(_partitions_lock);
+ auto& partition = _partitions[tablet->partition_id()];
+ partition.tablets.insert(tablet->get_tablet_info());
+ tablet->set_visible_version(
+ std::static_pointer_cast<const
VersionWithTime>(partition.visible_version));
}
void TabletManager::_remove_tablet_from_partition(const TabletSharedPtr&
tablet) {
- std::lock_guard<std::shared_mutex> wrlock(_partition_tablet_map_lock);
-
_partition_tablet_map[tablet->partition_id()].erase(tablet->get_tablet_info());
- if (_partition_tablet_map[tablet->partition_id()].empty()) {
- _partition_tablet_map.erase(tablet->partition_id());
+ tablet->set_visible_version(nullptr);
+ std::lock_guard<std::shared_mutex> wrlock(_partitions_lock);
+ auto it = _partitions.find(tablet->partition_id());
+ if (it == _partitions.end()) {
+ return;
+ }
+
+ auto& tablets = it->second.tablets;
+ tablets.erase(tablet->get_tablet_info());
+ if (tablets.empty()) {
+ _partitions.erase(it);
}
}
@@ -1401,22 +1432,23 @@ void
TabletManager::get_tablets_distribution_on_different_disks(
std::map<int64_t, std::map<DataDir*, int64_t>>& tablets_num_on_disk,
std::map<int64_t, std::map<DataDir*, std::vector<TabletSize>>>&
tablets_info_on_disk) {
std::vector<DataDir*> data_dirs = _engine.get_stores();
- std::map<int64_t, std::set<TabletInfo>> partition_tablet_map;
+ std::map<int64_t, Partition> partitions;
{
- // When drop tablet, '_partition_tablet_map_lock' is locked in
'tablet_shard_lock'.
- // To avoid locking 'tablet_shard_lock' in
'_partition_tablet_map_lock', we lock and
- // copy _partition_tablet_map here.
- std::shared_lock rdlock(_partition_tablet_map_lock);
- partition_tablet_map = _partition_tablet_map;
+ // When drop tablet, '_partitions_lock' is locked in
'tablet_shard_lock'.
+ // To avoid locking 'tablet_shard_lock' in '_partitions_lock', we lock
and
+ // copy _partitions here.
+ std::shared_lock rdlock(_partitions_lock);
+ partitions = _partitions;
}
- for (auto& [partition_id, tablet_infos] : partition_tablet_map) {
+
+ for (const auto& [partition_id, partition] : partitions) {
std::map<DataDir*, int64_t> tablets_num;
std::map<DataDir*, std::vector<TabletSize>> tablets_info;
for (auto* data_dir : data_dirs) {
tablets_num[data_dir] = 0;
}
- for (const auto& tablet_info : tablet_infos) {
+ for (const auto& tablet_info : partition.tablets) {
// get_tablet() will hold 'tablet_shard_lock'
TabletSharedPtr tablet = get_tablet(tablet_info.tablet_id);
if (tablet == nullptr) {
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index 9f8164b853f..b090277677b 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -144,6 +144,10 @@ public:
void get_partition_related_tablets(int64_t partition_id,
std::set<TabletInfo>* tablet_infos);
+ void get_partitions_visible_version(std::map<int64_t, int64_t>*
partitions_version);
+
+ void update_partitions_visible_version(const std::map<int64_t, int64_t>&
partitions_version);
+
void do_tablet_meta_checkpoint(DataDir* data_dir);
void obtain_specific_quantity_tablets(std::vector<TabletInfo>&
tablets_info, int64_t num);
@@ -229,22 +233,27 @@ private:
std::set<int64_t> tablets_under_clone;
};
+ struct Partition { // per-partition bookkeeping stored in _partitions
+ std::set<TabletInfo> tablets; // tablets registered under this partition
+ std::shared_ptr<VersionWithTime> visible_version {new VersionWithTime}; // shared pointer: copies of a Partition refer to the same VersionWithTime
+ };
+
StorageEngine& _engine;
// TODO: memory size of TabletSchema cannot be accurately tracked.
- // trace the memory use by meta of tablet
std::shared_ptr<MemTracker> _tablet_meta_mem_tracker;
const int32_t _tablets_shards_size;
const int32_t _tablets_shards_mask;
std::vector<tablets_shard> _tablets_shards;
- // Protect _partition_tablet_map, should not be obtained before
_tablet_map_lock to avoid dead lock
- std::shared_mutex _partition_tablet_map_lock;
+ // Protect _partitions, should not be obtained before _tablet_map_lock to
avoid dead lock
+ std::shared_mutex _partitions_lock;
+ // partition_id => partition
+ std::map<int64_t, Partition> _partitions;
+
// Protect _shutdown_tablets, should not be obtained before
_tablet_map_lock to avoid dead lock
std::shared_mutex _shutdown_tablets_lock;
- // partition_id => tablet_info
- std::map<int64_t, std::set<TabletInfo>> _partition_tablet_map;
// the delete tablets. notice only allow function `start_trash_sweep` can
erase tablets in _shutdown_tablets
std::list<TabletSharedPtr> _shutdown_tablets;
std::mutex _gc_tablets_lock;
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 4414f7187cd..72bf0f0ee39 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -745,6 +745,16 @@ Version TabletMeta::max_version() const {
return max_version;
}
+size_t TabletMeta::version_count_cross_with_range(const Version& range) const { // counts rowset metas whose version range intersects `range`
+ size_t count = 0;
+ for (const auto& rs_meta : _rs_metas) {
+ if (!(range.first > rs_meta->version().second || range.second <
rs_meta->version().first)) { // not disjoint, i.e. the ranges overlap (shared endpoints count)
+ count++;
+ }
+ }
+ return count;
+}
+
Status TabletMeta::add_rs_meta(const RowsetMetaSharedPtr& rs_meta) {
// check RowsetMeta is valid
for (auto& rs : _rs_metas) {
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 460851bc772..ae038fa2c92 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -166,6 +166,7 @@ public:
// Remote disk space occupied by tablet.
size_t tablet_remote_size() const;
size_t version_count() const;
+ size_t version_count_cross_with_range(const Version& range) const;
Version max_version() const;
TabletState tablet_state() const;
diff --git a/be/src/olap/task/engine_clone_task.cpp
b/be/src/olap/task/engine_clone_task.cpp
index 81c9973fcf2..ae5fc4c18dc 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -161,6 +161,8 @@ Status EngineCloneTask::execute() {
}
Status st = _do_clone();
StorageEngine::instance()->tablet_manager()->unregister_clone_tablet(_clone_req.tablet_id);
+
StorageEngine::instance()->tablet_manager()->update_partitions_visible_version(
+ {{_clone_req.partition_id, _clone_req.version}});
return st;
}
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 6adf03c56cd..e1b29b7f849 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1508,11 +1508,11 @@ public class Config extends ConfigBase {
public static int default_max_query_instances = -1;
/*
- * One master daemon thread will update global partition in memory
- * info every partition_in_memory_update_interval_secs
+ * One master daemon thread will update global partition info, including in-memory
+ * and visible version info, every partition_info_update_interval_secs
*/
@ConfField(mutable = false, masterOnly = true)
- public static int partition_in_memory_update_interval_secs = 300;
+ public static int partition_info_update_interval_secs = 60;
@ConfField(masterOnly = true)
public static boolean enable_concurrent_update = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java
index 030fd17452d..4231b11c987 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java
@@ -241,8 +241,7 @@ public abstract class AlterHandler extends MasterDaemon {
task.getSignature(), replica, task.getVersion());
boolean versionChanged = false;
if (replica.getVersion() < task.getVersion()) {
- replica.updateVersionInfo(task.getVersion(),
replica.getDataSize(), replica.getRemoteDataSize(),
- replica.getRowCount());
+ replica.updateVersion(task.getVersion());
versionChanged = true;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 12c42aee737..63459e5da53 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -1785,8 +1785,7 @@ public class RestoreJob extends AbstractJob {
for (Tablet tablet : idx.getTablets()) {
for (Replica replica : tablet.getReplicas()) {
if
(!replica.checkVersionCatchUp(part.getVisibleVersion(), false)) {
-
replica.updateVersionInfo(part.getVisibleVersion(), replica.getDataSize(),
- replica.getRemoteDataSize(),
replica.getRowCount());
+
replica.updateVersion(part.getVisibleVersion());
}
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 09cb46fdf27..380eb361c00 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -178,7 +178,7 @@ import org.apache.doris.load.sync.SyncChecker;
import org.apache.doris.load.sync.SyncJobManager;
import org.apache.doris.master.Checkpoint;
import org.apache.doris.master.MetaHelper;
-import org.apache.doris.master.PartitionInMemoryInfoCollector;
+import org.apache.doris.master.PartitionInfoCollector;
import org.apache.doris.meta.MetaContext;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mtmv.MTMVAlterOpType;
@@ -370,7 +370,7 @@ public class Env {
private PublishVersionDaemon publishVersionDaemon;
private DeleteHandler deleteHandler;
private DbUsedDataQuotaInfoCollector dbUsedDataQuotaInfoCollector;
- private PartitionInMemoryInfoCollector partitionInMemoryInfoCollector;
+ private PartitionInfoCollector partitionInfoCollector;
private CooldownConfHandler cooldownConfHandler;
private ExternalMetaIdMgr externalMetaIdMgr;
private MetastoreEventsProcessor metastoreEventsProcessor;
@@ -665,7 +665,7 @@ public class Env {
this.publishVersionDaemon = new PublishVersionDaemon();
this.deleteHandler = new DeleteHandler();
this.dbUsedDataQuotaInfoCollector = new DbUsedDataQuotaInfoCollector();
- this.partitionInMemoryInfoCollector = new
PartitionInMemoryInfoCollector();
+ this.partitionInfoCollector = new PartitionInfoCollector();
if (Config.enable_storage_policy) {
this.cooldownConfHandler = new CooldownConfHandler();
}
@@ -1691,7 +1691,7 @@ public class Env {
// start daemon thread to update db used data quota for db txn manager
periodically
dbUsedDataQuotaInfoCollector.start();
// start daemon thread to update global partition in memory
information periodically
- partitionInMemoryInfoCollector.start();
+ partitionInfoCollector.start();
if (Config.enable_storage_policy) {
cooldownConfHandler.start();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java
index 333b7b146ac..1f1e2599d9c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java
@@ -110,7 +110,7 @@ public class MetadataViewer {
row.add(String.valueOf(replica.getLastSuccessVersion()));
row.add(String.valueOf(visibleVersion));
row.add(String.valueOf(replica.getSchemaHash()));
- row.add(String.valueOf(replica.getVersionCount()));
+
row.add(String.valueOf(replica.getTotalVersionCount()));
row.add(String.valueOf(replica.isBad()));
row.add(String.valueOf(replica.isUserDrop()));
row.add(replica.getState().name());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
index ae8f4f37eed..f6b2944d754 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
@@ -22,6 +22,7 @@ import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TTabletInfo;
import org.apache.doris.thrift.TUniqueId;
import com.google.gson.annotations.SerializedName;
@@ -109,7 +110,8 @@ public class Replica implements Writable {
@SerializedName(value = "lastSuccessVersionHash")
private long lastSuccessVersionHash = 0L;
- private volatile long versionCount = -1;
+ private volatile long totalVersionCount = -1;
+ private volatile long visibleVersionCount = -1;
private long pathHash = -1;
@@ -233,14 +235,26 @@ public class Replica implements Writable {
return dataSize;
}
+ public void setDataSize(long dataSize) {
+ this.dataSize = dataSize;
+ }
+
public long getRemoteDataSize() {
return remoteDataSize;
}
+ public void setRemoteDataSize(long remoteDataSize) {
+ this.remoteDataSize = remoteDataSize;
+ }
+
public long getRowCount() {
return rowCount;
}
+ public void setRowCount(long rowCount) {
+ this.rowCount = rowCount;
+ }
+
public long getLastFailedVersion() {
return lastFailedVersion;
}
@@ -321,28 +335,24 @@ public class Replica implements Writable {
this.furtherRepairWatermarkTxnTd = furtherRepairWatermarkTxnTd;
}
- // for compatibility
- public synchronized void updateStat(long dataSize, long rowNum) {
- this.dataSize = dataSize;
- this.rowCount = rowNum;
+ public void updateWithReport(TTabletInfo backendReplica) {
+ updateVersion(backendReplica.getVersion());
+ setDataSize(backendReplica.getDataSize());
+ setRemoteDataSize(backendReplica.getRemoteDataSize());
+ setRowCount(backendReplica.getRowCount());
+ setTotalVersionCount(backendReplica.getTotalVersionCount());
+ setVisibleVersionCount(
+ backendReplica.isSetVisibleVersionCount() ?
backendReplica.getVisibleVersionCount()
+ : backendReplica.getTotalVersionCount());
}
- public synchronized void updateStat(long dataSize, long remoteDataSize,
long rowNum, long versionCount) {
- this.dataSize = dataSize;
- this.remoteDataSize = remoteDataSize;
- this.rowCount = rowNum;
- this.versionCount = versionCount;
+ public synchronized void updateVersion(long newVersion) {
+ updateReplicaVersion(newVersion, this.lastFailedVersion,
this.lastSuccessVersion);
}
- public synchronized void updateVersionInfo(long newVersion, long
newDataSize, long newRemoteDataSize,
- long newRowCount) {
- updateReplicaInfo(newVersion, this.lastFailedVersion,
this.lastSuccessVersion, newDataSize, newRemoteDataSize,
- newRowCount);
- }
-
- public synchronized void updateVersionWithFailedInfo(
+ public synchronized void updateVersionWithFailed(
long newVersion, long lastFailedVersion, long lastSuccessVersion) {
- updateReplicaInfo(newVersion, lastFailedVersion, lastSuccessVersion,
dataSize, remoteDataSize, rowCount);
+ updateReplicaVersion(newVersion, lastFailedVersion,
lastSuccessVersion);
}
public synchronized void adminUpdateVersionInfo(Long version, Long
lastFailedVersion, Long lastSuccessVersion,
@@ -405,9 +415,7 @@ public class Replica implements Writable {
* the V(hash) equals to LSV(hash), and V equals to LFV, but LFV hash
is 0 or some unknown number.
* We just reset the LFV(hash) to recover this replica.
*/
- private void updateReplicaInfo(long newVersion,
- long lastFailedVersion, long lastSuccessVersion,
- long newDataSize, long newRemoteDataSize, long newRowCount) {
+ private void updateReplicaVersion(long newVersion, long lastFailedVersion,
long lastSuccessVersion) {
if (LOG.isDebugEnabled()) {
LOG.debug("before update: {}", this.toString());
}
@@ -432,9 +440,6 @@ public class Replica implements Writable {
long oldLastFailedVersion = this.lastFailedVersion;
this.version = newVersion;
- this.dataSize = newDataSize;
- this.remoteDataSize = newRemoteDataSize;
- this.rowCount = newRowCount;
// just check it
if (lastSuccessVersion <= this.version) {
@@ -497,7 +502,7 @@ public class Replica implements Writable {
}
public synchronized void updateLastFailedVersion(long lastFailedVersion) {
- updateReplicaInfo(this.version, lastFailedVersion,
this.lastSuccessVersion, dataSize, remoteDataSize, rowCount);
+ updateReplicaVersion(this.version, lastFailedVersion,
this.lastSuccessVersion);
}
/*
@@ -542,16 +547,28 @@ public class Replica implements Writable {
return state == ReplicaState.COMPACTION_TOO_SLOW;
}
+ public boolean tooBigVersionCount() {
+ return visibleVersionCount >=
Config.min_version_count_indicate_replica_compaction_too_slow;
+ }
+
public boolean isNormal() {
return state == ReplicaState.NORMAL;
}
- public long getVersionCount() {
- return versionCount;
+ public long getTotalVersionCount() {
+ return totalVersionCount;
+ }
+
+ public void setTotalVersionCount(long totalVersionCount) {
+ this.totalVersionCount = totalVersionCount;
+ }
+
+ public long getVisibleVersionCount() {
+ return visibleVersionCount;
}
- public void setVersionCount(long versionCount) {
- this.versionCount = versionCount;
+ public void setVisibleVersionCount(long visibleVersionCount) {
+ this.visibleVersionCount = visibleVersionCount;
}
public boolean checkVersionRegressive(long newVersion) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
index 708334bb353..7a18da60ad4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
@@ -269,16 +269,16 @@ public class Tablet extends MetaObject implements
Writable {
}
if (Config.skip_compaction_slower_replica &&
allQueryableReplica.size() > 1) {
- long minVersionCount = Long.MAX_VALUE;
- for (Replica replica : allQueryableReplica) {
- if (replica.getVersionCount() != -1 &&
replica.getVersionCount() < minVersionCount) {
- minVersionCount = replica.getVersionCount();
- }
+ long minVersionCount =
allQueryableReplica.stream().mapToLong(Replica::getVisibleVersionCount)
+ .filter(count -> count != -1).min().orElse(Long.MAX_VALUE);
+ long maxVersionCount =
Config.min_version_count_indicate_replica_compaction_too_slow;
+ if (minVersionCount != Long.MAX_VALUE) {
+ maxVersionCount = Math.max(maxVersionCount, minVersionCount *
QUERYABLE_TIMES_OF_MIN_VERSION_COUNT);
}
- final long finalMinVersionCount = minVersionCount;
- return allQueryableReplica.stream().filter(replica ->
replica.getVersionCount() == -1
- || replica.getVersionCount() <
Config.min_version_count_indicate_replica_compaction_too_slow
- || replica.getVersionCount() <
finalMinVersionCount * QUERYABLE_TIMES_OF_MIN_VERSION_COUNT)
+
+ final long finalMaxVersionCount = maxVersionCount;
+ return allQueryableReplica.stream()
+ .filter(replica -> replica.getVisibleVersionCount() <
finalMaxVersionCount)
.collect(Collectors.toList());
}
return allQueryableReplica;
@@ -508,7 +508,7 @@ public class Tablet extends MetaObject implements Writable {
if (versionCompleted) {
stable++;
- versions.add(replica.getVersionCount());
+ versions.add(replica.getVisibleVersionCount());
allocNum =
stableVersionCompleteAllocMap.getOrDefault(backend.getLocationTag(), (short) 0);
stableVersionCompleteAllocMap.put(backend.getLocationTag(), (short) (allocNum +
1));
@@ -599,7 +599,7 @@ public class Tablet extends MetaObject implements Writable {
// get the max version diff
long delta = versions.get(versions.size() - 1) - versions.get(0);
double ratio = (double) delta / versions.get(versions.size() - 1);
- if (versions.get(versions.size() - 1) >
Config.min_version_count_indicate_replica_compaction_too_slow
+ if (versions.get(versions.size() - 1) >=
Config.min_version_count_indicate_replica_compaction_too_slow
&& ratio >
Config.valid_version_count_delta_ratio_between_replicas) {
return Pair.of(TabletStatus.REPLICA_COMPACTION_TOO_SLOW,
Priority.HIGH);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
index 5fd7ae721f2..c46d2a9b99f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
@@ -22,6 +22,7 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.cooldown.CooldownConf;
+import org.apache.doris.master.PartitionInfoCollector.PartitionCollectInfo;
import org.apache.doris.task.PublishVersionTask;
import org.apache.doris.thrift.TPartitionVersionInfo;
import org.apache.doris.thrift.TStorageMedium;
@@ -37,7 +38,7 @@ import org.apache.doris.transaction.TransactionStatus;
import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.HashBasedTable;
-import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -96,7 +97,10 @@ public class TabletInvertedIndex {
// backend id -> (tablet id -> replica)
private Table<Long, Long, Replica> backingReplicaMetaTable =
HashBasedTable.create();
- private volatile ImmutableSet<Long> partitionIdInMemorySet =
ImmutableSet.of();
+ // partition id -> partition info.
+ // Note: partition info is refreshed only every
Config.partition_info_update_interval_secs seconds,
+ // so it may be stale.
+ private volatile ImmutableMap<Long, PartitionCollectInfo>
partitionCollectInfoMap = ImmutableMap.of();
private ForkJoinPool taskPool = new
ForkJoinPool(Runtime.getRuntime().availableProcessors());
@@ -120,11 +124,13 @@ public class TabletInvertedIndex {
}
public void tabletReport(long backendId, Map<Long, TTablet> backendTablets,
+ Map<Long, Long> backendPartitionsVersion,
final HashMap<Long, TStorageMedium>
storageMediumMap,
ListMultimap<Long, Long> tabletSyncMap,
ListMultimap<Long, Long> tabletDeleteFromMeta,
Set<Long> tabletFoundInMeta,
ListMultimap<TStorageMedium, Long>
tabletMigrationMap,
+ Map<Long, Long> partitionVersionSyncMap,
Map<Long, ListMultimap<Long,
TPartitionVersionInfo>> transactionsToPublish,
ListMultimap<Long, Long> transactionsToClear,
ListMultimap<Long, Long> tabletRecoveryMap,
@@ -132,6 +138,7 @@ public class TabletInvertedIndex {
List<CooldownConf> cooldownConfToPush,
List<CooldownConf> cooldownConfToUpdate) {
List<Pair<TabletMeta, TTabletInfo>> cooldownTablets = new
ArrayList<>();
+ long feTabletNum = 0;
long stamp = readLock();
long start = System.currentTimeMillis();
try {
@@ -140,6 +147,7 @@ public class TabletInvertedIndex {
}
Map<Long, Replica> replicaMetaWithBackend =
backingReplicaMetaTable.row(backendId);
if (replicaMetaWithBackend != null) {
+ feTabletNum = replicaMetaWithBackend.size();
taskPool.submit(() -> {
// traverse replicas in meta with this backend
replicaMetaWithBackend.entrySet().parallelStream().forEach(entry -> {
@@ -160,11 +168,13 @@ public class TabletInvertedIndex {
tabletMetaInfo = new TTabletMetaInfo();
tabletMetaInfo.setReplicaId(replica.getId());
}
- if (partitionIdInMemorySet.contains(
- backendTabletInfo.getPartitionId()) !=
backendTabletInfo.isIsInMemory()) {
+ PartitionCollectInfo partitionCollectInfo =
+
partitionCollectInfoMap.get(backendTabletInfo.getPartitionId());
+ boolean isInMemory = partitionCollectInfo != null
&& partitionCollectInfo.isInMemory();
+ if (isInMemory !=
backendTabletInfo.isIsInMemory()) {
if (tabletMetaInfo == null) {
tabletMetaInfo = new TTabletMetaInfo();
-
tabletMetaInfo.setIsInMemory(!backendTabletInfo.isIsInMemory());
+ tabletMetaInfo.setIsInMemory(isInMemory);
}
}
if (Config.fix_tablet_partition_id_eq_0
@@ -323,8 +333,11 @@ public class TabletInvertedIndex {
// update replica's version count
// no need to write log, and no need to get db
lock.
- if (backendTabletInfo.isSetVersionCount()) {
-
replica.setVersionCount(backendTabletInfo.getVersionCount());
+ if (backendTabletInfo.isSetTotalVersionCount()) {
+
replica.setTotalVersionCount(backendTabletInfo.getTotalVersionCount());
+
replica.setVisibleVersionCount(backendTabletInfo.isSetVisibleVersionCount()
+ ?
backendTabletInfo.getVisibleVersionCount()
+ :
backendTabletInfo.getTotalVersionCount());
}
if (tabletMetaInfo != null) {
tabletMetaInfo.setTabletId(tabletId);
@@ -343,6 +356,15 @@ public class TabletInvertedIndex {
}
}
});
+
+
backendPartitionsVersion.entrySet().parallelStream().forEach(entry -> {
+ long partitionId = entry.getKey();
+ long backendVersion = entry.getValue();
+ PartitionCollectInfo partitionInfo =
partitionCollectInfoMap.get(partitionId);
+ if (partitionInfo != null &&
partitionInfo.getVisibleVersion() > backendVersion) {
+ partitionVersionSyncMap.put(partitionId,
partitionInfo.getVisibleVersion());
+ }
+ });
}).join();
}
} finally {
@@ -351,11 +373,13 @@ public class TabletInvertedIndex {
cooldownTablets.forEach(p -> handleCooldownConf(p.first, p.second,
cooldownConfToPush, cooldownConfToUpdate));
long end = System.currentTimeMillis();
- LOG.info("finished to do tablet diff with backend[{}]. sync: {}."
- + " metaDel: {}. foundInMeta: {}. migration: {}. "
- + "found invalid transactions {}. found republish
transactions {}. tabletToUpdate: {}."
- + " need recovery: {}. cost: {} ms", backendId,
tabletSyncMap.size(),
+ LOG.info("finished to do tablet diff with backend[{}]. fe tablet num:
{}, backend tablet num: {}. sync: {}."
+ + " metaDel: {}. foundInMeta: {}. migration: {}.
backend partition num: {}, backend need "
+ + "update: {}. found invalid transactions {}. found
republish "
+ + "transactions {}. tabletToUpdate: {}. need recovery:
{}. cost: {} ms",
+ backendId, feTabletNum, backendTablets.size(),
tabletSyncMap.size(),
tabletDeleteFromMeta.size(), tabletFoundInMeta.size(),
tabletMigrationMap.size(),
+ backendPartitionsVersion.size(),
partitionVersionSyncMap.size(),
transactionsToClear.size(), transactionsToPublish.size(),
tabletToUpdate.size(),
tabletRecoveryMap.size(), (end - start));
}
@@ -757,8 +781,8 @@ public class TabletInvertedIndex {
}
}
- public void setPartitionIdInMemorySet(ImmutableSet<Long>
partitionIdInMemorySet) {
- this.partitionIdInMemorySet = partitionIdInMemorySet;
+ public void setPartitionCollectInfoMap(ImmutableMap<Long,
PartitionCollectInfo> partitionCollectInfoMap) {
+ this.partitionCollectInfoMap = partitionCollectInfoMap;
}
public Map<Long, Long> getReplicaToTabletMap() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
index e2b65a95f6e..896ecac6f8e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
@@ -151,8 +151,12 @@ public class TabletStatMgr extends MasterDaemon {
if (invertedIndex.getTabletMeta(stat.getTabletId()) != null) {
Replica replica =
invertedIndex.getReplica(stat.getTabletId(), beId);
if (replica != null) {
- replica.updateStat(stat.getDataSize(),
stat.getRemoteDataSize(), stat.getRowNum(),
- stat.getVersionCount());
+ replica.setDataSize(stat.getDataSize());
+ replica.setRemoteDataSize(stat.getRemoteDataSize());
+ replica.setRowCount(stat.getRowCount());
+
replica.setTotalVersionCount(stat.getTotalVersionCount());
+
replica.setVisibleVersionCount(stat.isSetVisibleVersionCount() ?
stat.getVisibleVersionCount()
+ : stat.getTotalVersionCount());
}
}
}
@@ -168,7 +172,8 @@ public class TabletStatMgr extends MasterDaemon {
continue;
}
// TODO(cmy) no db lock protected. I think it is ok even we
get wrong row num
- replica.updateStat(entry.getValue().getDataSize(),
entry.getValue().getRowNum());
+ replica.setDataSize(entry.getValue().getDataSize());
+ replica.setRowCount(entry.getValue().getRowCount());
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
index 8401ec17bbd..e676774afcf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
@@ -574,15 +574,15 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
Replica chosenReplica = null;
long maxVersionCount = Integer.MIN_VALUE;
for (Replica replica : tablet.getReplicas()) {
- if (replica.getVersionCount() > maxVersionCount) {
- maxVersionCount = replica.getVersionCount();
+ if (replica.getVisibleVersionCount() > maxVersionCount) {
+ maxVersionCount = replica.getVisibleVersionCount();
chosenReplica = replica;
}
}
boolean recovered = false;
for (Replica replica : tablet.getReplicas()) {
if (replica.isAlive() && replica.tooSlow() &&
(!replica.equals(chosenReplica)
- || replica.getVersionCount() <
Config.min_version_count_indicate_replica_compaction_too_slow)) {
+ || !replica.tooBigVersionCount())) {
if (chosenReplica != null) {
chosenReplica.setState(ReplicaState.NORMAL);
recovered = true;
@@ -1177,8 +1177,7 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
"replica does not exist. backend id: " +
destBackendId);
}
- replica.updateVersionInfo(reportedTablet.getVersion(),
reportedTablet.getDataSize(),
- reportedTablet.getRemoteDataSize(),
reportedTablet.getRowCount());
+ replica.updateWithReport(reportedTablet);
if (replica.getLastFailedVersion() >
partition.getCommittedVersion()
&& reportedTablet.getVersion() >=
partition.getCommittedVersion()
//&& !(reportedTablet.isSetVersionMiss() &&
reportedTablet.isVersionMiss()
@@ -1365,8 +1364,8 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
public static class VersionCountComparator implements Comparator<Replica> {
@Override
public int compare(Replica r1, Replica r2) {
- long verCount1 = r1.getVersionCount() == -1 ? Long.MAX_VALUE :
r1.getVersionCount();
- long verCount2 = r2.getVersionCount() == -1 ? Long.MAX_VALUE :
r2.getVersionCount();
+ long verCount1 = r1.getVisibleVersionCount() == -1 ?
Long.MAX_VALUE : r1.getVisibleVersionCount();
+ long verCount2 = r2.getVisibleVersionCount() == -1 ?
Long.MAX_VALUE : r2.getVisibleVersionCount();
if (verCount1 < verCount2) {
return -1;
} else if (verCount1 > verCount2) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 357d63f0d56..3d4110054ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -1110,13 +1110,13 @@ public class TabletScheduler extends MasterDaemon {
if (replica.isAlive() && !replica.tooSlow()) {
normalReplicaCount++;
}
- if (replica.getVersionCount() > maxVersionCount) {
- maxVersionCount = replica.getVersionCount();
+ if (replica.getVisibleVersionCount() > maxVersionCount) {
+ maxVersionCount = replica.getVisibleVersionCount();
chosenReplica = replica;
}
}
if (chosenReplica != null && chosenReplica.isAlive() &&
!chosenReplica.tooSlow()
- && chosenReplica.getVersionCount() >
Config.min_version_count_indicate_replica_compaction_too_slow
+ && chosenReplica.tooBigVersionCount()
&& normalReplicaCount - 1 >= tabletCtx.getReplicas().size() /
2 + 1) {
chosenReplica.setState(ReplicaState.COMPACTION_TOO_SLOW);
LOG.info("set replica id :{} tablet id: {}, backend id: {} to
COMPACTION_TOO_SLOW",
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java
index 9b2a8f6d8f7..edf2a9d3517 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java
@@ -44,7 +44,7 @@ public class ReplicasProcNode implements ProcNodeInterface {
.add("BackendId").add("Version").add("LstSuccessVersion").add("LstFailedVersion").add("LstFailedTime")
.add("SchemaHash").add("LocalDataSize").add("RemoteDataSize").add("RowCount").add("State").add("IsBad")
.add("IsUserDrop")
- .add("VersionCount").add("PathHash").add("Path")
+
.add("VisibleVersionCount").add("VersionCount").add("PathHash").add("Path")
.add("MetaUrl").add("CompactionStatus").add("CooldownReplicaId")
.add("CooldownMetaId").add("QueryHits").build();
@@ -117,7 +117,8 @@ public class ReplicasProcNode implements ProcNodeInterface {
String.valueOf(replica.getState()),
String.valueOf(replica.isBad()),
String.valueOf(replica.isUserDrop()),
-
String.valueOf(replica.getVersionCount()),
+
String.valueOf(replica.getVisibleVersionCount()),
+
String.valueOf(replica.getTotalVersionCount()),
String.valueOf(replica.getPathHash()),
path,
metaUrl,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
index ca50c418305..e28c74c327e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
@@ -261,8 +261,7 @@ public class TabletHealthProcDir implements
ProcDirInterface {
oversizeTabletIds.add(tablet.getId());
}
for (Replica replica : tablet.getReplicas()) {
- if (replica.getVersionCount()
- >
Config.min_version_count_indicate_replica_compaction_too_slow) {
+ if (replica.tooBigVersionCount()) {
replicaCompactionTooSlowNum++;
replicaCompactionTooSlowTabletIds.add(tablet.getId());
break;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java
index c82a55bd380..46c89eb3253 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java
@@ -52,9 +52,10 @@ public class TabletsProcDir implements ProcDirInterface {
.add("LstSuccessVersion").add("LstFailedVersion").add("LstFailedTime")
.add("LocalDataSize").add("RemoteDataSize").add("RowCount").add("State")
.add("LstConsistencyCheckTime").add("CheckVersion")
- .add("VersionCount").add("QueryHits").add("PathHash").add("Path")
+
.add("VisibleVersionCount").add("VersionCount").add("QueryHits").add("PathHash").add("Path")
.add("MetaUrl").add("CompactionStatus")
- .add("CooldownReplicaId").add("CooldownMetaId").build();
+ .add("CooldownReplicaId").add("CooldownMetaId")
+ .build();
private Table table;
private MaterializedIndex index;
@@ -113,7 +114,8 @@ public class TabletsProcDir implements ProcDirInterface {
tabletInfo.add(-1); // lst consistency check time
tabletInfo.add(-1); // check version
tabletInfo.add(-1); // check version hash
- tabletInfo.add(-1); // version count
+ tabletInfo.add(-1); // visible version count
+ tabletInfo.add(-1); // total version count
tabletInfo.add(0L); // query hits
tabletInfo.add(-1); // path hash
tabletInfo.add(FeConstants.null_string); // path
@@ -147,7 +149,8 @@ public class TabletsProcDir implements ProcDirInterface {
tabletInfo.add(TimeUtils.longToTimeString(tablet.getLastCheckTime()));
tabletInfo.add(tablet.getCheckedVersion());
- tabletInfo.add(replica.getVersionCount());
+ tabletInfo.add(replica.getVisibleVersionCount());
+ tabletInfo.add(replica.getTotalVersionCount());
tabletInfo.add(replicaIdToQueryHits.getOrDefault(replica.getId(), 0L));
tabletInfo.add(replica.getPathHash());
tabletInfo.add(pathHashToRoot.getOrDefault(replica.getPathHash(), ""));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index c8f52d5b4b2..b6c0c73eae2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -1036,7 +1036,10 @@ public class InternalCatalog implements
CatalogIf<Database> {
Tablet tablet = materializedIndex.getTablet(info.getTabletId());
Replica replica = tablet.getReplicaById(info.getReplicaId());
Preconditions.checkNotNull(replica, info);
- replica.updateVersionInfo(info.getVersion(), info.getDataSize(),
info.getRemoteDataSize(), info.getRowCount());
+ replica.updateVersion(info.getVersion());
+ replica.setDataSize(info.getDataSize());
+ replica.setRemoteDataSize(info.getRemoteDataSize());
+ replica.setRowCount(info.getRowCount());
replica.setBad(false);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/master/PartitionInMemoryInfoCollector.java
b/fe/fe-core/src/main/java/org/apache/doris/master/PartitionInfoCollector.java
similarity index 68%
rename from
fe/fe-core/src/main/java/org/apache/doris/master/PartitionInMemoryInfoCollector.java
rename to
fe/fe-core/src/main/java/org/apache/doris/master/PartitionInfoCollector.java
index 77ed5829799..f4bf87ad109 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/master/PartitionInMemoryInfoCollector.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/master/PartitionInfoCollector.java
@@ -27,29 +27,49 @@ import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.MasterDaemon;
-import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
-public class PartitionInMemoryInfoCollector extends MasterDaemon {
+public class PartitionInfoCollector extends MasterDaemon {
- private static final Logger LOG =
LogManager.getLogger(PartitionInMemoryInfoCollector.class);
+ // Note: partition info is collected only every
Config.partition_info_update_interval_secs seconds,
+ // so the collected partition info may be stale
+ public static class PartitionCollectInfo {
+ private long visibleVersion;
+ private boolean isInMemory;
- public PartitionInMemoryInfoCollector() {
- super("PartitionInMemoryInfoCollector",
Config.partition_in_memory_update_interval_secs * 1000);
+ PartitionCollectInfo(long visibleVersion, boolean isInMemory) {
+ this.visibleVersion = visibleVersion;
+ this.isInMemory = isInMemory;
+ }
+
+ public long getVisibleVersion() {
+ return this.visibleVersion;
+ }
+
+ public boolean isInMemory() {
+ return this.isInMemory;
+ }
+ }
+
+ private static final Logger LOG =
LogManager.getLogger(PartitionInfoCollector.class);
+
+ public PartitionInfoCollector() {
+ super("PartitionInfoCollector",
Config.partition_info_update_interval_secs * 1000);
}
@Override
protected void runAfterCatalogReady() {
- updatePartitionInMemoryInfo();
+ updatePartitionCollectInfo();
}
- private void updatePartitionInMemoryInfo() {
+ private void updatePartitionCollectInfo() {
Env env = Env.getCurrentEnv();
TabletInvertedIndex tabletInvertedIndex = env.getTabletInvertedIndex();
- ImmutableSet.Builder builder = ImmutableSet.builder();
+ ImmutableMap.Builder builder = ImmutableMap.builder();
List<Long> dbIdList = env.getInternalCatalog().getDbIds();
for (Long dbId : dbIdList) {
Database db = env.getInternalCatalog().getDbNullable(dbId);
@@ -70,10 +90,12 @@ public class PartitionInMemoryInfoCollector extends
MasterDaemon {
try {
OlapTable olapTable = (OlapTable) table;
for (Partition partition :
olapTable.getAllPartitions()) {
- if
(olapTable.getPartitionInfo().getIsInMemory(partition.getId())) {
+ boolean isInMemory =
olapTable.getPartitionInfo().getIsInMemory(partition.getId());
+ if (isInMemory) {
partitionInMemoryCount++;
- builder.add(partition.getId());
}
+ builder.put(partition.getId(),
+ new
PartitionCollectInfo(partition.getVisibleVersion(), isInMemory));
}
} finally {
table.readUnlock();
@@ -87,9 +109,6 @@ public class PartitionInMemoryInfoCollector extends
MasterDaemon {
LOG.warn("Update database[" + db.getFullName() + "] partition
in memory info failed", e);
}
}
- ImmutableSet<Long> partitionIdInMemorySet = builder.build();
- tabletInvertedIndex.setPartitionIdInMemorySet(partitionIdInMemorySet);
+ tabletInvertedIndex.setPartitionCollectInfoMap(builder.build());
}
-
-
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index 7c77c1dfad3..635a8bb675f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -71,6 +71,7 @@ import org.apache.doris.task.PushCooldownConfTask;
import org.apache.doris.task.PushStoragePolicyTask;
import org.apache.doris.task.StorageMediaMigrationTask;
import org.apache.doris.task.UpdateTabletMetaInfoTask;
+import org.apache.doris.task.UpdateVisibleVersionTask;
import org.apache.doris.thrift.TBackend;
import org.apache.doris.thrift.TDisk;
import org.apache.doris.thrift.TMasterResult;
@@ -155,6 +156,7 @@ public class ReportHandler extends Daemon {
Map<TTaskType, Set<Long>> tasks = null;
Map<String, TDisk> disks = null;
Map<Long, TTablet> tablets = null;
+ Map<Long, Long> partitionsVersion = null;
long reportVersion = -1;
ReportType reportType = ReportType.UNKNOWN;
@@ -180,11 +182,15 @@ public class ReportHandler extends Daemon {
reportType = ReportType.TABLET;
}
+ if (request.isSetPartitionsVersion()) {
+ partitionsVersion = request.getPartitionsVersion();
+ }
+
if (request.isSetTabletMaxCompactionScore()) {
backend.setTabletMaxCompactionScore(request.getTabletMaxCompactionScore());
}
- ReportTask reportTask = new ReportTask(beId, tasks, disks, tablets,
reportVersion,
+ ReportTask reportTask = new ReportTask(beId, tasks, disks, tablets,
partitionsVersion, reportVersion,
request.getStoragePolicy(), request.getResource(),
request.getNumCores(),
request.getPipelineExecutorSize());
try {
@@ -231,6 +237,7 @@ public class ReportHandler extends Daemon {
private Map<TTaskType, Set<Long>> tasks;
private Map<String, TDisk> disks;
private Map<Long, TTablet> tablets;
+ private Map<Long, Long> partitionsVersion;
private long reportVersion;
private List<TStoragePolicy> storagePolicies;
@@ -239,14 +246,15 @@ public class ReportHandler extends Daemon {
private int pipelineExecutorSize;
public ReportTask(long beId, Map<TTaskType, Set<Long>> tasks,
- Map<String, TDisk> disks,
- Map<Long, TTablet> tablets, long reportVersion,
+ Map<String, TDisk> disks, Map<Long, TTablet> tablets,
+ Map<Long, Long> partitionsVersion, long reportVersion,
List<TStoragePolicy> storagePolicies, List<TStorageResource>
storageResources, int cpuCores,
int pipelineExecutorSize) {
this.beId = beId;
this.tasks = tasks;
this.disks = disks;
this.tablets = tablets;
+ this.partitionsVersion = partitionsVersion;
this.reportVersion = reportVersion;
this.storagePolicies = storagePolicies;
this.storageResources = storageResources;
@@ -273,7 +281,11 @@ public class ReportHandler extends Daemon {
LOG.warn("out of date report version {} from backend[{}].
current report version[{}]",
reportVersion, beId, backendReportVersion);
} else {
- ReportHandler.tabletReport(beId, tablets, reportVersion);
+ Map<Long, Long> partitions = this.partitionsVersion;
+ if (partitions == null) {
+ partitions = Maps.newHashMap();
+ }
+ ReportHandler.tabletReport(beId, tablets, partitions,
reportVersion);
}
}
}
@@ -408,7 +420,8 @@ public class ReportHandler extends Daemon {
}
// public for fe ut
- public static void tabletReport(long backendId, Map<Long, TTablet>
backendTablets, long backendReportVersion) {
+ public static void tabletReport(long backendId, Map<Long, TTablet>
backendTablets,
+ Map<Long, Long> backendPartitionsVersion, long
backendReportVersion) {
long start = System.currentTimeMillis();
LOG.info("backend[{}] reports {} tablet(s). report version: {}",
backendId, backendTablets.size(), backendReportVersion);
@@ -426,6 +439,9 @@ public class ReportHandler extends Daemon {
// storage medium -> tablet id
ListMultimap<TStorageMedium, Long> tabletMigrationMap =
LinkedListMultimap.create();
+ // partition id -> visible version
+ Map<Long, Long> partitionVersionSyncMap = Maps.newConcurrentMap();
+
// dbid -> txn id -> [partition info]
Map<Long, ListMultimap<Long, TPartitionVersionInfo>>
transactionsToPublish = Maps.newHashMap();
ListMultimap<Long, Long> transactionsToClear =
LinkedListMultimap.create();
@@ -439,11 +455,13 @@ public class ReportHandler extends Daemon {
List<CooldownConf> cooldownConfToUpdate = new LinkedList<>();
// 1. do the diff. find out (intersection) / (be - meta) / (meta - be)
- Env.getCurrentInvertedIndex().tabletReport(backendId, backendTablets,
storageMediumMap,
+ Env.getCurrentInvertedIndex().tabletReport(backendId, backendTablets,
backendPartitionsVersion,
+ storageMediumMap,
tabletSyncMap,
tabletDeleteFromMeta,
tabletFoundInMeta,
tabletMigrationMap,
+ partitionVersionSyncMap,
transactionsToPublish,
transactionsToClear,
tabletRecoveryMap,
@@ -499,6 +517,9 @@ public class ReportHandler extends Daemon {
if (!cooldownConfToUpdate.isEmpty()) {
Env.getCurrentEnv().getCooldownConfHandler().addCooldownConfToUpdate(cooldownConfToUpdate);
}
+ if (!partitionVersionSyncMap.isEmpty()) {
+ handleUpdatePartitionVersion(partitionVersionSyncMap, backendId);
+ }
final SystemInfoService currentSystemInfo = Env.getCurrentSystemInfo();
Backend reportBackend = currentSystemInfo.getBackend(backendId);
@@ -659,27 +680,17 @@ public class ReportHandler extends Daemon {
// if the last failed version is changed, then fe will
think schema change successfully.
// this is an fatal error.
if (replica.getState() == ReplicaState.NORMAL) {
- long metaVersion = replica.getVersion();
- long backendVersion = -1L;
- long rowCount = -1L;
- long dataSize = -1L;
- long remoteDataSize = -1L;
// schema change maybe successfully in fe, but not
inform be,
// then be will report two schema hash
// just select the dest schema hash
- for (TTabletInfo tabletInfo :
backendTablets.get(tabletId).getTabletInfos()) {
- if (tabletInfo.getSchemaHash() == schemaHash) {
- backendVersion = tabletInfo.getVersion();
- rowCount = tabletInfo.getRowCount();
- dataSize = tabletInfo.getDataSize();
- remoteDataSize =
tabletInfo.getRemoteDataSize();
- break;
- }
- }
- if (backendVersion == -1L) {
+ TTabletInfo tabletInfo =
backendTablets.get(tabletId).getTabletInfos().stream()
+ .filter(t -> t.getSchemaHash() ==
schemaHash).findFirst().orElse(null);
+ if (tabletInfo == null) {
continue;
}
+ long metaVersion = replica.getVersion();
+ long backendVersion = tabletInfo.getVersion();
boolean needSync = false;
if (metaVersion < backendVersion) {
needSync = true;
@@ -702,7 +713,7 @@ public class ReportHandler extends Daemon {
// happens when
// 1. PUSH finished in BE but failed or not yet
report to FE
// 2. repair for VERSION_INCOMPLETE finished in
BE, but failed or not yet report to FE
- replica.updateVersionInfo(backendVersion,
dataSize, remoteDataSize, rowCount);
+ replica.updateWithReport(tabletInfo);
if (replica.getLastFailedVersion() < 0) {
if (replica.setBad(false)) {
@@ -716,7 +727,9 @@ public class ReportHandler extends Daemon {
ReplicaPersistInfo info =
ReplicaPersistInfo.createForClone(dbId, tableId,
partitionId, indexId, tabletId,
backendId, replica.getId(),
replica.getVersion(), schemaHash,
- dataSize, remoteDataSize, rowCount,
+ tabletInfo.getDataSize(),
+ tabletInfo.getRemoteDataSize(),
+ tabletInfo.getRowCount(),
replica.getLastFailedVersion(),
replica.getLastSuccessVersion());
Env.getCurrentEnv().getEditLog().logUpdateReplica(info);
@@ -1033,6 +1046,14 @@ public class ReportHandler extends Daemon {
AgentTaskExecutor.submit(batchTask);
}
+ private static void handleUpdatePartitionVersion(Map<Long, Long>
partitionVersionSyncMap, long backendId) {
+ AgentBatchTask batchTask = new AgentBatchTask();
+ UpdateVisibleVersionTask task = new
UpdateVisibleVersionTask(backendId, partitionVersionSyncMap,
+ System.currentTimeMillis());
+ batchTask.addTask(task);
+ AgentTaskExecutor.submit(batchTask);
+ }
+
private static void handleRecoverTablet(ListMultimap<Long, Long>
tabletRecoveryMap,
Map<Long, TTablet> backendTablets,
long backendId) {
// print a warn log here to indicate the exceptions on the backend
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java
b/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java
index 2b084fcfa23..5e7748a3524 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java
@@ -26,7 +26,6 @@ import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
-import org.apache.doris.common.Config;
import com.google.common.collect.Lists;
import org.json.simple.JSONObject;
@@ -154,9 +153,9 @@ public class Diagnoser {
+ ", and is bad: " + (replica.isBad() ? "Yes" : "No")
+ ", and is going to drop: " + (replica.isUserDrop() ?
"Yes" : "No"));
}
- if (replica.getVersionCount() >
Config.min_version_count_indicate_replica_compaction_too_slow) {
+ if (replica.tooBigVersionCount()) {
compactionErr.append("Replica on backend " +
replica.getBackendId() + "'s version count is too high: "
- + replica.getVersionCount());
+ + replica.getVisibleVersionCount());
}
}
results.add(Lists.newArrayList("ReplicaBackendStatus",
(backendErr.length() == 0
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
index 03a82cbb56b..360d24c573c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
@@ -48,6 +48,7 @@ import org.apache.doris.thrift.TStorageMediumMigrateReq;
import org.apache.doris.thrift.TTaskType;
import org.apache.doris.thrift.TUpdateTabletMetaInfoReq;
import org.apache.doris.thrift.TUploadReq;
+import org.apache.doris.thrift.TVisibleVersionReq;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
@@ -402,6 +403,15 @@ public class AgentBatchTask implements Runnable {
tAgentTaskRequest.setCleanTrashReq(request);
return tAgentTaskRequest;
}
+ case UPDATE_VISIBLE_VERSION: {
+ UpdateVisibleVersionTask visibleTask =
(UpdateVisibleVersionTask) task;
+ TVisibleVersionReq request = visibleTask.toThrift();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(request.toString());
+ }
+ tAgentTaskRequest.setVisibleVersionReq(request);
+ return tAgentTaskRequest;
+ }
default:
if (LOG.isDebugEnabled()) {
LOG.debug("could not find task type for task [{}]", task);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/task/UpdateVisibleVersionTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateVisibleVersionTask.java
new file mode 100644
index 00000000000..52ed3b1c490
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateVisibleVersionTask.java
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.task;
+
+import org.apache.doris.thrift.TTaskType;
+import org.apache.doris.thrift.TVisibleVersionReq;
+
+import java.util.Map;
+
+public class UpdateVisibleVersionTask extends AgentTask {
+ private Map<Long, Long> partitionVisibleVersions;
+
+ public UpdateVisibleVersionTask(long backendId, Map<Long, Long>
partitionVisibleVersions, long createTime) {
+ super(null, backendId, TTaskType.UPDATE_VISIBLE_VERSION, -1L, -1L,
-1L, -1L, -1L, -1L, createTime);
+ this.partitionVisibleVersions = partitionVisibleVersions;
+ }
+
+ public TVisibleVersionReq toThrift() {
+ TVisibleVersionReq request = new TVisibleVersionReq();
+ partitionVisibleVersions.forEach((partitionId, version) -> {
+ request.putToPartitionVersion(partitionId, version);
+ });
+ return request;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index efcc760f0d2..3996664708a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -44,6 +44,7 @@ import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.InternalDatabaseUtil;
import org.apache.doris.common.util.MetaLockUtils;
@@ -986,7 +987,12 @@ public class DatabaseTransactionMgr {
}
}
- public void finishTransaction(long transactionId) throws UserException {
+ public void finishTransaction(long transactionId, Map<Long, Long>
partitionVisibleVersions,
+ Map<Long, Set<Long>> backendPartitions) throws UserException {
+ if
(DebugPointUtil.isEnable("DatabaseTransactionMgr.stop_finish_transaction")) {
+ return;
+ }
+
TransactionState transactionState = null;
readLock();
try {
@@ -1049,7 +1055,7 @@ public class DatabaseTransactionMgr {
LOG.warn("afterStateTransform txn {} failed. exception: ",
transactionState, e);
}
}
- updateCatalogAfterVisible(transactionState, db);
+ updateCatalogAfterVisible(transactionState, db,
partitionVisibleVersions, backendPartitions);
} finally {
MetaLockUtils.writeUnlockTables(tableList);
}
@@ -1964,7 +1970,8 @@ public class DatabaseTransactionMgr {
}
}
- private boolean updateCatalogAfterVisible(TransactionState
transactionState, Database db) {
+ private boolean updateCatalogAfterVisible(TransactionState
transactionState, Database db,
+ Map<Long, Long> partitionVisibleVersions, Map<Long, Set<Long>>
backendPartitions) {
Set<Long> errorReplicaIds = transactionState.getErrorReplicas();
AnalysisManager analysisManager =
Env.getCurrentEnv().getAnalysisManager();
List<Long> newPartitionLoadedTableIds = new ArrayList<>();
@@ -2021,7 +2028,13 @@ public class DatabaseTransactionMgr {
lastFailedVersion = newCommitVersion;
}
}
- replica.updateVersionWithFailedInfo(newVersion,
lastFailedVersion, lastSuccessVersion);
+ replica.updateVersionWithFailed(newVersion,
lastFailedVersion, lastSuccessVersion);
+ Set<Long> partitionIds =
backendPartitions.get(replica.getBackendId());
+ if (partitionIds == null) {
+ partitionIds = Sets.newHashSet();
+ backendPartitions.put(replica.getBackendId(),
partitionIds);
+ }
+ partitionIds.add(partitionId);
}
}
} // end for indices
@@ -2032,6 +2045,7 @@ public class DatabaseTransactionMgr {
newPartitionLoadedTableIds.add(tableId);
}
partition.updateVisibleVersionAndTime(version, versionTime);
+ partitionVisibleVersions.put(partition.getId(), version);
if (LOG.isDebugEnabled()) {
LOG.debug("transaction state {} set partition {}'s version
to [{}]",
transactionState, partition.getId(), version);
@@ -2180,7 +2194,7 @@ public class DatabaseTransactionMgr {
if (transactionState.getTransactionStatus() ==
TransactionStatus.COMMITTED) {
updateCatalogAfterCommitted(transactionState, db, true);
} else if (transactionState.getTransactionStatus() ==
TransactionStatus.VISIBLE) {
- updateCatalogAfterVisible(transactionState, db);
+ updateCatalogAfterVisible(transactionState, db,
Maps.newHashMap(), Maps.newHashMap());
}
unprotectUpsertTransactionState(transactionState, true);
} finally {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index dd62c02f408..24d92849b49 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -414,9 +414,10 @@ public class GlobalTransactionMgr implements Writable {
* @param transactionId
* @return
*/
- public void finishTransaction(long dbId, long transactionId) throws
UserException {
+ public void finishTransaction(long dbId, long transactionId, Map<Long,
Long> partitionVisibleVersions,
+ Map<Long, Set<Long>> backendPartitions) throws UserException {
DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactionMgr(dbId);
- dbTransactionMgr.finishTransaction(transactionId);
+ dbTransactionMgr.finishTransaction(transactionId,
partitionVisibleVersions, backendPartitions);
}
/**
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
index a6665c2e220..22ca57f2399 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
@@ -32,6 +32,7 @@ import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.PublishVersionTask;
+import org.apache.doris.task.UpdateVisibleVersionTask;
import org.apache.doris.thrift.TPartitionVersionInfo;
import org.apache.doris.thrift.TTaskType;
@@ -61,14 +62,18 @@ public class PublishVersionDaemon extends MasterDaemon {
@Override
protected void runAfterCatalogReady() {
+ Map<Long, Long> partitionVisibleVersions = Maps.newHashMap();
+ Map<Long, Set<Long>> backendPartitions = Maps.newHashMap();
+
try {
- publishVersion();
+ publishVersion(partitionVisibleVersions, backendPartitions);
+ sendBackendVisibleVersion(partitionVisibleVersions,
backendPartitions);
} catch (Throwable t) {
LOG.error("errors while publish version to all backends", t);
}
}
- private void publishVersion() {
+ private void publishVersion(Map<Long, Long> partitionVisibleVersions,
Map<Long, Set<Long>> backendPartitions) {
if (DebugPointUtil.isEnable("PublishVersionDaemon.stop_publish")) {
return;
}
@@ -177,7 +182,7 @@ public class PublishVersionDaemon extends MasterDaemon {
try {
// one transaction exception should not affect other
transaction
globalTransactionMgr.finishTransaction(transactionState.getDbId(),
- transactionState.getTransactionId());
+ transactionState.getTransactionId(),
partitionVisibleVersions, backendPartitions);
} catch (Exception e) {
LOG.warn("error happens when finish transaction {}",
transactionState.getTransactionId(), e);
}
@@ -232,4 +237,23 @@ public class PublishVersionDaemon extends MasterDaemon {
.collect(Collectors.groupingBy(p -> p.first,
Collectors.mapping(p -> p.second,
Collectors.toSet())));
}
+
+ private void sendBackendVisibleVersion(Map<Long, Long>
partitionVisibleVersions,
+ Map<Long, Set<Long>> backendPartitions) {
+ if (partitionVisibleVersions.isEmpty() || backendPartitions.isEmpty())
{
+ return;
+ }
+
+ long createTime = System.currentTimeMillis();
+ AgentBatchTask batchTask = new AgentBatchTask();
+ backendPartitions.forEach((backendId, partitionIds) -> {
+ Map<Long, Long> backendPartitionVersions =
partitionVisibleVersions.entrySet().stream()
+ .filter(entry -> partitionIds.contains(entry.getKey()))
+ .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
+ UpdateVisibleVersionTask task = new
UpdateVisibleVersionTask(backendId, backendPartitionVersions,
+ createTime);
+ batchTask.addTask(task);
+ });
+ AgentTaskExecutor.submit(batchTask);
+ }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java
b/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java
index e93f524a919..143dd920397 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java
@@ -221,10 +221,7 @@ public class RollupJobV2Test {
MaterializedIndex shadowIndex =
testPartition.getMaterializedIndices(IndexExtState.SHADOW).get(0);
for (Tablet shadowTablet : shadowIndex.getTablets()) {
for (Replica shadowReplica : shadowTablet.getReplicas()) {
-
shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(),
- shadowReplica.getDataSize(),
- shadowReplica.getRemoteDataSize(),
- shadowReplica.getRowCount());
+ shadowReplica.updateVersion(testPartition.getVisibleVersion());
}
}
@@ -301,10 +298,7 @@ public class RollupJobV2Test {
MaterializedIndex shadowIndex =
testPartition.getMaterializedIndices(IndexExtState.SHADOW).get(0);
for (Tablet shadowTablet : shadowIndex.getTablets()) {
for (Replica shadowReplica : shadowTablet.getReplicas()) {
-
shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(),
- shadowReplica.getDataSize(),
- shadowReplica.getRemoteDataSize(),
- shadowReplica.getRowCount());
+ shadowReplica.updateVersion(testPartition.getVisibleVersion());
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
b/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
index 60020d08a8c..7a49a413750 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
@@ -213,8 +213,7 @@ public class SchemaChangeJobV2Test {
MaterializedIndex shadowIndex =
testPartition.getMaterializedIndices(IndexExtState.SHADOW).get(0);
for (Tablet shadowTablet : shadowIndex.getTablets()) {
for (Replica shadowReplica : shadowTablet.getReplicas()) {
-
shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(),
shadowReplica.getDataSize(),
- shadowReplica.getRemoteDataSize(),
shadowReplica.getRowCount());
+ shadowReplica.updateVersion(testPartition.getVisibleVersion());
}
}
@@ -296,8 +295,7 @@ public class SchemaChangeJobV2Test {
MaterializedIndex shadowIndex =
testPartition.getMaterializedIndices(IndexExtState.SHADOW).get(0);
for (Tablet shadowTablet : shadowIndex.getTablets()) {
for (Replica shadowReplica : shadowTablet.getReplicas()) {
-
shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(),
shadowReplica.getDataSize(),
- shadowReplica.getRemoteDataSize(),
shadowReplica.getRowCount());
+ shadowReplica.updateVersion(testPartition.getVisibleVersion());
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/analysis/ShowReplicaTest.java
b/fe/fe-core/src/test/java/org/apache/doris/analysis/ShowReplicaTest.java
index 21fe967a96d..54debab9a63 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/ShowReplicaTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/ShowReplicaTest.java
@@ -73,7 +73,8 @@ public class ShowReplicaTest extends TestWithFeService {
for (MaterializedIndex index :
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.VISIBLE)) {
for (Tablet tablet : index.getTablets()) {
for (Replica replica : tablet.getReplicas()) {
- replica.updateStat(1024, 2);
+ replica.setDataSize(1024L);
+ replica.setRowCount(2L);
}
}
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaTest.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaTest.java
index d6a81cdd883..eb7dbca0775 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaTest.java
@@ -65,12 +65,7 @@ public class ReplicaTest {
// update new version
long newVersion = version + 1;
- long newDataSize = dataSize + 100;
- long newRowCount = rowCount + 10;
- replica.updateVersionInfo(newVersion, newDataSize, 0, newRowCount);
- Assert.assertEquals(newVersion, replica.getVersion());
- Assert.assertEquals(newDataSize, replica.getDataSize());
- Assert.assertEquals(newRowCount, replica.getRowCount());
+ replica.updateVersion(newVersion);
// check version catch up
Assert.assertFalse(replica.checkVersionCatchUp(5, false));
@@ -132,14 +127,14 @@ public class ReplicaTest {
public void testUpdateVersion1() {
Replica originalReplica = new Replica(10000, 20000, 3, 0, 100, 0, 78,
ReplicaState.NORMAL, 0, 3);
// new version is little than original version, it is invalid the
version will not update
- originalReplica.updateVersionInfo(2, 100, 0, 78);
+ originalReplica.updateVersion(2);
Assert.assertEquals(3, originalReplica.getVersion());
}
@Test
public void testUpdateVersion2() {
Replica originalReplica = new Replica(10000, 20000, 3, 0, 100, 0, 78,
ReplicaState.NORMAL, 0, 0);
- originalReplica.updateVersionInfo(3, 100, 0, 78);
+ originalReplica.updateVersion(3);
// if new version >= current version and last success version <= new
version, then last success version should be updated
Assert.assertEquals(3, originalReplica.getLastSuccessVersion());
Assert.assertEquals(3, originalReplica.getVersion());
@@ -155,7 +150,7 @@ public class ReplicaTest {
Assert.assertEquals(8, originalReplica.getLastFailedVersion());
// update last success version 10
-
originalReplica.updateVersionWithFailedInfo(originalReplica.getVersion(),
+ originalReplica.updateVersionWithFailed(originalReplica.getVersion(),
originalReplica.getLastFailedVersion(),
10);
Assert.assertEquals(10, originalReplica.getLastSuccessVersion());
@@ -163,7 +158,7 @@ public class ReplicaTest {
Assert.assertEquals(8, originalReplica.getLastFailedVersion());
// update version to 8, the last success version and version should be
10
- originalReplica.updateVersionInfo(8, 100, 0, 78);
+ originalReplica.updateVersion(8);
Assert.assertEquals(10, originalReplica.getLastSuccessVersion());
Assert.assertEquals(10, originalReplica.getVersion());
Assert.assertEquals(-1, originalReplica.getLastFailedVersion());
@@ -175,7 +170,7 @@ public class ReplicaTest {
Assert.assertEquals(12, originalReplica.getLastFailedVersion());
// update last success version to 15
-
originalReplica.updateVersionWithFailedInfo(originalReplica.getVersion(),
+ originalReplica.updateVersionWithFailed(originalReplica.getVersion(),
originalReplica.getLastFailedVersion(),
15);
Assert.assertEquals(15, originalReplica.getLastSuccessVersion());
@@ -189,13 +184,13 @@ public class ReplicaTest {
Assert.assertEquals(18, originalReplica.getLastFailedVersion());
// update version to 17 then version and success version is 17
- originalReplica.updateVersionInfo(17, 100, 0, 78);
+ originalReplica.updateVersion(17);
Assert.assertEquals(17, originalReplica.getLastSuccessVersion());
Assert.assertEquals(17, originalReplica.getVersion());
Assert.assertEquals(18, originalReplica.getLastFailedVersion());
// update version to 18, then version and last success version should
be 18 and failed version should be -1
- originalReplica.updateVersionInfo(18, 100, 0, 78);
+ originalReplica.updateVersion(18);
Assert.assertEquals(18, originalReplica.getLastSuccessVersion());
Assert.assertEquals(18, originalReplica.getVersion());
Assert.assertEquals(-1, originalReplica.getLastFailedVersion());
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java
index f563b846ed7..40b6683da3d 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java
@@ -110,7 +110,8 @@ public class DiskReblanceWhenSchedulerIdle extends
TestWithFeService {
Lists.newArrayList(tablet.getReplicas()).forEach(
replica -> {
if (replica.getBackendId() == backends.get(1).getId()) {
- replica.updateStat(totalCapacity / 4, 1);
+ replica.setDataSize(totalCapacity / 4);
+ replica.setRowCount(1);
tablet.deleteReplica(replica);
replica.setBackendId(backends.get(0).getId());
replica.setPathHash(diskInfo0.getPathHash());
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
index 063faf2d3b2..1e6af5c7324 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
@@ -104,7 +104,7 @@ public class RebalancerTestUtil {
replica.setPathHash(beIds.get(i));
if (replicaSizes != null) {
// for disk rebalancer, every beId corresponding to a
replicaSize
- replica.updateStat(replicaSizes.get(i), 0);
+ replica.setDataSize(replicaSizes.get(i));
}
// isRestore set true, to avoid modifying
Catalog.getCurrentInvertedIndex
tablet.addReplica(replica, true);
@@ -165,7 +165,8 @@ public class RebalancerTestUtil {
for (Tablet tablet : idx.getTablets()) {
long tabletSize = tableBaseSize * (1 +
random.nextInt(tabletSkew));
for (Replica replica : tablet.getReplicas()) {
- replica.updateStat(tabletSize, 1000L);
+ replica.setDataSize(tabletSize);
+ replica.setRowCount(1000L);
}
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java
index 1150b32192f..1ac497dbebe 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java
@@ -106,7 +106,7 @@ public class RepairVersionTest extends TestWithFeService {
tablets.put(tablet.getId(), tTablet);
Assertions.assertEquals(partition.getVisibleVersion(),
replica.getVersion());
- ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L);
+ ReportHandler.tabletReport(replica.getBackendId(), tablets,
Maps.newHashMap(), 100L);
Assertions.assertEquals(partition.getVisibleVersion(),
replica.getVersion());
Assertions.assertEquals(-1L, replica.getLastFailedVersion());
@@ -136,11 +136,11 @@ public class RepairVersionTest extends TestWithFeService {
Map<Long, TTablet> tablets = Maps.newHashMap();
tablets.put(tablet.getId(), tTablet);
- ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L);
+ ReportHandler.tabletReport(replica.getBackendId(), tablets,
Maps.newHashMap(), 100L);
Assertions.assertEquals(-1L, replica.getLastFailedVersion());
DebugPointUtil.addDebugPoint("Replica.regressive_version_immediately",
new DebugPoint());
- ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L);
+ ReportHandler.tabletReport(replica.getBackendId(), tablets,
Maps.newHashMap(), 100L);
Assertions.assertEquals(replica.getVersion() + 1,
replica.getLastFailedVersion());
Assertions.assertEquals(partition.getVisibleVersion(),
replica.getVersion());
@@ -160,7 +160,7 @@ public class RepairVersionTest extends TestWithFeService {
long visibleVersion = 2L;
partition.updateVisibleVersion(visibleVersion);
partition.setNextVersion(visibleVersion + 1);
- tablet.getReplicas().forEach(replica ->
replica.updateVersionInfo(visibleVersion, 1L, 1L, 1L));
+ tablet.getReplicas().forEach(replica ->
replica.updateVersion(visibleVersion));
Replica replica = tablet.getReplicas().iterator().next();
Assertions.assertEquals(visibleVersion, replica.getVersion());
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java
index 6a38985b73f..7d918ef7db5 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java
@@ -141,7 +141,7 @@ public class TabletReplicaTooSlowTest {
List<Long> pathHashes = be.getDisks().values().stream()
.map(DiskInfo::getPathHash).collect(Collectors.toList());
Replica replica = cell.getValue();
- replica.setVersionCount(versionCount);
+ replica.setVisibleVersionCount(versionCount);
versionCount = versionCount + 200;
replica.setPathHash(pathHashes.get(0));
@@ -171,7 +171,7 @@ public class TabletReplicaTooSlowTest {
boolean found = false;
for (Table.Cell<Long, Long, Replica> cell :
replicaMetaTable.cellSet()) {
Replica replica = cell.getValue();
- if (replica.getVersionCount() == 401) {
+ if (replica.getVisibleVersionCount() == 401) {
if (replica.tooSlow()) {
LOG.info("set to TOO_SLOW.");
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java
index d4578e17d7f..852f072eca1 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java
@@ -84,19 +84,19 @@ public class TabletSchedCtxTest {
TabletSchedCtx.VersionCountComparator countComparator = new
TabletSchedCtx.VersionCountComparator();
List<Replica> replicaList = Lists.newArrayList();
Replica replica1 = new Replica();
- replica1.setVersionCount(100);
+ replica1.setVisibleVersionCount(100);
replica1.setState(Replica.ReplicaState.NORMAL);
Replica replica2 = new Replica();
- replica2.setVersionCount(50);
+ replica2.setVisibleVersionCount(50);
replica2.setState(Replica.ReplicaState.NORMAL);
Replica replica3 = new Replica();
- replica3.setVersionCount(-1);
+ replica3.setVisibleVersionCount(-1);
replica3.setState(Replica.ReplicaState.NORMAL);
Replica replica4 = new Replica();
- replica4.setVersionCount(200);
+ replica4.setVisibleVersionCount(200);
replica4.setState(Replica.ReplicaState.NORMAL);
replicaList.add(replica1);
@@ -105,10 +105,10 @@ public class TabletSchedCtxTest {
replicaList.add(replica4);
Collections.sort(replicaList, countComparator);
- Assert.assertEquals(50, replicaList.get(0).getVersionCount());
- Assert.assertEquals(100, replicaList.get(1).getVersionCount());
- Assert.assertEquals(200, replicaList.get(2).getVersionCount());
- Assert.assertEquals(-1, replicaList.get(3).getVersionCount());
+ Assert.assertEquals(50, replicaList.get(0).getVisibleVersionCount());
+ Assert.assertEquals(100, replicaList.get(1).getVisibleVersionCount());
+ Assert.assertEquals(200, replicaList.get(2).getVisibleVersionCount());
+ Assert.assertEquals(-1, replicaList.get(3).getVisibleVersionCount());
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
index 76fe82e6599..9ff6e7ac3ca 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
@@ -1101,7 +1101,7 @@ public class QueryPlanTest extends TestWithFeService {
mIndex.setRowCount(10000);
for (Tablet tablet : mIndex.getTablets()) {
for (Replica replica : tablet.getReplicas()) {
- replica.updateVersionInfo(2, 200000, 0, 10000);
+ replica.updateVersion(2);
}
}
}
@@ -1115,7 +1115,7 @@ public class QueryPlanTest extends TestWithFeService {
mIndex.setRowCount(10000);
for (Tablet tablet : mIndex.getTablets()) {
for (Replica replica : tablet.getReplicas()) {
- replica.updateVersionInfo(2, 200000, 0, 10000);
+ replica.updateVersion(2);
}
}
}
@@ -1199,7 +1199,7 @@ public class QueryPlanTest extends TestWithFeService {
mIndex.setRowCount(10000);
for (Tablet tablet : mIndex.getTablets()) {
for (Replica replica : tablet.getReplicas()) {
- replica.updateVersionInfo(2, 200000, 0, 10000);
+ replica.updateVersion(2);
}
}
}
@@ -1229,7 +1229,7 @@ public class QueryPlanTest extends TestWithFeService {
mIndex.setRowCount(0);
for (Tablet tablet : mIndex.getTablets()) {
for (Replica replica : tablet.getReplicas()) {
- replica.updateVersionInfo(2, 0, 0, 0);
+ replica.updateVersion(2);
}
}
}
@@ -1249,7 +1249,7 @@ public class QueryPlanTest extends TestWithFeService {
mIndex.setRowCount(10000);
for (Tablet tablet : mIndex.getTablets()) {
for (Replica replica : tablet.getReplicas()) {
- replica.updateVersionInfo(2, 200000, 0, 10000);
+ replica.updateVersion(2);
}
}
}
@@ -1278,7 +1278,7 @@ public class QueryPlanTest extends TestWithFeService {
mIndex.setRowCount(0);
for (Tablet tablet : mIndex.getTablets()) {
for (Replica replica : tablet.getReplicas()) {
- replica.updateVersionInfo(2, 0, 0, 0);
+ replica.updateVersion(2);
}
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
index ea63a5e18b1..437f1bcb209 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
@@ -42,6 +42,7 @@ import org.junit.rules.ExpectedException;
import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Map;
+import java.util.Set;
public class DatabaseTransactionMgrTest {
@@ -115,7 +116,10 @@ public class DatabaseTransactionMgrTest {
setTransactionFinishPublish(transactionState1,
Lists.newArrayList(CatalogTestUtil.testBackendId1,
CatalogTestUtil.testBackendId2,
CatalogTestUtil.testBackendId3));
- masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1,
transactionId1);
+ Map<Long, Long> partitionVisibleVersions = Maps.newHashMap();
+ Map<Long, Set<Long>> backendPartitions = Maps.newHashMap();
+ masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1,
transactionId1,
+ partitionVisibleVersions, backendPartitions);
labelToTxnId.put(CatalogTestUtil.testTxnLabel1, transactionId1);
// txn 2, 3, 4
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index 4f22d95c60b..5c8f72723ad 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -53,8 +53,10 @@ import
org.apache.doris.transaction.TransactionState.LoadJobSourceType;
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
import org.apache.doris.transaction.TransactionState.TxnSourceType;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import mockit.Injectable;
import mockit.Mocked;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -65,6 +67,7 @@ import org.junit.Test;
import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
public class GlobalTransactionMgrTest {
@@ -474,7 +477,10 @@ public class GlobalTransactionMgrTest {
CatalogTestUtil.testBackendId2,
CatalogTestUtil.testBackendId3));
transactionState.getPublishVersionTasks()
.get(CatalogTestUtil.testBackendId1).getErrorTablets().add(CatalogTestUtil.testTabletId1);
- masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1,
transactionId);
+ Map<Long, Long> partitionVisibleVersions = Maps.newHashMap();
+ Map<Long, Set<Long>> backendPartitions = Maps.newHashMap();
+ masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1,
transactionId,
+ partitionVisibleVersions, backendPartitions);
transactionState = fakeEditLog.getTransaction(transactionId);
Assert.assertEquals(TransactionStatus.VISIBLE,
transactionState.getTransactionStatus());
// check replica version
@@ -493,6 +499,14 @@ public class GlobalTransactionMgrTest {
}
}
+
+ Assert.assertEquals(ImmutableMap.of(testPartition.getId(),
CatalogTestUtil.testStartVersion + 1),
+ partitionVisibleVersions);
+ Set<Long> partitionIds = Sets.newHashSet(testPartition.getId());
+ Assert.assertEquals(partitionIds,
backendPartitions.get(CatalogTestUtil.testBackendId1));
+ Assert.assertEquals(partitionIds,
backendPartitions.get(CatalogTestUtil.testBackendId2));
+ Assert.assertEquals(partitionIds,
backendPartitions.get(CatalogTestUtil.testBackendId3));
+
// slave replay new state and compare catalog
slaveTransMgr.replayUpsertTransactionState(transactionState);
Assert.assertTrue(CatalogTestUtil.compareCatalog(masterEnv, slaveEnv));
@@ -535,8 +549,13 @@ public class GlobalTransactionMgrTest {
// backend2 publish failed
transactionState.getPublishVersionTasks()
.get(CatalogTestUtil.testBackendId2).getErrorTablets().add(CatalogTestUtil.testTabletId1);
- masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1,
transactionId);
+ Map<Long, Long> partitionVisibleVersions = Maps.newHashMap();
+ Map<Long, Set<Long>> backendPartitions = Maps.newHashMap();
+ masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1,
transactionId,
+ partitionVisibleVersions, backendPartitions);
Assert.assertEquals(TransactionStatus.COMMITTED,
transactionState.getTransactionStatus());
+ Assert.assertTrue(partitionVisibleVersions.isEmpty());
+ Assert.assertTrue(backendPartitions.isEmpty());
Replica replica1 =
tablet.getReplicaById(CatalogTestUtil.testReplicaId1);
Replica replica2 =
tablet.getReplicaById(CatalogTestUtil.testReplicaId2);
Replica replica3 =
tablet.getReplicaById(CatalogTestUtil.testReplicaId3);
@@ -554,7 +573,8 @@ public class GlobalTransactionMgrTest {
backend2SuccTablets.put(CatalogTestUtil.testTabletId1, 0L);
transactionState.getPublishVersionTasks()
.get(CatalogTestUtil.testBackendId2).setSuccTablets(backend2SuccTablets);
- masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1,
transactionId);
+ masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1,
transactionId,
+ partitionVisibleVersions, backendPartitions);
Assert.assertEquals(TransactionStatus.VISIBLE,
transactionState.getTransactionStatus());
Assert.assertEquals(CatalogTestUtil.testStartVersion + 1,
replica1.getVersion());
Assert.assertEquals(CatalogTestUtil.testStartVersion + 1,
replica2.getVersion());
@@ -619,7 +639,8 @@ public class GlobalTransactionMgrTest {
DatabaseTransactionMgrTest.setTransactionFinishPublish(transactionState,
Lists.newArrayList(CatalogTestUtil.testBackendId1,
CatalogTestUtil.testBackendId2,
CatalogTestUtil.testBackendId3));
- masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1,
transactionId2);
+ masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1,
transactionId2,
+ partitionVisibleVersions, backendPartitions);
Assert.assertEquals(TransactionStatus.VISIBLE,
transactionState.getTransactionStatus());
Assert.assertEquals(CatalogTestUtil.testStartVersion + 2,
replica1.getVersion());
Assert.assertEquals(CatalogTestUtil.testStartVersion + 2,
replica2.getVersion());
diff --git a/gensrc/thrift/AgentService.thrift
b/gensrc/thrift/AgentService.thrift
index 9eb4ece8d72..6da55c66c5a 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -395,6 +395,10 @@ struct TPublishVersionRequest {
4: optional set<Types.TTabletId> base_tablet_ids
}
+struct TVisibleVersionReq {
+ 1: required map<Types.TPartitionId, Types.TVersion> partition_version
+}
+
struct TClearAlterTaskRequest {
1: required Types.TTabletId tablet_id
2: required Types.TSchemaHash schema_hash
@@ -496,6 +500,7 @@ struct TAgentTaskRequest {
32: optional TAlterInvertedIndexReq alter_inverted_index_req
33: optional TGcBinlogReq gc_binlog_req
34: optional TCleanTrashReq clean_trash_req
+ 35: optional TVisibleVersionReq visible_version_req
}
struct TAgentResult {
diff --git a/gensrc/thrift/BackendService.thrift
b/gensrc/thrift/BackendService.thrift
index 0a2edb8ccbf..12036ef93ce 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -33,9 +33,10 @@ struct TTabletStat {
1: required i64 tablet_id
// local data size
2: optional i64 data_size
- 3: optional i64 row_num
- 4: optional i64 version_count
+ 3: optional i64 row_count
+ 4: optional i64 total_version_count
5: optional i64 remote_data_size
+ 6: optional i64 visible_version_count
}
struct TTabletStatResult {
diff --git a/gensrc/thrift/MasterService.thrift
b/gensrc/thrift/MasterService.thrift
index 9acd3f85f7b..7442a86a990 100644
--- a/gensrc/thrift/MasterService.thrift
+++ b/gensrc/thrift/MasterService.thrift
@@ -33,7 +33,7 @@ struct TTabletInfo {
6: required Types.TSize data_size
7: optional Types.TStorageMedium storage_medium
8: optional list<Types.TTransactionId> transaction_ids
- 9: optional i64 version_count
+ 9: optional i64 total_version_count
10: optional i64 path_hash
11: optional bool version_miss
12: optional bool used
@@ -46,6 +46,7 @@ struct TTabletInfo {
// 18: optional bool is_cooldown
19: optional i64 cooldown_term
20: optional Types.TUniqueId cooldown_meta_id
+ 21: optional i64 visible_version_count
}
struct TFinishTaskRequest {
@@ -106,6 +107,7 @@ struct TReportRequest {
10: optional list<AgentService.TStorageResource> resource // only id and
version
11: i32 num_cores
12: i32 pipeline_executor_size
+ 13: optional map<Types.TPartitionId, Types.TVersion> partitions_version
}
struct TMasterResult {
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 3c2fcfbd6fa..3cebb5a81ad 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -223,7 +223,8 @@ enum TTaskType {
PUSH_STORAGE_POLICY,
ALTER_INVERTED_INDEX,
GC_BINLOG,
- CLEAN_TRASH
+ CLEAN_TRASH,
+ UPDATE_VISIBLE_VERSION
}
enum TStmtType {
diff --git
a/regression-test/data/compaction/test_compaction_with_visible_version.out
b/regression-test/data/compaction/test_compaction_with_visible_version.out
new file mode 100644
index 00000000000..de90dd5fa2a
--- /dev/null
+++ b/regression-test/data/compaction/test_compaction_with_visible_version.out
@@ -0,0 +1,448 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_1 --
+0 0
+1 10
+2 20
+3 30
+4 40
+5 50
+6 60
+7 70
+8 80
+9 90
+10 100
+11 110
+12 120
+13 130
+14 140
+15 150
+16 160
+17 170
+18 180
+19 190
+20 200
+
+-- !select_2 --
+0 0
+1 10
+2 20
+3 30
+4 40
+5 50
+6 60
+7 70
+8 80
+9 90
+10 100
+11 110
+12 120
+13 130
+14 140
+15 150
+16 160
+17 170
+18 180
+19 190
+20 200
+
+-- !select_3 --
+0 0
+1 10
+2 20
+3 30
+4 40
+5 50
+6 60
+7 70
+8 80
+9 90
+10 100
+11 110
+12 120
+13 130
+14 140
+15 150
+16 160
+17 170
+18 180
+19 190
+20 200
+21 210
+22 220
+23 230
+24 240
+25 250
+26 260
+27 270
+28 280
+29 290
+30 300
+31 310
+32 320
+33 330
+34 340
+35 350
+36 360
+37 370
+38 380
+39 390
+40 400
+41 410
+
+-- !select_4 --
+0 0
+1 10
+2 20
+3 30
+4 40
+5 50
+6 60
+7 70
+8 80
+9 90
+10 100
+11 110
+12 120
+13 130
+14 140
+15 150
+16 160
+17 170
+18 180
+19 190
+20 200
+21 210
+22 220
+23 230
+24 240
+25 250
+26 260
+27 270
+28 280
+29 290
+30 300
+31 310
+32 320
+33 330
+34 340
+35 350
+36 360
+37 370
+38 380
+39 390
+40 400
+41 410
+
+-- !select_5 --
+0 0
+1 10
+2 20
+3 30
+4 40
+5 50
+6 60
+7 70
+8 80
+9 90
+10 100
+11 110
+12 120
+13 130
+14 140
+15 150
+16 160
+17 170
+18 180
+19 190
+20 200
+21 210
+22 220
+23 230
+24 240
+25 250
+26 260
+27 270
+28 280
+29 290
+30 300
+31 310
+32 320
+33 330
+34 340
+35 350
+36 360
+37 370
+38 380
+39 390
+40 400
+41 410
+
+-- !select_6 --
+0 0
+1 10
+2 20
+3 30
+4 40
+5 50
+6 60
+7 70
+8 80
+9 90
+10 100
+11 110
+12 120
+13 130
+14 140
+15 150
+16 160
+17 170
+18 180
+19 190
+20 200
+21 210
+22 220
+23 230
+24 240
+25 250
+26 260
+27 270
+28 280
+29 290
+30 300
+31 310
+32 320
+33 330
+34 340
+35 350
+36 360
+37 370
+38 380
+39 390
+40 400
+41 410
+42 420
+43 430
+44 440
+45 450
+46 460
+47 470
+48 480
+49 490
+50 500
+51 510
+52 520
+53 530
+54 540
+55 550
+56 560
+57 570
+58 580
+59 590
+60 600
+61 610
+62 620
+63 630
+64 640
+65 650
+66 660
+67 670
+68 680
+69 690
+70 700
+71 710
+72 720
+73 730
+74 740
+75 750
+76 760
+77 770
+78 780
+79 790
+80 800
+81 810
+82 820
+83 830
+84 840
+85 850
+86 860
+87 870
+88 880
+89 890
+90 900
+91 910
+92 920
+93 930
+94 940
+95 950
+96 960
+97 970
+98 980
+99 990
+100 1000
+101 1010
+102 1020
+103 1030
+104 1040
+105 1050
+106 1060
+107 1070
+108 1080
+109 1090
+110 1100
+111 1110
+112 1120
+113 1130
+114 1140
+115 1150
+116 1160
+117 1170
+118 1180
+119 1190
+120 1200
+121 1210
+
+-- !select_7 --
+0 0
+1 10
+2 20
+3 30
+4 40
+5 50
+6 60
+7 70
+8 80
+9 90
+10 100
+11 110
+12 120
+13 130
+14 140
+15 150
+16 160
+17 170
+18 180
+19 190
+20 200
+21 210
+22 220
+23 230
+24 240
+25 250
+26 260
+27 270
+28 280
+29 290
+30 300
+31 310
+32 320
+33 330
+34 340
+35 350
+36 360
+37 370
+38 380
+39 390
+40 400
+41 410
+42 420
+43 430
+44 440
+45 450
+46 460
+47 470
+48 480
+49 490
+50 500
+51 510
+52 520
+53 530
+54 540
+55 550
+56 560
+57 570
+58 580
+59 590
+60 600
+61 610
+62 620
+63 630
+64 640
+65 650
+66 660
+67 670
+68 680
+69 690
+70 700
+71 710
+72 720
+73 730
+74 740
+75 750
+76 760
+77 770
+78 780
+79 790
+80 800
+81 810
+82 820
+83 830
+84 840
+85 850
+86 860
+87 870
+88 880
+89 890
+90 900
+91 910
+92 920
+93 930
+94 940
+95 950
+96 960
+97 970
+98 980
+99 990
+100 1000
+101 1010
+102 1020
+103 1030
+104 1040
+105 1050
+106 1060
+107 1070
+108 1080
+109 1090
+110 1100
+111 1110
+112 1120
+113 1130
+114 1140
+115 1150
+116 1160
+117 1170
+118 1180
+119 1190
+120 1200
+121 1210
+122 1220
+123 1230
+124 1240
+125 1250
+126 1260
+127 1270
+128 1280
+129 1290
+130 1300
+131 1310
+132 1320
+133 1330
+134 1340
+135 1350
+136 1360
+137 1370
+138 1380
+139 1390
+140 1400
+141 1410
+142 1420
+
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
index a915851b938..49bfbc18792 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
@@ -34,13 +34,16 @@ class ClusterOptions {
int feNum = 1
int beNum = 3
+
List<String> feConfigs = [
'heartbeat_interval_second=5',
]
List<String> beConfigs = [
+ 'report_disk_state_interval_seconds=2',
'report_random_wait=false',
]
+
boolean connectToFollower = false
// 1. cloudMode = true, only create cloud cluster.
diff --git
a/regression-test/suites/compaction/test_compaction_with_visible_version.groovy
b/regression-test/suites/compaction/test_compaction_with_visible_version.groovy
new file mode 100644
index 00000000000..194a1b67566
--- /dev/null
+++
b/regression-test/suites/compaction/test_compaction_with_visible_version.groovy
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.Http
+import org.apache.doris.regression.util.NodeType
+
+suite('test_compaction_with_visible_version') {
+ def options = new ClusterOptions()
+ def compaction_keep_invisible_version_min_count = 50L
+ options.feConfigs += [
+ 'partition_info_update_interval_secs=5',
+ ]
+ options.beConfigs += [
+ 'disable_auto_compaction=true',
+ 'report_tablet_interval_seconds=1',
+ 'tablet_rowset_stale_sweep_by_size=true',
+ 'tablet_rowset_stale_sweep_threshold_size=0',
+ 'compaction_keep_invisible_version_timeout_sec=6000',
+
"compaction_keep_invisible_version_min_count=${compaction_keep_invisible_version_min_count}".toString(),
+ 'compaction_keep_invisible_version_max_count=500',
+ ]
+ options.enableDebugPoints()
+
+ docker(options) {
+ def E_CUMULATIVE_NO_SUITABLE_VERSION = 'E-2000'
+ def E_FULL_MISS_VERSION = 'E-2009'
+
+ sql 'SET GLOBAL insert_visible_timeout_ms = 3000'
+
+ def tableName = 'test_compaction_with_visible_version'
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP,
backendId_to_backendHttpPort)
+
+ def triggerCompaction = { tablet, isCompactSucc, compaction_type ->
+ def tabletId = tablet.TabletId
+ def backendId = tablet.BackendId
+ def backendIp = backendId_to_backendIP.get(backendId)
+ def backendHttpPort = backendId_to_backendHttpPort.get(backendId)
+ def code
+ def out
+ def err
+ if (compaction_type == 'base') {
+ (code, out, err) = be_run_base_compaction(backendIp,
backendHttpPort, tabletId)
+ } else {
+ (code, out, err) = be_run_cumulative_compaction(backendIp,
backendHttpPort, tabletId)
+ }
+ logger.info("Run compaction: code=${code}, out=${out}, err=${err}")
+ assertEquals(0, code)
+ def compactJson = parseJson(out.trim())
+ if (isCompactSucc) {
+ assertEquals('success', compactJson.status.toLowerCase())
+ } else {
+ if (compaction_type == 'base') {
+ assertEquals(E_FULL_MISS_VERSION, compactJson.status)
+ } else {
+ assertEquals(E_CUMULATIVE_NO_SUITABLE_VERSION,
compactJson.status)
+ }
+ }
+ }
+
+ def waitCompaction = { tablet, startTs ->
+ def tabletId = tablet.TabletId
+ def backendId = tablet.BackendId
+ def backendIp = backendId_to_backendIP.get(backendId)
+ def backendHttpPort = backendId_to_backendHttpPort.get(backendId)
+ def running = true
+ while (running) {
+ assertTrue(System.currentTimeMillis() - startTs < 60 * 1000)
+ Thread.sleep(1000)
+ def (code, out, err) = be_get_compaction_status(backendIp,
backendHttpPort, tabletId)
+ logger.info("Get compaction: code=${code}, out=${out},
err=${err}")
+ assertEquals(0, code)
+
+ def compactionStatus = parseJson(out.trim())
+ assertEquals('success', compactionStatus.status.toLowerCase())
+ running = compactionStatus.run_status
+ }
+ }
+
+ def checkCompact = { isCumuCompactSucc, runBaseCompact,
isInvisibleTimeout, version, visibleVersion ->
+ def partition = sql_return_maparray("SHOW PARTITIONS FROM
${tableName}")[0]
+ assertEquals(visibleVersion, partition.VisibleVersion as long)
+
+ // wait for BE to report the version count
+ Thread.sleep(3 * 1000)
+ def tablets = sql_return_maparray "SHOW TABLETS FROM ${tableName}"
+ def lastInvisibleVersionCountMap = [:]
+ def lastVisibleVersionCountMap = [:]
+ tablets.each {
+ lastVisibleVersionCountMap[it.BackendId] =
it.VisibleVersionCount as long
+ lastInvisibleVersionCountMap[it.BackendId] =
+ (it.VersionCount as long) - (it.VisibleVersionCount as
long)
+ triggerCompaction it, isCumuCompactSucc, 'cumulative'
+ }
+
+ if (isCumuCompactSucc) {
+ // wait compaction done
+ def startTs = System.currentTimeMillis()
+ tablets.each {
+ waitCompaction it, startTs
+ }
+ }
+
+ if (runBaseCompact) {
+ tablets.each {
+ triggerCompaction it, true, 'base'
+ }
+
+ def startTs = System.currentTimeMillis()
+ tablets.each {
+ waitCompaction it, startTs
+ }
+ }
+
+ // wait report
+ Thread.sleep(3 * 1000)
+
+ tablets = sql_return_maparray "SHOW TABLETS FROM ${tableName}"
+ tablets.each {
+ def backendId = it.BackendId
+ def visibleVersionCount = it.VisibleVersionCount as long
+ def totalVersionCount = it.VersionCount as long
+ def invisibleVersionCount = totalVersionCount -
visibleVersionCount
+ assertEquals(version, it.Version as long)
+
+ if (isInvisibleTimeout) {
+ def values = [Math.min(version - visibleVersion,
compaction_keep_invisible_version_min_count),
+ Math.min(version - visibleVersion,
compaction_keep_invisible_version_min_count + 1)]
+
+ // part of the invisible versions were compacted
+ assertTrue(invisibleVersionCount in values,
+ "not match, invisibleVersionCount:
${invisibleVersionCount}, candidate values: ${values}")
+ } else {
+ // invisible versions must not have been compacted
+ assertEquals(version - visibleVersion,
invisibleVersionCount)
+ }
+
+ def lastVisibleVersionCount =
lastVisibleVersionCountMap.get(backendId)
+ def lastInvisibleVersionCount =
lastInvisibleVersionCountMap.get(backendId)
+ if (isCumuCompactSucc) {
+ assertTrue(lastInvisibleVersionCount >
invisibleVersionCount || lastInvisibleVersionCount <= 1,
+ "not met with: lastInvisibleVersionCount
${lastInvisibleVersionCount} > "
+ + "invisibleVersionCount ${invisibleVersionCount}")
+ if (runBaseCompact) {
+ assertEquals(1L, visibleVersionCount)
+ }
+ } else {
+ assertEquals(lastVisibleVersionCount, visibleVersionCount)
+ }
+ }
+ }
+
+ sql " CREATE TABLE ${tableName} (k1 int, k2 int) DISTRIBUTED BY
HASH(k1) BUCKETS 1 "
+
+ // normal
+ def rowNum = 0L
+ def insertNRecords = { num ->
+ // If the debug point
DatabaseTransactionMgr.stop_finish_transaction is enabled,
+ // each insert has to wait for insert_visible_timeout_ms,
+ // so use multiple threads to reduce the total wait time.
+ def futures = []
+ for (def i = 0; i < num; i++, rowNum++) {
+ def index = rowNum
+ futures.add(thread {
+ sql " INSERT INTO ${tableName} VALUES (${index}, ${index *
10}) "
+ })
+ }
+ futures.each { it.get() }
+ }
+ insertNRecords(21)
+ // after inserting 21 rows, BE can run compaction successfully.
+ checkCompact(true, false, false, rowNum + 1, rowNum + 1)
+ qt_select_1 "SELECT * FROM ${tableName} ORDER BY k1"
+
+ // publish but not visible
+ def lastRowNum = rowNum
+ cluster.injectDebugPoints(NodeType.FE,
['DatabaseTransactionMgr.stop_finish_transaction':null])
+ insertNRecords(21)
+ // After the debug point is enabled, BE still adds rowsets, but the visible version
will not increase.
+ // Therefore no rowsets can be picked for compaction,
+ // so the compaction is expected to fail.
+ checkCompact(false, false, false, rowNum + 1, lastRowNum + 1)
+ qt_select_2 "SELECT * FROM ${tableName} ORDER BY k1"
+
+ cluster.clearFrontendDebugPoints()
+ Thread.sleep(5000)
+ // After the debug point is cleared, the visible version will increase.
+ // Then some rowsets can be picked for compaction,
+ // so the compaction is expected to succeed.
+ checkCompact(true, true, false, rowNum + 1, rowNum + 1)
+ qt_select_3 "SELECT * FROM ${tableName} ORDER BY k1"
+
+ lastRowNum = rowNum
+ cluster.injectDebugPoints(NodeType.FE,
['DatabaseTransactionMgr.stop_finish_transaction':null])
+ insertNRecords(80)
+ // The 80 new versions are not visible yet, so BE will not compact them.
+ // If we send an HTTP request to compact them, BE will reply that no rowsets can
be compacted now.
+ checkCompact(false, false, false, rowNum + 1, lastRowNum + 1)
+ // Because BE did not compact them, the query should succeed.
+ qt_select_4 "SELECT * FROM ${tableName} ORDER BY k1"
+
+ update_all_be_config('compaction_keep_invisible_version_timeout_sec',
1)
+ checkCompact(true, false, true, rowNum + 1, lastRowNum + 1)
+ qt_select_5 "SELECT * FROM ${tableName} ORDER BY k1"
+
+ def getVersionCountMap = { ->
+ def versionCountMap = [:]
+ def tablets = sql_return_maparray "SHOW TABLETS FROM ${tableName}"
+ tablets.each {
+ versionCountMap.put(it.BackendId as long,
[it.VisibleVersionCount as long, it.VersionCount as long])
+ }
+ return versionCountMap
+ }
+
+ // after a backend restarts, it should update its visible version from FE
+ // and then report its visible version count and total version count
+ def oldVersionCountMap = getVersionCountMap()
+ cluster.restartBackends()
+ Thread.sleep(20000)
+ def newVersionCountMap = getVersionCountMap()
+ assertEquals(oldVersionCountMap, newVersionCountMap)
+
+ cluster.clearFrontendDebugPoints()
+ Thread.sleep(5000)
+ // after clearing FE's debug point, the 80 versions are visible now,
+ // so compaction succeeds
+ checkCompact(true, false, false, rowNum + 1, rowNum + 1)
+ qt_select_6 "SELECT * FROM ${tableName} ORDER BY k1"
+
+ cluster.injectDebugPoints(NodeType.FE,
['DatabaseTransactionMgr.stop_finish_transaction':null])
+ def compaction_keep_invisible_version_timeout_sec = 1
+ compaction_keep_invisible_version_min_count = 0L
+ update_all_be_config('compaction_keep_invisible_version_timeout_sec',
compaction_keep_invisible_version_timeout_sec)
+ update_all_be_config('compaction_keep_invisible_version_min_count',
compaction_keep_invisible_version_min_count)
+
+ lastRowNum = rowNum
+ insertNRecords(21)
+
+ Thread.sleep((compaction_keep_invisible_version_timeout_sec + 1) *
1000)
+
+ // after compaction_keep_invisible_version_timeout_sec has elapsed,
+ // all versions can be compacted
(compaction_keep_invisible_version_min_count=0),
+ checkCompact(true, false, true, rowNum + 1, lastRowNum + 1)
+
+ // the visible version has been compacted away, so the query below is expected to fail
+ test {
+ sql "SELECT * FROM ${tableName} ORDER BY k1"
+
+ // E-230:
+ //(1105, 'errCode = 2, detailMessage =
(128.2.51.2)[CANCELLED]missed_versions is empty, spec_version 43,
+ // max_version 123, tablet_id 10062')
+ exception 'missed_versions is empty'
+ }
+
+ cluster.clearFrontendDebugPoints()
+ Thread.sleep(5000)
+ qt_select_7 "SELECT * FROM ${tableName} ORDER BY k1"
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]