This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 908c84ce4e8 branch-3.1: [Fix](Clone) Fix compaction and mow failure when missing rowset #52812 (#53496)
908c84ce4e8 is described below
commit 908c84ce4e804ce20338b076db4bb7751914c0a6
Author: abmdocrt <[email protected]>
AuthorDate: Fri Jul 18 10:57:19 2025 +0800
branch-3.1: [Fix](Clone) Fix compaction and mow failure when missing rowset #52812 (#53496)
Pick #52812
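In short: this pick adds two BE switches, enable_auto_clone_on_compaction_missing_version and enable_auto_clone_on_mow_publish_missing_version (both default false, see the config.cpp hunk below). When cumulative compaction or MOW publish detects a missing version, the BE asks the FE master for the tablet's peer replica backends and submits a HIGH-priority clone task; the new PriorTaskWorkerPool entry point cancels any normal-priority clone already queued for the same tablet. A minimal be.conf sketch to turn the behavior on (option names are from this diff; enabling both is illustrative, not a recommendation):

    enable_auto_clone_on_compaction_missing_version = true
    enable_auto_clone_on_mow_publish_missing_version = true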
---
be/src/agent/agent_server.cpp | 7 +-
be/src/agent/task_worker_pool.cpp | 48 ++++++++
be/src/agent/task_worker_pool.h | 2 +
be/src/common/config.cpp | 4 +
be/src/common/config.h | 4 +
be/src/olap/cumulative_compaction.cpp | 23 ++++
be/src/olap/storage_engine.cpp | 110 +++++++++++++++++
be/src/olap/storage_engine.h | 9 ++
be/src/olap/task/engine_clone_task.cpp | 2 +-
be/src/olap/task/engine_publish_version_task.cpp | 20 +++
be/src/util/blocking_priority_queue.hpp | 1 +
.../apache/doris/service/FrontendServiceImpl.java | 2 +
gensrc/thrift/Types.thrift | 2 +
...ion_clone_missing_rowset_fault_injection.groovy | 112 +++++++++++++++++
.../test_mow_publish_clone_missing_rowset.groovy | 134 +++++++++++++++++++++
15 files changed, 477 insertions(+), 3 deletions(-)
diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index 810a242ca61..a05278480ed 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -43,6 +43,7 @@
#include "olap/snapshot_manager.h"
#include "olap/storage_engine.h"
#include "runtime/exec_env.h"
+#include "util/work_thread_pool.hpp"
using std::string;
using std::vector;
@@ -172,8 +173,8 @@ void AgentServer::start_workers(StorageEngine& engine, ExecEnv* exec_env) {
_workers[TTaskType::ALTER] = std::make_unique<TaskWorkerPool>(
"ALTER_TABLE", config::alter_tablet_worker_count, [&engine](auto&&
task) { return alter_tablet_callback(engine, task); });
- _workers[TTaskType::CLONE] = std::make_unique<TaskWorkerPool>(
- "CLONE", config::clone_worker_count, [&engine, &cluster_info = _cluster_info](auto&& task) { return clone_callback(engine, cluster_info, task); });
+ _workers[TTaskType::CLONE] = std::make_unique<PriorTaskWorkerPool>(
+ "CLONE", config::clone_worker_count, config::clone_worker_count, [&engine, &cluster_info = _cluster_info](auto&& task) { return clone_callback(engine, cluster_info, task); });
_workers[TTaskType::STORAGE_MEDIUM_MIGRATE] = std::make_unique<TaskWorkerPool>(
"STORAGE_MEDIUM_MIGRATE", config::storage_medium_migrate_count,
[&engine](auto&& task) { return storage_medium_migrate_callback(engine, task); });
@@ -202,6 +203,8 @@ void AgentServer::start_workers(StorageEngine& engine, ExecEnv* exec_env) {
_report_workers.push_back(std::make_unique<ReportWorker>(
"REPORT_INDEX_POLICY", _cluster_info,
config::report_index_policy_interval_seconds,[&cluster_info = _cluster_info] {
report_index_policy_callback(cluster_info); }));
// clang-format on
+
+ exec_env->storage_engine().to_local().workers = &_workers;
}
void AgentServer::cloud_start_workers(CloudStorageEngine& engine, ExecEnv* exec_env) {
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index c7d8c961811..ae5067fc17a 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -17,6 +17,7 @@
#include "agent/task_worker_pool.h"
+#include <brpc/controller.h>
#include <fmt/format.h>
#include <gen_cpp/AgentService_types.h>
#include <gen_cpp/DataSinks_types.h>
@@ -85,6 +86,7 @@
#include "runtime/memory/global_memory_arbitrator.h"
#include "runtime/snapshot_loader.h"
#include "service/backend_options.h"
+#include "util/brpc_client_cache.h"
#include "util/debug_points.h"
#include "util/doris_metrics.h"
#include "util/jni-util.h"
@@ -609,6 +611,52 @@ Status PriorTaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
});
}
+Status PriorTaskWorkerPool::submit_high_prior_and_cancel_low(const TAgentTaskRequest& task) {
+ const TTaskType::type task_type = task.task_type;
+ int64_t signature = task.signature;
+ std::string type_str;
+ EnumToString(TTaskType, task_type, type_str);
+ auto req = std::make_unique<TAgentTaskRequest>(task);
+
+ DCHECK(req->__isset.priority && req->priority == TPriority::HIGH);
+ do {
+ std::lock_guard lock(s_task_signatures_mtx);
+ auto& set = s_task_signatures[task_type];
+ if (!set.contains(signature)) {
+ // If it doesn't exist, put it directly into the priority queue
+ add_task_count(*req, 1);
+ set.insert(signature);
+ std::lock_guard lock(_mtx);
+ _high_prior_queue.push_back(std::move(req));
+ _high_prior_condv.notify_one();
+ _normal_condv.notify_one();
+ break;
+ } else {
+ std::lock_guard lock(_mtx);
+ for (auto it = _normal_queue.begin(); it != _normal_queue.end();) {
+ // If it exists in the normal queue, cancel the task in the normal queue
+ if ((*it)->signature == signature) {
+ _normal_queue.erase(it); // cancel the original task
+ _high_prior_queue.push_back(std::move(req)); // add the new task to the queue
+ _high_prior_condv.notify_one();
+ _normal_condv.notify_one();
+ break;
+ } else {
+ ++it; // doesn't meet the condition, continue to the next one
+ }
+ }
+ // If it exists in the high priority queue, no operation is needed
+ LOG_INFO("task has already existed in high prior
queue.").tag("signature", signature);
+ }
+ } while (false);
+
+ // Set the task's receive time so that we can later determine whether it has timed out
+ (const_cast<TAgentTaskRequest&>(task)).__set_recv_time(time(nullptr));
+
+ LOG_INFO("successfully submit task").tag("type",
type_str).tag("signature", signature);
+ return Status::OK();
+}
+
void PriorTaskWorkerPool::normal_loop() {
while (true) {
std::unique_ptr<TAgentTaskRequest> req;
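
The new entry point boils down to a dedup-then-promote operation over the pool's two queues, keyed by task signature. A distilled, self-contained C++ sketch of that pattern (illustrative names, not the actual Doris types; the real pool also maintains per-type task counts and a second condition variable):

    #include <condition_variable>
    #include <cstdint>
    #include <deque>
    #include <memory>
    #include <mutex>
    #include <unordered_set>

    struct Req { int64_t signature; };

    class TwoLevelQueue {
    public:
        // Mirrors submit_high_prior_and_cancel_low: a fresh signature goes straight
        // to the high-priority queue; a duplicate queued at normal priority is
        // cancelled and re-enqueued at high priority; a duplicate already at high
        // priority is silently dropped.
        void submit_high_and_cancel_low(std::unique_ptr<Req> req) {
            std::lock_guard<std::mutex> lock(_mtx);
            if (_signatures.insert(req->signature).second) {
                _high.push_back(std::move(req));
            } else {
                for (auto it = _normal.begin(); it != _normal.end(); ++it) {
                    if ((*it)->signature == req->signature) {
                        _normal.erase(it);               // cancel the original task
                        _high.push_back(std::move(req)); // promote the new one
                        break;
                    }
                }
                // If the duplicate lives in the high-priority queue, nothing to do.
            }
            _condv.notify_one(); // wake a worker either way
        }

    private:
        std::mutex _mtx;
        std::condition_variable _condv;
        std::deque<std::unique_ptr<Req>> _normal;
        std::deque<std::unique_ptr<Req>> _high;
        std::unordered_set<int64_t> _signatures;
    };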
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index 8d9be32b3dc..b64d84fb5c4 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -89,6 +89,8 @@ public:
Status submit_task(const TAgentTaskRequest& task) override;
+ Status submit_high_prior_and_cancel_low(const TAgentTaskRequest& task);
+
private:
void normal_loop();
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index dee3b83b6b7..57aa4949809 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1544,6 +1544,10 @@ DEFINE_mInt32(segments_key_bounds_truncation_threshold, "-1");
// ATTENTION: for test only, use random segments key bounds truncation threshold every time
DEFINE_mBool(random_segments_key_bounds_truncation, "false");
+DEFINE_mBool(enable_auto_clone_on_compaction_missing_version, "false");
+
+DEFINE_mBool(enable_auto_clone_on_mow_publish_missing_version, "false");
+
// clang-format off
#ifdef BE_TEST
// test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 4b50066fac3..16b2dece5c5 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1620,6 +1620,10 @@ DECLARE_mInt32(segments_key_bounds_truncation_threshold);
// ATTENTION: for test only, use random segments key bounds truncation threshold every time
DECLARE_mBool(random_segments_key_bounds_truncation);
+DECLARE_mBool(enable_auto_clone_on_compaction_missing_version);
+
+DECLARE_mBool(enable_auto_clone_on_mow_publish_missing_version);
+
#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp
index 73fe179c2ce..2a2fdb51eb2 100644
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -18,10 +18,13 @@
#include "olap/cumulative_compaction.h"
#include <cpp/sync_point.h>
+#include <gen_cpp/AgentService_types.h>
+#include <gen_cpp/Types_types.h>
#include <memory>
#include <mutex>
#include <ostream>
+#include <vector>
#include "common/config.h"
#include "common/logging.h"
@@ -29,7 +32,9 @@
#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/olap_define.h"
#include "olap/rowset/rowset_meta.h"
+#include "olap/storage_engine.h"
#include "olap/tablet.h"
+#include "runtime/exec_env.h"
#include "runtime/thread_context.h"
#include "util/doris_metrics.h"
#include "util/time.h"
@@ -191,6 +196,24 @@ Status CumulativeCompaction::pick_rowsets_to_compact() {
<< " first missed version prev rowset verison=" <<
missing_versions[0]
<< ", first missed version next rowset version=" <<
missing_versions[1]
<< ", tablet=" << _tablet->tablet_id();
+ if (config::enable_auto_clone_on_compaction_missing_version) {
+ LOG_INFO("cumulative compaction submit missing rowset clone task.")
+ .tag("tablet_id", _tablet->tablet_id())
+ .tag("version", missing_versions.back().first)
+ .tag("replica_id", tablet()->replica_id())
+ .tag("partition_id", _tablet->partition_id())
+ .tag("table_id", _tablet->table_id());
+ Status st = _engine.submit_clone_task(tablet(), missing_versions.back().first);
+ if (!st) {
+ LOG_WARNING("cumulative compaction failed to submit missing
rowset clone task.")
+ .tag("st", st.to_string())
+ .tag("tablet_id", _tablet->tablet_id())
+ .tag("version", missing_versions.back().first)
+ .tag("replica_id", tablet()->replica_id())
+ .tag("partition_id", _tablet->partition_id())
+ .tag("table_id", _tablet->table_id());
+ }
+ }
}
int64_t max_score = config::cumulative_compaction_max_deltas;
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 6118b2a0e0b..d119e197d93 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -20,6 +20,8 @@
// IWYU pragma: no_include <bthread/errno.h>
#include <fmt/format.h>
#include <gen_cpp/AgentService_types.h>
+#include <gen_cpp/Types_types.h>
+#include <glog/logging.h>
#include <rapidjson/document.h>
#include <rapidjson/encodings.h>
#include <rapidjson/prettywriter.h>
@@ -37,6 +39,7 @@
#include <cstring>
#include <filesystem>
#include <iterator>
+#include <memory>
#include <mutex>
#include <ostream>
#include <set>
@@ -49,6 +52,7 @@
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
+#include "gen_cpp/FrontendService.h"
#include "gutil/strings/substitute.h"
#include "io/fs/local_file_system.h"
#include "olap/binlog.h"
@@ -67,6 +71,7 @@
#include "olap/tablet_meta.h"
#include "olap/tablet_meta_manager.h"
#include "olap/txn_manager.h"
+#include "runtime/client_cache.h"
#include "runtime/stream_load/stream_load_recorder.h"
#include "util/doris_metrics.h"
#include "util/mem_info.h"
@@ -74,8 +79,10 @@
#include "util/stopwatch.hpp"
#include "util/thread.h"
#include "util/threadpool.h"
+#include "util/thrift_rpc_helper.h"
#include "util/uid_util.h"
#include "util/work_thread_pool.hpp"
+#include "vec/common/assert_cast.h"
using std::filesystem::directory_iterator;
using std::filesystem::path;
@@ -1505,6 +1512,79 @@ bool StorageEngine::get_peer_replica_info(int64_t tablet_id, TReplicaInfo* repli
return false;
}
+bool StorageEngine::get_peers_replica_backends(int64_t tablet_id, std::vector<TBackend>* backends) {
+ TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id);
+ if (tablet == nullptr) {
+ LOG(WARNING) << "tablet is no longer exist: tablet_id=" << tablet_id;
+ return false;
+ }
+ int64_t cur_time = UnixMillis();
+ if (cur_time - _last_get_peers_replica_backends_time_ms < 10000) {
+ LOG_WARNING("failed to get peers replica backens.")
+ .tag("last time", _last_get_peers_replica_backends_time_ms)
+ .tag("cur time", cur_time);
+ return false;
+ }
+ LOG_INFO("start get peers replica backends info.").tag("tablet id",
tablet_id);
+ ClusterInfo* cluster_info = ExecEnv::GetInstance()->cluster_info();
+ if (cluster_info == nullptr) {
+ LOG(WARNING) << "Have not get FE Master heartbeat yet";
+ return false;
+ }
+ TNetworkAddress master_addr = cluster_info->master_fe_addr;
+ if (master_addr.hostname.empty() || master_addr.port == 0) {
+ LOG(WARNING) << "Have not get FE Master heartbeat yet";
+ return false;
+ }
+ TGetTabletReplicaInfosRequest request;
+ TGetTabletReplicaInfosResult result;
+ request.tablet_ids.emplace_back(tablet_id);
+ Status rpc_st = ThriftRpcHelper::rpc<FrontendServiceClient>(
+ master_addr.hostname, master_addr.port,
+ [&request, &result](FrontendServiceConnection& client) {
+ client->getTabletReplicaInfos(result, request);
+ });
+
+ if (!rpc_st.ok()) {
+ LOG(WARNING) << "Failed to get tablet replica infos, encounter rpc
failure, "
+ "tablet id: "
+ << tablet_id;
+ return false;
+ }
+ std::unique_lock<std::mutex> lock(_peer_replica_infos_mutex);
+ if (result.tablet_replica_infos.contains(tablet_id)) {
+ std::vector<TReplicaInfo> reps = result.tablet_replica_infos[tablet_id];
+ DCHECK_NE(reps.size(), 0);
+ for (const auto& rep : reps) {
+ if (rep.replica_id != tablet->replica_id()) {
+ TBackend backend;
+ backend.__set_host(rep.host);
+ backend.__set_be_port(rep.be_port);
+ backend.__set_http_port(rep.http_port);
+ backend.__set_brpc_port(rep.brpc_port);
+ if (rep.__isset.is_alive) {
+ backend.__set_is_alive(rep.is_alive);
+ }
+ if (rep.__isset.backend_id) {
+ backend.__set_id(rep.backend_id);
+ }
+ backends->emplace_back(backend);
+ std::stringstream backend_string;
+ backend.printTo(backend_string);
+ LOG_INFO("get 1 peer replica backend info.")
+ .tag("tablet id", tablet_id)
+ .tag("backend info", backend_string.str());
+ }
+ }
+ _last_get_peers_replica_backends_time_ms = UnixMillis();
+ LOG_INFO("succeed get peers replica backends info.")
+ .tag("tablet id", tablet_id)
+ .tag("replica num", backends->size());
+ return true;
+ }
+ return false;
+}
+
bool StorageEngine::should_fetch_from_peer(int64_t tablet_id) {
#ifdef BE_TEST
if (tablet_id % 2 == 0) {
@@ -1619,6 +1699,36 @@ Status StorageEngine::_persist_broken_paths() {
return Status::OK();
}
+Status StorageEngine::submit_clone_task(Tablet* tablet, int64_t version) {
+ std::vector<TBackend> backends;
+ if (!get_peers_replica_backends(tablet->tablet_id(), &backends)) {
+ LOG(WARNING) << tablet->tablet_id() << " tablet doesn't have peer replica backends";
+ return Status::InternalError("");
+ }
+ TAgentTaskRequest task;
+ TCloneReq req;
+ req.__set_tablet_id(tablet->tablet_id());
+ req.__set_schema_hash(tablet->schema_hash());
+ req.__set_src_backends(backends);
+ req.__set_version(version);
+ req.__set_replica_id(tablet->replica_id());
+ req.__set_partition_id(tablet->partition_id());
+ req.__set_table_id(tablet->table_id());
+ task.__set_task_type(TTaskType::CLONE);
+ task.__set_clone_req(req);
+ task.__set_priority(TPriority::HIGH);
+ task.__set_signature(tablet->tablet_id());
+ LOG_INFO("BE start to submit missing rowset clone task.")
+ .tag("tablet_id", tablet->tablet_id())
+ .tag("version", version)
+ .tag("replica_id", tablet->replica_id())
+ .tag("partition_id", tablet->partition_id())
+ .tag("table_id", tablet->table_id());
+
+ RETURN_IF_ERROR(assert_cast<PriorTaskWorkerPool*>(workers->at(TTaskType::CLONE).get())
+ ->submit_high_prior_and_cancel_low(task));
+ return Status::OK();
+}
+
int CreateTabletRRIdxCache::get_index(const std::string& key) {
auto* lru_handle = lookup(key);
if (lru_handle) {
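
Two details of this hunk are easy to miss. First, get_peers_replica_backends throttles FE lookups: _last_get_peers_replica_backends_time_ms is advanced only after a successful RPC, so at most one lookup follows within 10 seconds of a success, while a failed lookup does not start a new window. A minimal sketch of that guard (helper name is illustrative; UnixMillis is the Doris utility the hunk uses):

    // Refuse to query FE within 10s of the last successful lookup; the caller
    // stamps last_success_ms only after the RPC returns replica infos.
    bool allow_peer_lookup(int64_t now_ms, int64_t last_success_ms) {
        return now_ms - last_success_ms >= 10000;
    }

Second, submit_clone_task reuses the tablet id as the task signature, which is what lets submit_high_prior_and_cancel_low collapse repeated submissions for the same tablet into a single queued clone.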
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 72fd17d02e8..c5b9b193162 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -37,6 +37,7 @@
#include <unordered_set>
#include <vector>
+#include "agent/task_worker_pool.h"
#include "common/config.h"
#include "common/status.h"
#include "gutil/ref_counted.h"
@@ -313,6 +314,8 @@ public:
bool get_peer_replica_info(int64_t tablet_id, TReplicaInfo* replica, std::string* token);
+ bool get_peers_replica_backends(int64_t tablet_id, std::vector<TBackend>* backends);
+
bool should_fetch_from_peer(int64_t tablet_id);
const std::shared_ptr<StreamLoadRecorder>& get_stream_load_recorder() {
@@ -343,6 +346,10 @@ public:
std::set<string> get_broken_paths() { return _broken_paths; }
+ Status submit_clone_task(Tablet* tablet, int64_t version);
+
+ std::unordered_map<int64_t, std::unique_ptr<TaskWorkerPoolIf>>* workers;
+
private:
// Instance should be inited from `static open()`
// MUST NOT be called in other circumstances.
@@ -564,6 +571,8 @@ private:
// thread to check tablet delete bitmap count tasks
scoped_refptr<Thread> _check_delete_bitmap_score_thread;
+
+ int64_t _last_get_peers_replica_backends_time_ms {0};
};
// lru cache for create tablet round robin in disks
diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp
index 6a9e66f1d38..e2f915f6cda 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -38,6 +38,7 @@
#include <unordered_map>
#include <unordered_set>
#include <utility>
+#include <vector>
#include "common/config.h"
#include "common/logging.h"
@@ -989,5 +990,4 @@ Status EngineCloneTask::_finish_full_clone(Tablet* tablet,
return tablet->revise_tablet_meta(to_add, to_delete, false);
// TODO(plat1ko): write cooldown meta to remote if this replica is cooldown replica
}
-
} // namespace doris
diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp
index 2dcc1723b71..6c37e55da75 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -32,6 +32,7 @@
#include <unordered_map>
#include <utility>
+#include "cloud/config.h"
#include "common/logging.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
@@ -216,6 +217,25 @@ Status EnginePublishVersionTask::execute() {
continue;
}
auto handle_version_not_continuous = [&]() {
+ if (config::enable_auto_clone_on_mow_publish_missing_version) {
+ LOG_INFO("mow publish submit missing rowset clone
task.")
+ .tag("tablet_id", tablet->tablet_id())
+ .tag("version", version.first - 1)
+ .tag("replica_id", tablet->replica_id())
+ .tag("partition_id",
tablet->partition_id())
+ .tag("table_id", tablet->table_id());
+ Status st = _engine.submit_clone_task(tablet.get(), version.first - 1);
+ if (!st) {
+ LOG_WARNING("mow publish failed to submit missing rowset clone task.")
+ .tag("st", st.to_string())
+ .tag("tablet_id", tablet->tablet_id())
+ .tag("version", version.first - 1)
+ .tag("replica_id", tablet->replica_id())
+ .tag("partition_id", tablet->partition_id())
+ .tag("table_id", tablet->table_id());
+ }
+ }
add_error_tablet_id(tablet_info.tablet_id);
// When there are too many missing versions, do not directly retry the
// publish and handle it through async publish.
diff --git a/be/src/util/blocking_priority_queue.hpp b/be/src/util/blocking_priority_queue.hpp
index bfc1c34e8f1..43fe1e4df47 100644
--- a/be/src/util/blocking_priority_queue.hpp
+++ b/be/src/util/blocking_priority_queue.hpp
@@ -22,6 +22,7 @@
#include <unistd.h>
+#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 2dd0a6a746e..9a034952ca1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -2818,6 +2818,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
replicaInfo.setBePort(backend.getBePort());
replicaInfo.setHttpPort(backend.getHttpPort());
replicaInfo.setBrpcPort(backend.getBrpcPort());
+ replicaInfo.setIsAlive(backend.isAlive());
+ replicaInfo.setBackendId(backend.getId());
replicaInfo.setReplicaId(replica.getId());
replicaInfos.add(replicaInfo);
}
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 80683388fa2..397f1e051ea 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -671,6 +671,8 @@ struct TReplicaInfo {
3: required TPort http_port
4: required TPort brpc_port
5: required TReplicaId replica_id
+ 6: optional bool is_alive
+ 7: optional i64 backend_id
}
struct TResourceInfo {
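
Both new TReplicaInfo fields are optional, so a newer FE can keep talking to an older BE (and vice versa) during a rolling upgrade; the BE accordingly guards each read with __isset, as the storage_engine.cpp hunk above does:

    // Recap of the guard pattern from get_peers_replica_backends:
    if (rep.__isset.is_alive) {
        backend.__set_is_alive(rep.is_alive);
    }
    if (rep.__isset.backend_id) {
        backend.__set_id(rep.backend_id);
    }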
diff --git a/regression-test/suites/fault_injection_p0/test_compaction_clone_missing_rowset_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_compaction_clone_missing_rowset_fault_injection.groovy
new file mode 100644
index 00000000000..a7f060a1108
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_compaction_clone_missing_rowset_fault_injection.groovy
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.http.NoHttpResponseException
+import org.apache.doris.regression.util.DebugPoint
+import org.apache.doris.regression.util.NodeType
+
+suite('test_compaction_clone_missing_rowset_fault_injection', 'docker') {
+ def options = new ClusterOptions()
+ options.cloudMode = false
+ options.enableDebugPoints()
+ options.feConfigs += [ "disable_tablet_scheduler=true" ]
+ options.beConfigs += [ "enable_auto_clone_on_compaction_missing_version=true" ]
+ options.beNum = 3
+ docker(options) {
+
+ def injectBe = null
+ def normalBe = null
+ def backends = sql_return_maparray('show backends')
+
+ injectBe = backends[0]
+ assertNotNull(injectBe)
+ normalBe = backends[1]
+ assertNotNull(normalBe)
+
+ try {
+ def tableName = "test_compaction_clone_missing_rowset"
+ sql """ DROP TABLE IF EXISTS ${tableName} force"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k` int ,
+ `v` int ,
+ ) engine=olap
+ DUPLICATE KEY(k)
+ DISTRIBUTED BY HASH(k)
+ BUCKETS 1
+ properties(
+ "replication_num" = "3",
+ "disable_auto_compaction" = "true")
+ """
+ sql """ INSERT INTO ${tableName} VALUES (1,0)"""
+ DebugPoint.enableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, "EnginePublishVersionTask.finish.random", [percent:"1.0"])
+ sql """ INSERT INTO ${tableName} VALUES (2,0)"""
+ sql """ INSERT INTO ${tableName} VALUES (3,0)"""
+ sql """ INSERT INTO ${tableName} VALUES (4,0)"""
+ DebugPoint.disableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, "EnginePublishVersionTask.finish.random")
+ sql """ INSERT INTO ${tableName} VALUES (5,0)"""
+
+ def array = sql_return_maparray("SHOW TABLETS FROM ${tableName}")
+ def tabletId = array[0].TabletId
+
+ // 1st check rowsets
+ logger.info("1st show:" + tabletId)
+ def (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, tabletId)
+ logger.info("1st show: code=" + code + ", out=" + out + ", err=" + err)
+ assertTrue(out.contains("[0-1]"))
+ assertTrue(out.contains("[2-2]"))
+ // missing rowset [3-5]
+ assertTrue(out.contains("[3-5]"))
+ assertTrue(out.contains("[6-6]"))
+
+ logger.info("1st run cumu compaction:" + tabletId)
+ (code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, tabletId)
+ logger.info("1st Run cumu compaction: code=" + code + ", out=" + out + ", err=" + err)
+
+ sleep(10000)
+
+ // 2nd check rowsets
+ logger.info("2nd show:" + tabletId)
+ (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, tabletId)
+ logger.info("2nd show: code=" + code + ", out=" + out + ", err=" + err)
+ assertTrue(out.contains("[0-1]"))
+ assertTrue(out.contains("[2-2]"))
+ assertTrue(out.contains("[3-3]"))
+ assertTrue(out.contains("[4-4]"))
+ assertTrue(out.contains("[5-5]"))
+ assertTrue(out.contains("[6-6]"))
+
+ logger.info("2nd cumu compaction:" + tabletId)
+ (code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, tabletId)
+ logger.info("2nd cumu compaction: code=" + code + ", out=" + out + ", err=" + err)
+
+ // check rowsets
+ logger.info("3rd show:" + tabletId)
+ (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, tabletId)
+ logger.info("3rd show: code=" + code + ", out=" + out + ", err=" + err)
+ assertTrue(out.contains("[0-1]"))
+ assertTrue(out.contains("[2-2]"))
+ assertTrue(out.contains("[3-6]"))
+
+ } finally {
+ if (injectBe != null) {
+ DebugPoint.disableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, "EnginePublishVersionTask.finish.random")
+ }
+ }
+ }
+}
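
Note on the suite above: EnginePublishVersionTask.finish.random with percent 1.0 fails every publish on the injected BE while enabled, which is why the first status check sees a [3-5] gap on that replica, and disable_tablet_scheduler=true keeps the FE from repairing it, so the gap can only be filled by the BE-side auto clone triggered by the first cumulative compaction attempt; the second check then sees [3-3], [4-4], [5-5] before compaction merges them into [3-6].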
diff --git a/regression-test/suites/fault_injection_p0/test_mow_publish_clone_missing_rowset.groovy b/regression-test/suites/fault_injection_p0/test_mow_publish_clone_missing_rowset.groovy
new file mode 100644
index 00000000000..14f0073f5c8
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_mow_publish_clone_missing_rowset.groovy
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.http.NoHttpResponseException
+import org.apache.doris.regression.util.DebugPoint
+import org.apache.doris.regression.util.NodeType
+
+suite('test_mow_publish_clone_missing_rowset_fault_injection', 'docker') {
+
+ def set_be_param = { paramName, paramValue, beIp, bePort ->
+ def (code, out, err) = curl("POST",
String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName,
paramValue))
+ assertTrue(out.contains("OK"))
+ }
+
+ def options = new ClusterOptions()
+ options.cloudMode = false
+ options.enableDebugPoints()
+ options.feConfigs += [ "disable_tablet_scheduler=true" ]
+ options.beConfigs += [ "enable_auto_clone_on_mow_publish_missing_version=false" ]
+ options.beNum = 3
+ docker(options) {
+
+ def injectBe = null
+ def normalBe = null
+ def backends = sql_return_maparray('show backends')
+
+ injectBe = backends[0]
+ assertNotNull(injectBe)
+ normalBe = backends[1]
+ assertNotNull(normalBe)
+
+ try {
+ def tableName = "test_mow_publish_clone_missing_rowset"
+ sql """ DROP TABLE IF EXISTS ${tableName} force"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k` int ,
+ `v` int ,
+ ) engine=olap
+ UNIQUE KEY(k)
+ DISTRIBUTED BY HASH(k)
+ BUCKETS 1
+ properties(
+ "replication_num" = "3",
+ "disable_auto_compaction" = "true")
+ """
+ sql """ INSERT INTO ${tableName} VALUES (1,0)"""
+ DebugPoint.enableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, "EnginePublishVersionTask.finish.random", [percent:"1.0"])
+ sql """ INSERT INTO ${tableName} VALUES (2,0)"""
+ DebugPoint.disableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, "EnginePublishVersionTask.finish.random")
+ sql """ INSERT INTO ${tableName} VALUES (3,0)"""
+ sql """ INSERT INTO ${tableName} VALUES (4,0)"""
+ sql """ INSERT INTO ${tableName} VALUES (5,0)"""
+
+ def array = sql_return_maparray("SHOW TABLETS FROM ${tableName}")
+ def tabletId = array[0].TabletId
+
+ // normal be check rowsets
+ logger.info("normal be show:" + tabletId)
+ def (code, out, err) = be_show_tablet_status(normalBe.Host, normalBe.HttpPort, tabletId)
+ logger.info("normal be show: code=" + code + ", out=" + out + ", err=" + err)
+ assertTrue(out.contains("[0-1]"))
+ assertTrue(out.contains("[2-2]"))
+ assertTrue(out.contains("[3-3]"))
+ assertTrue(out.contains("[4-4]"))
+ assertTrue(out.contains("[5-5]"))
+ assertTrue(out.contains("[6-6]"))
+
+ // 1st inject be check rowsets
+ logger.info("1st inject be show:" + tabletId)
+ (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, tabletId)
+ logger.info("1st inject be show: code=" + code + ", out=" + out + ", err=" + err)
+ assertTrue(out.contains("[0-1]"))
+ assertTrue(out.contains("[2-2]"))
+ assertFalse(out.contains("[3-3]"))
+ assertFalse(out.contains("[4-4]"))
+ assertFalse(out.contains("[5-5]"))
+ assertFalse(out.contains("[6-6]"))
+
+ set_be_param("enable_auto_clone_on_mow_publish_missing_version", "true", injectBe.Host, injectBe.HttpPort);
+ Thread.sleep(10000)
+ // submit clone task
+ sql """ INSERT INTO ${tableName} VALUES (6,0)"""
+
+ sleep(10000)
+
+ // 2nd inject be check rowsets
+ logger.info("2nd inject be show:" + tabletId)
+ (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, tabletId)
+ logger.info("2nd inject be show: code=" + code + ", out=" + out + ", err=" + err)
+ assertTrue(out.contains("[0-1]"))
+ assertTrue(out.contains("[2-2]"))
+ assertTrue(out.contains("[3-3]"))
+ assertTrue(out.contains("[4-4]"))
+ assertTrue(out.contains("[5-5]"))
+ assertTrue(out.contains("[6-6]"))
+ assertTrue(out.contains("[7-7]"))
+
+ sql """ INSERT INTO ${tableName} VALUES (7,0)"""
+
+ // 3rd inject be check rowsets
+ logger.info("3rd inject be show:" + tabletId)
+ (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, tabletId)
+ logger.info("3rd inject be show: code=" + code + ", out=" + out + ", err=" + err)
+ assertTrue(out.contains("[0-1]"))
+ assertTrue(out.contains("[2-2]"))
+ assertTrue(out.contains("[3-3]"))
+ assertTrue(out.contains("[4-4]"))
+ assertTrue(out.contains("[5-5]"))
+ assertTrue(out.contains("[6-6]"))
+ assertTrue(out.contains("[7-7]"))
+ assertTrue(out.contains("[8-8]"))
+ } finally {
+ if (injectBe != null) {
+ DebugPoint.disableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, "EnginePublishVersionTask.finish.random")
+ }
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]