This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 783c7d38658ad5169868a2d8028b23f2f73da07a
Author: plat1ko <[email protected]>
AuthorDate: Wed Feb 22 16:21:11 2023 +0800

    [fix](replica) Fix inconsistent replica id between BE and FE in a corner case of tablet rebalance (#16889)
---
 be/src/olap/tablet_manager.cpp                          | 14 +++++---------
 be/src/olap/tablet_manager.h                            |  3 ++-
 be/src/olap/task/engine_clone_task.cpp                  | 17 +++++++++++++----
 .../java/org/apache/doris/master/ReportHandler.java     |  9 +++++++--
 4 files changed, 27 insertions(+), 16 deletions(-)

diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 9d111dd697..36b0eb64a9 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -436,8 +436,7 @@ Status TabletManager::drop_tablet(TTabletId tablet_id, 
TReplicaId replica_id,
     auto& shard = _get_tablets_shard(tablet_id);
     std::lock_guard wrlock(shard.lock);
     if (shard.tablets_under_clone.count(tablet_id) > 0) {
-        LOG(INFO) << "tablet " << tablet_id << " is under clone, skip drop 
task";
-        return Status::Aborted("aborted");
+        return Status::Aborted("tablet {} is under clone, skip drop task", 
tablet_id);
     }
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
     return _drop_tablet_unlocked(tablet_id, replica_id, false, 
is_drop_table_or_partition);
@@ -459,12 +458,9 @@ Status TabletManager::_drop_tablet_unlocked(TTabletId 
tablet_id, TReplicaId repl
     // We should compare replica id to avoid dropping new cloned tablet.
     // Iff request replica id is 0, FE may be an older release, then we drop 
this tablet as before.
     if (to_drop_tablet->replica_id() != replica_id && replica_id != 0) {
-        LOG(WARNING) << "fail to drop tablet because replica id not match. "
-                     << "tablet_id=" << tablet_id << ", replica_id=" << 
to_drop_tablet->replica_id()
-                     << ", request replica_id=" << replica_id;
-        return Status::Aborted("aborted");
+        return Status::Aborted("replica_id not match({} vs {})", 
to_drop_tablet->replica_id(),
+                               replica_id);
     }
-
     _remove_tablet_from_partition(to_drop_tablet);
     tablet_map_t& tablet_map = _get_tablet_map(tablet_id);
     tablet_map.erase(tablet_id);
@@ -1043,10 +1039,10 @@ Status TabletManager::start_trash_sweep() {
     return Status::OK();
 } // start_trash_sweep
 
-void TabletManager::register_clone_tablet(int64_t tablet_id) {
+bool TabletManager::register_clone_tablet(int64_t tablet_id) {
     tablets_shard& shard = _get_tablets_shard(tablet_id);
     std::lock_guard<std::shared_mutex> wrlock(shard.lock);
-    shard.tablets_under_clone.insert(tablet_id);
+    return shard.tablets_under_clone.insert(tablet_id).second;
 }
 
 void TabletManager::unregister_clone_tablet(int64_t tablet_id) {
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index bbb298cae2..b01a5141d5 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -133,7 +133,8 @@ public:
 
     void obtain_specific_quantity_tablets(std::vector<TabletInfo>& 
tablets_info, int64_t num);
 
-    void register_clone_tablet(int64_t tablet_id);
+    // return `true` if register success
+    bool register_clone_tablet(int64_t tablet_id);
     void unregister_clone_tablet(int64_t tablet_id);
 
     void get_tablets_distribution_on_different_disks(
diff --git a/be/src/olap/task/engine_clone_task.cpp 
b/be/src/olap/task/engine_clone_task.cpp
index 4bb86501b0..7550366412 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -63,7 +63,9 @@ EngineCloneTask::EngineCloneTask(const TCloneReq& clone_req, const TMasterInfo&
 Status EngineCloneTask::execute() {
     // register the tablet to avoid it is deleted by gc thread during clone process
     SCOPED_ATTACH_TASK(_mem_tracker);
-    StorageEngine::instance()->tablet_manager()->register_clone_tablet(_clone_req.tablet_id);
+    if (!StorageEngine::instance()->tablet_manager()->register_clone_tablet(_clone_req.tablet_id)) {
+        return Status::InternalError("tablet {} is under clone", _clone_req.tablet_id);
+    }
     Status st = _do_clone();
     StorageEngine::instance()->tablet_manager()->unregister_clone_tablet(_clone_req.tablet_id);
     return st;
@@ -81,6 +83,13 @@ Status EngineCloneTask::_do_clone() {
     std::vector<Version> missed_versions;
     // try to repair a tablet with missing version
     if (tablet != nullptr) {
+        if (tablet->replica_id() != _clone_req.replica_id) {
+            // `tablet` may be a dropped replica in FE, e.g: BE1 migrates 
replica of tablet_1 to BE2,
+            // but before BE1 drop this replica, another new replica of 
tablet_1 is migrated to BE1.
+            // If we allow to clone success on dropped replica, replica id may 
never be consistent between FE and BE.
+            return Status::InternalError("replica_id not match({} vs {})", 
tablet->replica_id(),
+                                         _clone_req.replica_id);
+        }
         std::shared_lock migration_rlock(tablet->get_migration_lock(), 
std::try_to_lock);
         if (!migration_rlock.owns_lock()) {
             return Status::OLAPInternalError(OLAP_ERR_RWLOCK_ERROR);
@@ -96,7 +105,7 @@ Status EngineCloneTask::_do_clone() {
         // completed. Or remote be will just return header not the rowset 
files. clone will failed.
         if (missed_versions.empty()) {
             LOG(INFO) << "missed version size = 0, skip clone and return 
success. tablet_id="
-                      << _clone_req.tablet_id;
+                      << _clone_req.tablet_id << " replica_id=" << 
_clone_req.replica_id;
             _set_tablet_info(is_new_tablet);
             return Status::OK();
         }
@@ -104,7 +113,8 @@ Status EngineCloneTask::_do_clone() {
         LOG(INFO) << "clone to existed tablet. missed_versions_size=" << 
missed_versions.size()
                   << ", allow_incremental_clone=" << allow_incremental_clone
                   << ", signature=" << _signature << ", tablet_id=" << 
_clone_req.tablet_id
-                  << ", committed_version=" << _clone_req.committed_version;
+                  << ", committed_version=" << _clone_req.committed_version
+                  << ", replica_id=" << _clone_req.replica_id;
 
         // try to download missing version from src backend.
         // if tablet on src backend does not contains missing version, it will 
download all versions,
@@ -112,7 +122,6 @@ Status EngineCloneTask::_do_clone() {
         RETURN_IF_ERROR(_make_and_download_snapshots(*(tablet->data_dir()), 
local_data_path,
                                                      &src_host, 
&src_file_path, missed_versions,
                                                      
&allow_incremental_clone));
-
         RETURN_IF_ERROR(_finish_clone(tablet.get(), local_data_path, 
_clone_req.committed_version,
                                       allow_incremental_clone));
     } else {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index a38b91ff78..6741e3bfcf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -921,6 +921,12 @@ public class ReportHandler extends Daemon {
                 throw new MetaNotFoundException("tablet[" + tabletId + "] does 
not exist");
             }
 
+            // check replica id
+            long replicaId = backendTabletInfo.getReplicaId();
+            if (replicaId <= 0) {
+                throw new MetaNotFoundException("replica id is invalid, tablet 
id: " + tabletId);
+            }
+
             long visibleVersion = partition.getVisibleVersion();
 
             // check replica version
@@ -964,8 +970,7 @@ public class ReportHandler extends Daemon {
                 } else if (version < partition.getCommittedVersion()) {
                     lastFailedVersion = partition.getCommittedVersion();
                 }
-
-                long replicaId = Env.getCurrentEnv().getNextId();
+                // use replicaId reported by BE to maintain replica meta 
consistent between FE and BE
                 Replica replica = new Replica(replicaId, backendId, version, 
schemaHash,
                         dataSize, remoteDataSize, rowCount, 
ReplicaState.NORMAL,
                         lastFailedVersion, version);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to