This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 908c84ce4e8 branch-3.1: [Fix](Clone) Fix compaction and mow failure 
when missing rowset #52812 (#53496)
908c84ce4e8 is described below

commit 908c84ce4e804ce20338b076db4bb7751914c0a6
Author: abmdocrt <[email protected]>
AuthorDate: Fri Jul 18 10:57:19 2025 +0800

    branch-3.1: [Fix](Clone) Fix compaction and mow failure when missing rowset 
#52812 (#53496)
    
    Pick #52812
---
 be/src/agent/agent_server.cpp                      |   7 +-
 be/src/agent/task_worker_pool.cpp                  |  48 ++++++++
 be/src/agent/task_worker_pool.h                    |   2 +
 be/src/common/config.cpp                           |   4 +
 be/src/common/config.h                             |   4 +
 be/src/olap/cumulative_compaction.cpp              |  23 ++++
 be/src/olap/storage_engine.cpp                     | 110 +++++++++++++++++
 be/src/olap/storage_engine.h                       |   9 ++
 be/src/olap/task/engine_clone_task.cpp             |   2 +-
 be/src/olap/task/engine_publish_version_task.cpp   |  20 +++
 be/src/util/blocking_priority_queue.hpp            |   1 +
 .../apache/doris/service/FrontendServiceImpl.java  |   2 +
 gensrc/thrift/Types.thrift                         |   2 +
 ...ion_clone_missing_rowset_fault_injection.groovy | 112 +++++++++++++++++
 .../test_mow_publish_clone_missing_rowset.groovy   | 134 +++++++++++++++++++++
 15 files changed, 477 insertions(+), 3 deletions(-)

diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index 810a242ca61..a05278480ed 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -43,6 +43,7 @@
 #include "olap/snapshot_manager.h"
 #include "olap/storage_engine.h"
 #include "runtime/exec_env.h"
+#include "util/work_thread_pool.hpp"
 
 using std::string;
 using std::vector;
@@ -172,8 +173,8 @@ void AgentServer::start_workers(StorageEngine& engine, 
ExecEnv* exec_env) {
     _workers[TTaskType::ALTER] = std::make_unique<TaskWorkerPool>(
             "ALTER_TABLE", config::alter_tablet_worker_count, [&engine](auto&& 
task) { return alter_tablet_callback(engine, task); });
 
-    _workers[TTaskType::CLONE] = std::make_unique<TaskWorkerPool>(
-            "CLONE", config::clone_worker_count, [&engine, &cluster_info = 
_cluster_info](auto&& task) { return clone_callback(engine, cluster_info, 
task); });
+    _workers[TTaskType::CLONE] = std::make_unique<PriorTaskWorkerPool>(
+            "CLONE", config::clone_worker_count,config::clone_worker_count, 
[&engine, &cluster_info = _cluster_info](auto&& task) { return 
clone_callback(engine, cluster_info, task); });
 
     _workers[TTaskType::STORAGE_MEDIUM_MIGRATE] = 
std::make_unique<TaskWorkerPool>(
             "STORAGE_MEDIUM_MIGRATE", config::storage_medium_migrate_count, 
[&engine](auto&& task) { return storage_medium_migrate_callback(engine, task); 
});
@@ -202,6 +203,8 @@ void AgentServer::start_workers(StorageEngine& engine, 
ExecEnv* exec_env) {
     _report_workers.push_back(std::make_unique<ReportWorker>(
             "REPORT_INDEX_POLICY", _cluster_info, 
config::report_index_policy_interval_seconds,[&cluster_info = _cluster_info] { 
report_index_policy_callback(cluster_info); }));
     // clang-format on
+
+    exec_env->storage_engine().to_local().workers = &_workers;
 }
 
 void AgentServer::cloud_start_workers(CloudStorageEngine& engine, ExecEnv* 
exec_env) {
diff --git a/be/src/agent/task_worker_pool.cpp 
b/be/src/agent/task_worker_pool.cpp
index c7d8c961811..ae5067fc17a 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -17,6 +17,7 @@
 
 #include "agent/task_worker_pool.h"
 
+#include <brpc/controller.h>
 #include <fmt/format.h>
 #include <gen_cpp/AgentService_types.h>
 #include <gen_cpp/DataSinks_types.h>
@@ -85,6 +86,7 @@
 #include "runtime/memory/global_memory_arbitrator.h"
 #include "runtime/snapshot_loader.h"
 #include "service/backend_options.h"
+#include "util/brpc_client_cache.h"
 #include "util/debug_points.h"
 #include "util/doris_metrics.h"
 #include "util/jni-util.h"
@@ -609,6 +611,52 @@ Status PriorTaskWorkerPool::submit_task(const 
TAgentTaskRequest& task) {
     });
 }
 
+// Enqueues a HIGH priority task. If a task with the same signature still waits in the normal
+// queue, it is cancelled and replaced by this request; otherwise an already-registered signature
+// (queued as high priority or, presumably, running) makes this a no-op. Always returns OK.
+Status PriorTaskWorkerPool::submit_high_prior_and_cancel_low(const TAgentTaskRequest& task) {
+    const TTaskType::type task_type = task.task_type;
+    int64_t signature = task.signature;
+    std::string type_str;
+    EnumToString(TTaskType, task_type, type_str);
+    auto req = std::make_unique<TAgentTaskRequest>(task);
+    // Stamp the queued copy (not the caller's request) so timeout checks see the receive time.
+    req->__set_recv_time(time(nullptr));
+    DCHECK(req->__isset.priority && req->priority == TPriority::HIGH);
+    std::lock_guard sig_lock(s_task_signatures_mtx);
+    auto& set = s_task_signatures[task_type];
+    std::lock_guard lock(_mtx);
+    if (!set.contains(signature)) {
+        // Unknown signature: enqueue it directly as a new high priority task.
+        add_task_count(*req, 1);
+        set.insert(signature);
+        _high_prior_queue.push_back(std::move(req));
+        _high_prior_condv.notify_one();
+        _normal_condv.notify_one();
+        LOG_INFO("successfully submit task").tag("type", type_str).tag("signature", signature);
+        return Status::OK();
+    }
+    for (auto it = _normal_queue.begin(); it != _normal_queue.end(); ++it) {
+        if ((*it)->signature != signature) {
+            continue;
+        }
+        // Replace the waiting normal task (presumably already counted on submit: no add_task_count).
+        _normal_queue.erase(it);
+        _high_prior_queue.push_back(std::move(req));
+        _high_prior_condv.notify_one();
+        _normal_condv.notify_one();
+        LOG_INFO("promote task from normal queue to high prior queue")
+                .tag("type", type_str)
+                .tag("signature", signature);
+        return Status::OK();
+    }
+    // Already in the high priority queue (or presumably running): nothing to do.
+    LOG_INFO("task has already existed in high prior queue.")
+            .tag("type", type_str)
+            .tag("signature", signature);
+    return Status::OK();
+}
+
 void PriorTaskWorkerPool::normal_loop() {
     while (true) {
         std::unique_ptr<TAgentTaskRequest> req;
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index 8d9be32b3dc..b64d84fb5c4 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -89,6 +89,8 @@ public:
 
     Status submit_task(const TAgentTaskRequest& task) override;
 
+    Status submit_high_prior_and_cancel_low(const TAgentTaskRequest& task);
+
 private:
     void normal_loop();
 
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index dee3b83b6b7..57aa4949809 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1544,6 +1544,10 @@ DEFINE_mInt32(segments_key_bounds_truncation_threshold, 
"-1");
 // ATTENTION: for test only, use random segments key bounds truncation 
threshold every time
 DEFINE_mBool(random_segments_key_bounds_truncation, "false");
 
+DEFINE_mBool(enable_auto_clone_on_compaction_missing_version, "false");
+
+DEFINE_mBool(enable_auto_clone_on_mow_publish_missing_version, "false");
+
 // clang-format off
 #ifdef BE_TEST
 // test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 4b50066fac3..16b2dece5c5 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1620,6 +1620,10 @@ DECLARE_mInt32(segments_key_bounds_truncation_threshold);
 // ATTENTION: for test only, use random segments key bounds truncation 
threshold every time
 DECLARE_mBool(random_segments_key_bounds_truncation);
 
+DECLARE_mBool(enable_auto_clone_on_compaction_missing_version);
+
+DECLARE_mBool(enable_auto_clone_on_mow_publish_missing_version);
+
 #ifdef BE_TEST
 // test s3
 DECLARE_String(test_s3_resource);
diff --git a/be/src/olap/cumulative_compaction.cpp 
b/be/src/olap/cumulative_compaction.cpp
index 73fe179c2ce..2a2fdb51eb2 100644
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -18,10 +18,13 @@
 #include "olap/cumulative_compaction.h"
 
 #include <cpp/sync_point.h>
+#include <gen_cpp/AgentService_types.h>
+#include <gen_cpp/Types_types.h>
 
 #include <memory>
 #include <mutex>
 #include <ostream>
+#include <vector>
 
 #include "common/config.h"
 #include "common/logging.h"
@@ -29,7 +32,9 @@
 #include "olap/cumulative_compaction_time_series_policy.h"
 #include "olap/olap_define.h"
 #include "olap/rowset/rowset_meta.h"
+#include "olap/storage_engine.h"
 #include "olap/tablet.h"
+#include "runtime/exec_env.h"
 #include "runtime/thread_context.h"
 #include "util/doris_metrics.h"
 #include "util/time.h"
@@ -191,6 +196,24 @@ Status CumulativeCompaction::pick_rowsets_to_compact() {
                      << " first missed version prev rowset verison=" << 
missing_versions[0]
                      << ", first missed version next rowset version=" << 
missing_versions[1]
                      << ", tablet=" << _tablet->tablet_id();
+        if (config::enable_auto_clone_on_compaction_missing_version) {
+            LOG_INFO("cumulative compaction submit missing rowset clone task.")
+                    .tag("tablet_id", _tablet->tablet_id())
+                    .tag("version", missing_versions.back().first)
+                    .tag("replica_id", tablet()->replica_id())
+                    .tag("partition_id", _tablet->partition_id())
+                    .tag("table_id", _tablet->table_id());
+            Status st = _engine.submit_clone_task(tablet(), 
missing_versions.back().first);
+            if (!st) {
+                LOG_WARNING("cumulative compaction failed to submit missing 
rowset clone task.")
+                        .tag("st", st.to_string())
+                        .tag("tablet_id", _tablet->tablet_id())
+                        .tag("version", missing_versions.back().first)
+                        .tag("replica_id", tablet()->replica_id())
+                        .tag("partition_id", _tablet->partition_id())
+                        .tag("table_id", _tablet->table_id());
+            }
+        }
     }
 
     int64_t max_score = config::cumulative_compaction_max_deltas;
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 6118b2a0e0b..d119e197d93 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -20,6 +20,8 @@
 // IWYU pragma: no_include <bthread/errno.h>
 #include <fmt/format.h>
 #include <gen_cpp/AgentService_types.h>
+#include <gen_cpp/Types_types.h>
+#include <glog/logging.h>
 #include <rapidjson/document.h>
 #include <rapidjson/encodings.h>
 #include <rapidjson/prettywriter.h>
@@ -37,6 +39,7 @@
 #include <cstring>
 #include <filesystem>
 #include <iterator>
+#include <memory>
 #include <mutex>
 #include <ostream>
 #include <set>
@@ -49,6 +52,7 @@
 #include "common/config.h"
 #include "common/logging.h"
 #include "common/status.h"
+#include "gen_cpp/FrontendService.h"
 #include "gutil/strings/substitute.h"
 #include "io/fs/local_file_system.h"
 #include "olap/binlog.h"
@@ -67,6 +71,7 @@
 #include "olap/tablet_meta.h"
 #include "olap/tablet_meta_manager.h"
 #include "olap/txn_manager.h"
+#include "runtime/client_cache.h"
 #include "runtime/stream_load/stream_load_recorder.h"
 #include "util/doris_metrics.h"
 #include "util/mem_info.h"
@@ -74,8 +79,10 @@
 #include "util/stopwatch.hpp"
 #include "util/thread.h"
 #include "util/threadpool.h"
+#include "util/thrift_rpc_helper.h"
 #include "util/uid_util.h"
 #include "util/work_thread_pool.hpp"
+#include "vec/common/assert_cast.h"
 
 using std::filesystem::directory_iterator;
 using std::filesystem::path;
@@ -1505,6 +1512,79 @@ bool StorageEngine::get_peer_replica_info(int64_t 
tablet_id, TReplicaInfo* repli
     return false;
 }
 
+// Fetches the other replicas of `tablet_id` from the FE master and appends them to `backends`.
+// Returns false if the tablet is not in the local tablet manager, the FE master is unknown or
+// the RPC fails, FE was already asked within the last 10s, or no peer replica was returned.
+bool StorageEngine::get_peers_replica_backends(int64_t tablet_id, std::vector<TBackend>* backends) {
+    TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id);
+    if (tablet == nullptr) {
+        LOG(WARNING) << "tablet is no longer exist: tablet_id=" << tablet_id;
+        return false;
+    }
+    ClusterInfo* cluster_info = ExecEnv::GetInstance()->cluster_info();
+    if (cluster_info == nullptr || cluster_info->master_fe_addr.hostname.empty() ||
+        cluster_info->master_fe_addr.port == 0) {
+        LOG(WARNING) << "Have not get FE Master heartbeat yet";
+        return false;
+    }
+    const TNetworkAddress master_addr = cluster_info->master_fe_addr;
+    // Allow one FE request per 10s; the slot is checked and claimed under the mutex.
+    std::unique_lock<std::mutex> rate_lock(_peer_replica_infos_mutex);
+    int64_t cur_time = UnixMillis();
+    if (cur_time - _last_get_peers_replica_backends_time_ms < 10000) {
+        LOG_WARNING("skip getting peers replica backends, requested too frequently.")
+                .tag("last time", _last_get_peers_replica_backends_time_ms)
+                .tag("cur time", cur_time);
+        return false;
+    }
+    _last_get_peers_replica_backends_time_ms = cur_time;
+    rate_lock.unlock();
+    LOG_INFO("start get peers replica backends info.").tag("tablet id", tablet_id);
+    TGetTabletReplicaInfosRequest request;
+    TGetTabletReplicaInfosResult result;
+    request.tablet_ids.emplace_back(tablet_id);
+    Status rpc_st = ThriftRpcHelper::rpc<FrontendServiceClient>(
+            master_addr.hostname, master_addr.port,
+            [&request, &result](FrontendServiceConnection& client) {
+                client->getTabletReplicaInfos(result, request);
+            });
+    if (!rpc_st.ok()) {
+        LOG(WARNING) << "Failed to get tablet replica infos, encounter rpc failure, tablet id: "
+                     << tablet_id << ", status: " << rpc_st.to_string();
+        return false;
+    }
+    auto iter = result.tablet_replica_infos.find(tablet_id);
+    if (iter == result.tablet_replica_infos.end()) {
+        return false;
+    }
+    for (const TReplicaInfo& rep : iter->second) {
+        if (rep.replica_id == tablet->replica_id()) {
+            continue; // skip the local replica itself
+        }
+        TBackend backend;
+        backend.__set_host(rep.host);
+        backend.__set_be_port(rep.be_port);
+        backend.__set_http_port(rep.http_port);
+        backend.__set_brpc_port(rep.brpc_port);
+        if (rep.__isset.is_alive) {
+            backend.__set_is_alive(rep.is_alive);
+        }
+        if (rep.__isset.backend_id) {
+            backend.__set_id(rep.backend_id);
+        }
+        std::stringstream backend_string;
+        backend.printTo(backend_string);
+        LOG_INFO("get 1 peer replica backend info.")
+                .tag("tablet id", tablet_id)
+                .tag("backend info", backend_string.str());
+        backends->emplace_back(std::move(backend));
+    }
+    LOG_INFO("succeed get peers replica backends info.")
+            .tag("tablet id", tablet_id)
+            .tag("replica num", backends->size());
+    return !backends->empty(); // a tablet without peers has nothing to clone from
+}
+
 bool StorageEngine::should_fetch_from_peer(int64_t tablet_id) {
 #ifdef BE_TEST
     if (tablet_id % 2 == 0) {
@@ -1619,6 +1699,48 @@ Status StorageEngine::_persist_broken_paths() {
     return Status::OK();
 }
 
+// Submits a HIGH priority CLONE task for `tablet` using its peer replica backends and `version`.
+// Returns an error if the CLONE worker pool is unavailable or no peer backend could be fetched.
+Status StorageEngine::submit_clone_task(Tablet* tablet, int64_t version) {
+    // `workers` is only set at the end of AgentServer::start_workers().
+    if (workers == nullptr) {
+        return Status::InternalError("agent task workers are not started yet");
+    }
+    auto clone_pool = workers->find(TTaskType::CLONE);
+    if (clone_pool == workers->end() || clone_pool->second == nullptr) {
+        return Status::InternalError("CLONE task worker pool is not registered");
+    }
+    std::vector<TBackend> backends;
+    if (!get_peers_replica_backends(tablet->tablet_id(), &backends)) {
+        LOG(WARNING) << tablet->tablet_id() << " tablet doesn't have peer replica backends";
+        return Status::InternalError("tablet {} doesn't have peer replica backends",
+                                     tablet->tablet_id());
+    }
+    TAgentTaskRequest task;
+    TCloneReq req;
+    req.__set_tablet_id(tablet->tablet_id());
+    req.__set_schema_hash(tablet->schema_hash());
+    req.__set_src_backends(backends);
+    req.__set_version(version);
+    req.__set_replica_id(tablet->replica_id());
+    req.__set_partition_id(tablet->partition_id());
+    req.__set_table_id(tablet->table_id());
+    task.__set_task_type(TTaskType::CLONE);
+    task.__set_clone_req(req);
+    task.__set_priority(TPriority::HIGH);
+    // submit_high_prior_and_cancel_low() dedups/replaces queued CLONE tasks by this signature.
+    task.__set_signature(tablet->tablet_id());
+    LOG_INFO("BE start to submit missing rowset clone task.")
+            .tag("tablet_id", tablet->tablet_id())
+            .tag("version", version)
+            .tag("replica_id", tablet->replica_id())
+            .tag("partition_id", tablet->partition_id())
+            .tag("table_id", tablet->table_id());
+    // AgentServer::start_workers() registers the CLONE pool as a PriorTaskWorkerPool.
+    return assert_cast<PriorTaskWorkerPool*>(clone_pool->second.get())
+            ->submit_high_prior_and_cancel_low(task);
+}
+
 int CreateTabletRRIdxCache::get_index(const std::string& key) {
     auto* lru_handle = lookup(key);
     if (lru_handle) {
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 72fd17d02e8..c5b9b193162 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -37,6 +37,7 @@
 #include <unordered_set>
 #include <vector>
 
+#include "agent/task_worker_pool.h"
 #include "common/config.h"
 #include "common/status.h"
 #include "gutil/ref_counted.h"
@@ -313,6 +314,8 @@ public:
 
     bool get_peer_replica_info(int64_t tablet_id, TReplicaInfo* replica, 
std::string* token);
 
+    bool get_peers_replica_backends(int64_t tablet_id, std::vector<TBackend>* 
backends);
+
     bool should_fetch_from_peer(int64_t tablet_id);
 
     const std::shared_ptr<StreamLoadRecorder>& get_stream_load_recorder() {
@@ -343,6 +346,10 @@ public:
 
     std::set<string> get_broken_paths() { return _broken_paths; }
 
+    Status submit_clone_task(Tablet* tablet, int64_t version);
+    // Points at AgentServer's worker pools once start_workers() has run; nullptr before that.
+    std::unordered_map<int64_t, std::unique_ptr<TaskWorkerPoolIf>>* workers {nullptr};
+
 private:
     // Instance should be inited from `static open()`
     // MUST NOT be called in other circumstances.
@@ -564,6 +571,8 @@ private:
 
     // thread to check tablet delete bitmap count tasks
     scoped_refptr<Thread> _check_delete_bitmap_score_thread;
+
+    int64_t _last_get_peers_replica_backends_time_ms {0};
 };
 
 // lru cache for create tabelt round robin in disks
diff --git a/be/src/olap/task/engine_clone_task.cpp 
b/be/src/olap/task/engine_clone_task.cpp
index 6a9e66f1d38..e2f915f6cda 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -38,6 +38,7 @@
 #include <unordered_map>
 #include <unordered_set>
 #include <utility>
+#include <vector>
 
 #include "common/config.h"
 #include "common/logging.h"
@@ -989,5 +990,4 @@ Status EngineCloneTask::_finish_full_clone(Tablet* tablet,
     return tablet->revise_tablet_meta(to_add, to_delete, false);
     // TODO(plat1ko): write cooldown meta to remote if this replica is 
cooldown replica
 }
-
 } // namespace doris
diff --git a/be/src/olap/task/engine_publish_version_task.cpp 
b/be/src/olap/task/engine_publish_version_task.cpp
index 2dcc1723b71..6c37e55da75 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -32,6 +32,7 @@
 #include <unordered_map>
 #include <utility>
 
+#include "cloud/config.h"
 #include "common/logging.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet_manager.h"
@@ -216,6 +217,25 @@ Status EnginePublishVersionTask::execute() {
                         continue;
                     }
                     auto handle_version_not_continuous = [&]() {
+                        if 
(config::enable_auto_clone_on_mow_publish_missing_version) {
+                            LOG_INFO("mow publish submit missing rowset clone 
task.")
+                                    .tag("tablet_id", tablet->tablet_id())
+                                    .tag("version", version.first - 1)
+                                    .tag("replica_id", tablet->replica_id())
+                                    .tag("partition_id", 
tablet->partition_id())
+                                    .tag("table_id", tablet->table_id());
+                            Status st = 
_engine.submit_clone_task(tablet.get(), version.first - 1);
+                            if (!st) {
+                                LOG_WARNING(
+                                        "mow publish failed to submit missing 
rowset clone task.")
+                                        .tag("st", st.to_string())
+                                        .tag("tablet_id", tablet->tablet_id())
+                                        .tag("version", version.first - 1)
+                                        .tag("replica_id", 
tablet->replica_id())
+                                        .tag("partition_id", 
tablet->partition_id())
+                                        .tag("table_id", tablet->table_id());
+                            }
+                        }
                         add_error_tablet_id(tablet_info.tablet_id);
                         // When there are too many missing versions, do not 
directly retry the
                         // publish and handle it through async publish.
diff --git a/be/src/util/blocking_priority_queue.hpp 
b/be/src/util/blocking_priority_queue.hpp
index bfc1c34e8f1..43fe1e4df47 100644
--- a/be/src/util/blocking_priority_queue.hpp
+++ b/be/src/util/blocking_priority_queue.hpp
@@ -22,6 +22,7 @@
 
 #include <unistd.h>
 
+#include <cassert>
 #include <condition_variable>
 #include <mutex>
 #include <queue>
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 2dd0a6a746e..9a034952ca1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -2818,6 +2818,8 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
                     replicaInfo.setBePort(backend.getBePort());
                     replicaInfo.setHttpPort(backend.getHttpPort());
                     replicaInfo.setBrpcPort(backend.getBrpcPort());
+                    replicaInfo.setIsAlive(backend.isAlive());
+                    replicaInfo.setBackendId(backend.getId());
                     replicaInfo.setReplicaId(replica.getId());
                     replicaInfos.add(replicaInfo);
                 }
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 80683388fa2..397f1e051ea 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -671,6 +671,8 @@ struct TReplicaInfo {
     3: required TPort  http_port
     4: required TPort  brpc_port
     5: required TReplicaId replica_id
+    6: optional bool is_alive
+    7: optional i64 backend_id
 }
 
 struct TResourceInfo {
diff --git 
a/regression-test/suites/fault_injection_p0/test_compaction_clone_missing_rowset_fault_injection.groovy
 
b/regression-test/suites/fault_injection_p0/test_compaction_clone_missing_rowset_fault_injection.groovy
new file mode 100644
index 00000000000..a7f060a1108
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/test_compaction_clone_missing_rowset_fault_injection.groovy
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.http.NoHttpResponseException
+import org.apache.doris.regression.util.DebugPoint
+import org.apache.doris.regression.util.NodeType
+
+suite('test_compaction_clone_missing_rowset_fault_injection', 'docker') {
+    def options = new ClusterOptions()
+    options.cloudMode = false
+    options.enableDebugPoints()
+    options.feConfigs += [ "disable_tablet_scheduler=true" ]
+    options.beConfigs += [ 
"enable_auto_clone_on_compaction_missing_version=true" ]
+    options.beNum = 3
+    docker(options) {
+
+        def injectBe = null
+        def normalBe = null
+        def backends = sql_return_maparray('show backends')
+
+        injectBe = backends[0]
+        assertNotNull(injectBe)
+        normalBe = backends[1]
+        assertNotNull(normalBe)
+
+        try {
+            def tableName = "test_compaction_clone_missing_rowset"
+            sql """ DROP TABLE IF EXISTS ${tableName} force"""
+            sql """
+                CREATE TABLE IF NOT EXISTS ${tableName} (
+                    `k` int ,
+                    `v` int ,
+                ) engine=olap
+                DUPLICATE KEY(k)
+                DISTRIBUTED BY HASH(k)
+                BUCKETS 1
+                properties(
+                    "replication_num" = "3",
+                    "disable_auto_compaction" = "true")
+                """
+            sql """ INSERT INTO ${tableName} VALUES (1,0)"""
+            DebugPoint.enableDebugPoint(injectBe.Host, 
injectBe.HttpPort.toInteger(), NodeType.BE, 
"EnginePublishVersionTask.finish.random", [percent:"1.0"])
+            sql """ INSERT INTO ${tableName} VALUES (2,0)"""
+            sql """ INSERT INTO ${tableName} VALUES (3,0)"""
+            sql """ INSERT INTO ${tableName} VALUES (4,0)"""
+            DebugPoint.disableDebugPoint(injectBe.Host, 
injectBe.HttpPort.toInteger(), NodeType.BE, 
"EnginePublishVersionTask.finish.random")
+            sql """ INSERT INTO ${tableName} VALUES (5,0)"""
+
+            def array = sql_return_maparray("SHOW TABLETS FROM ${tableName}")
+            def tabletId = array[0].TabletId
+
+            // 1st check rowsets
+            logger.info("1st show:" + tabletId)
+            def (code, out, err) = be_show_tablet_status(injectBe.Host, 
injectBe.HttpPort, tabletId)
+            logger.info("1st show: code=" + code + ", out=" + out + ", err=" + 
err)
+            assertTrue(out.contains("[0-1]"))
+            assertTrue(out.contains("[2-2]"))
+            // missing rowset [3-5]
+            assertTrue(out.contains("[3-5]"))
+            assertTrue(out.contains("[6-6]"))
+
+            logger.info("1st run cumu compaction:" + tabletId)
+            (code, out, err) = be_run_cumulative_compaction(injectBe.Host, 
injectBe.HttpPort, tabletId)
+            logger.info("1st Run cumu compaction: code=" + code + ", out=" + 
out + ", err=" + err)
+
+            sleep(10000)
+
+            // 2nd check rowsets
+            logger.info("2nd show:" + tabletId)
+            (code, out, err) = be_show_tablet_status(injectBe.Host, 
injectBe.HttpPort, tabletId)
+            logger.info("2nd show: code=" + code + ", out=" + out + ", err=" + 
err)
+            assertTrue(out.contains("[0-1]"))
+            assertTrue(out.contains("[2-2]"))
+            assertTrue(out.contains("[3-3]"))
+            assertTrue(out.contains("[4-4]"))
+            assertTrue(out.contains("[5-5]"))
+            assertTrue(out.contains("[6-6]"))
+
+            logger.info("2nd cumu compaction:" + tabletId)
+            (code, out, err) = be_run_cumulative_compaction(injectBe.Host, 
injectBe.HttpPort, tabletId)
+            logger.info("2nd cumu compaction: code=" + code + ", out=" + out + 
", err=" + err)
+
+            // check rowsets
+            logger.info("3rd show:" + tabletId)
+            (code, out, err) = be_show_tablet_status(injectBe.Host, 
injectBe.HttpPort, tabletId)
+            logger.info("3rd show: code=" + code + ", out=" + out + ", err=" + 
err)
+            assertTrue(out.contains("[0-1]"))
+            assertTrue(out.contains("[2-2]"))
+            assertTrue(out.contains("[3-6]"))
+
+        } finally {
+            if (injectBe != null) {
+                DebugPoint.disableDebugPoint(injectBe.Host, 
injectBe.HttpPort.toInteger(), NodeType.BE, 
"EnginePublishVersionTask.finish.random")
+            }
+        }
+    }
+}
diff --git 
a/regression-test/suites/fault_injection_p0/test_mow_publish_clone_missing_rowset.groovy
 
b/regression-test/suites/fault_injection_p0/test_mow_publish_clone_missing_rowset.groovy
new file mode 100644
index 00000000000..14f0073f5c8
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/test_mow_publish_clone_missing_rowset.groovy
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.http.NoHttpResponseException
+import org.apache.doris.regression.util.DebugPoint
+import org.apache.doris.regression.util.NodeType
+
+suite('test_mow_publish_clone_missing_rowset_fault_injection', 'docker') {
+
+    def set_be_param = { paramName, paramValue, beIp, bePort -> // update a BE config via its HTTP API and assert OK
+        def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, paramValue))
+        assertTrue(out.contains("OK"))
+    }
+
+    def options = new ClusterOptions()
+    options.cloudMode = false
+    options.enableDebugPoints()
+    options.feConfigs += [ "disable_tablet_scheduler=true" ]
+    options.beConfigs += [ 
"enable_auto_clone_on_mow_publish_missing_version=false" ]
+    options.beNum = 3
+    docker(options) {
+
+        def injectBe = null
+        def normalBe = null
+        def backends = sql_return_maparray('show backends')
+
+        injectBe = backends[0]
+        assertNotNull(injectBe)
+        normalBe = backends[1]
+        assertNotNull(normalBe)
+
+        try {
+            def tableName = "test_mow_publish_clone_missing_rowset"
+            sql """ DROP TABLE IF EXISTS ${tableName} force"""
+            sql """
+                CREATE TABLE IF NOT EXISTS ${tableName} (
+                    `k` int ,
+                    `v` int ,
+                ) engine=olap
+                UNIQUE KEY(k)
+                DISTRIBUTED BY HASH(k)
+                BUCKETS 1
+                properties(
+                    "replication_num" = "3",
+                    "disable_auto_compaction" = "true")
+                """
+            sql """ INSERT INTO ${tableName} VALUES (1,0)"""
+            DebugPoint.enableDebugPoint(injectBe.Host, 
injectBe.HttpPort.toInteger(), NodeType.BE, 
"EnginePublishVersionTask.finish.random", [percent:"1.0"])
+            sql """ INSERT INTO ${tableName} VALUES (2,0)"""
+            DebugPoint.disableDebugPoint(injectBe.Host, 
injectBe.HttpPort.toInteger(), NodeType.BE, 
"EnginePublishVersionTask.finish.random")
+            sql """ INSERT INTO ${tableName} VALUES (3,0)"""
+            sql """ INSERT INTO ${tableName} VALUES (4,0)"""
+            sql """ INSERT INTO ${tableName} VALUES (5,0)"""
+
+            def array = sql_return_maparray("SHOW TABLETS FROM ${tableName}")
+            def tabletId = array[0].TabletId
+
+            // normal be check rowsets
+            logger.info("normal be show:" + tabletId)
+            def (code, out, err) = be_show_tablet_status(normalBe.Host, 
normalBe.HttpPort, tabletId)
+            logger.info("normal be show: code=" + code + ", out=" + out + ", 
err=" + err)
+            assertTrue(out.contains("[0-1]"))
+            assertTrue(out.contains("[2-2]"))
+            assertTrue(out.contains("[3-3]"))
+            assertTrue(out.contains("[4-4]"))
+            assertTrue(out.contains("[5-5]"))
+            assertTrue(out.contains("[6-6]"))
+
+            // 1st inject be check rowsets
+            logger.info("1st inject be show:" + tabletId)
+            (code, out, err) = be_show_tablet_status(injectBe.Host, 
injectBe.HttpPort, tabletId)
+            logger.info("1st inject be show: code=" + code + ", out=" + out + 
", err=" + err)
+            assertTrue(out.contains("[0-1]"))
+            assertTrue(out.contains("[2-2]"))
+            assertFalse(out.contains("[3-3]"))
+            assertFalse(out.contains("[4-4]"))
+            assertFalse(out.contains("[5-5]"))
+            assertFalse(out.contains("[6-6]"))
+
+            set_be_param("enable_auto_clone_on_mow_publish_missing_version", 
"true", injectBe.Host, injectBe.HttpPort);
+            Thread.sleep(10000)
+            // submit clone task
+            sql """ INSERT INTO ${tableName} VALUES (6,0)"""
+
+            sleep(10000)
+
+            // 2nd inject be check rowsets
+            logger.info("2nd inject be show:" + tabletId)
+            (code, out, err) = be_show_tablet_status(injectBe.Host, 
injectBe.HttpPort, tabletId)
+            logger.info("2nd inject be show: code=" + code + ", out=" + out + 
", err=" + err)
+            assertTrue(out.contains("[0-1]"))
+            assertTrue(out.contains("[2-2]"))
+            assertTrue(out.contains("[3-3]"))
+            assertTrue(out.contains("[4-4]"))
+            assertTrue(out.contains("[5-5]"))
+            assertTrue(out.contains("[6-6]"))
+            assertTrue(out.contains("[7-7]"))
+
+            sql """ INSERT INTO ${tableName} VALUES (7,0)"""
+
+            // 3rd inject be check rowsets
+            logger.info("3rd inject be show:" + tabletId)
+            (code, out, err) = be_show_tablet_status(injectBe.Host, 
injectBe.HttpPort, tabletId)
+            logger.info("3rd inject be show: code=" + code + ", out=" + out + 
", err=" + err)
+            assertTrue(out.contains("[0-1]"))
+            assertTrue(out.contains("[2-2]"))
+            assertTrue(out.contains("[3-3]"))
+            assertTrue(out.contains("[4-4]"))
+            assertTrue(out.contains("[5-5]"))
+            assertTrue(out.contains("[6-6]"))
+            assertTrue(out.contains("[7-7]"))
+            assertTrue(out.contains("[8-8]"))
+        } finally {
+            if (injectBe != null) {
+                DebugPoint.disableDebugPoint(injectBe.Host, 
injectBe.HttpPort.toInteger(), NodeType.BE, 
"EnginePublishVersionTask.finish.random")
+            }
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to