This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 21026a50edb [improvement](tablet clone) fix balanced new replica will be removed when load txn continuously (#25061)
21026a50edb is described below

commit 21026a50edb6d3db4aa2db205eac295f02db380e
Author: yujun <[email protected]>
AuthorDate: Sun Oct 8 19:03:29 2023 +0800

    [improvement](tablet clone) fix balanced new replica will be removed when load txn continuously (#25061)
---
 .../main/java/org/apache/doris/common/Config.java  | 12 +++
 .../java/org/apache/doris/catalog/Replica.java     | 93 +++++++++++++++++++---
 .../org/apache/doris/clone/TabletSchedCtx.java     | 74 ++++++++++++-----
 3 files changed, 150 insertions(+), 29 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index bffa3e55977..b9de274eada 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -932,6 +932,18 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static long tablet_repair_delay_factor_second = 60;
 
+    /**
+     * clone a tablet, further repair timeout.
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static long tablet_further_repair_timeout_second = 20 * 60;
+
+    /**
+     * clone a tablet, further repair max times.
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static int tablet_further_repair_max_times = 5;
+
     /**
      * the default slot number per path for hdd in tablet scheduler
      * TODO(cmy): remove this config and dynamically adjust it by clone task statistic
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
index c76dc5e0ee3..e55eab89392 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.catalog;
 
+import org.apache.doris.common.Config;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.thrift.TUniqueId;
@@ -114,21 +115,25 @@ public class Replica implements Writable {
     private long cooldownTerm = -1;
 
     /*
-     * If set to true, with means this replica need to be repaired. explicitly.
      * This can happen when this replica is created by a balance clone task, and
      * when task finished, the version of this replica is behind the partition's visible version.
      * So this replica need a further repair.
      * If we do not do this, this replica will be treated as version stale, and will be removed,
      * so that the balance task is failed, which is unexpected.
      *
-     * furtherRepairSetTime set alone with needFurtherRepair.
+     * furtherRepairSetTime and leftFurtherRepairCount are set alone with needFurtherRepair.
      * This is an insurance, in case that further repair task always fail. If 20 min passed
      * since we set needFurtherRepair to true, the 'needFurtherRepair' will be set to false.
      */
-    private boolean needFurtherRepair = false;
     private long furtherRepairSetTime = -1;
-    private static final long FURTHER_REPAIR_TIMEOUT_MS = 20 * 60 * 1000L; // 20min
+    private int leftFurtherRepairCount = 0;
 
+    // During full clone, the replica's state is CLONE, it will not load the data.
+    // After full clone finished, even if the replica's version = partition's visible version,
+    //
+    // notice: furtherRepairWatermarkTxnTd is used to clone a replica, protected it from be removed.
+    //
+    private long furtherRepairWatermarkTxnTd = -1;
 
     /* Decommission a backend B, steps are as follow:
      * 1. wait peer backends catchup with B;
@@ -136,6 +141,8 @@ public class Replica implements Writable {
      * 3. wait txn before preWatermarkTxnId finished, set postWatermarkTxnId. B can't load data now.
      * 4. wait txn before postWatermarkTxnId finished, delete B.
      *
+     * notice: preWatermarkTxnId and postWatermarkTxnId are used to delete this replica.
+     *
      */
     private long preWatermarkTxnId = -1;
     private long postWatermarkTxnId = -1;
@@ -263,15 +270,35 @@ public class Replica implements Writable {
     }
 
     public boolean needFurtherRepair() {
-        if (needFurtherRepair && System.currentTimeMillis() - this.furtherRepairSetTime < FURTHER_REPAIR_TIMEOUT_MS) {
-            return true;
-        }
-        return false;
+        return leftFurtherRepairCount > 0
+                && System.currentTimeMillis() < furtherRepairSetTime
+                        + Config.tablet_further_repair_timeout_second * 1000;
     }
 
     public void setNeedFurtherRepair(boolean needFurtherRepair) {
-        this.needFurtherRepair = needFurtherRepair;
-        this.furtherRepairSetTime = System.currentTimeMillis();
+        if (needFurtherRepair) {
+            furtherRepairSetTime = System.currentTimeMillis();
+            leftFurtherRepairCount = Config.tablet_further_repair_max_times;
+        } else {
+            leftFurtherRepairCount = 0;
+            furtherRepairSetTime = -1;
+        }
+    }
+
+    public void incrFurtherRepairCount() {
+        leftFurtherRepairCount--;
+    }
+
+    public int getLeftFurtherRepairCount() {
+        return leftFurtherRepairCount;
+    }
+
+    public long getFurtherRepairWatermarkTxnTd() {
+        return furtherRepairWatermarkTxnTd;
+    }
+
+    public void setFurtherRepairWatermarkTxnTd(long furtherRepairWatermarkTxnTd) {
+        this.furtherRepairWatermarkTxnTd = furtherRepairWatermarkTxnTd;
     }
 
     // for compatibility
@@ -298,6 +325,42 @@ public class Replica implements Writable {
         updateReplicaInfo(newVersion, lastFailedVersion, lastSuccessVersion, dataSize, remoteDataSize, rowCount);
     }
 
+    public synchronized void adminUpdateVersionInfo(Long version, Long lastFailedVersion, Long lastSuccessVersion,
+            long updateTime) {
+        long oldLastFailedVersion = this.lastFailedVersion;
+        if (version != null) {
+            this.version = version;
+        }
+        if (lastSuccessVersion != null) {
+            this.lastSuccessVersion = lastSuccessVersion;
+        }
+        if (lastFailedVersion != null) {
+            if (this.lastFailedVersion < lastFailedVersion) {
+                this.lastFailedTimestamp = updateTime;
+            }
+            this.lastFailedVersion = lastFailedVersion;
+        }
+        if (this.lastFailedVersion < this.version) {
+            this.lastFailedVersion = -1;
+            this.lastFailedTimestamp  = -1;
+            this.lastFailedVersionHash = 0;
+        }
+        if (this.lastFailedVersion > 0
+                && this.lastSuccessVersion > this.lastFailedVersion) {
+            this.lastSuccessVersion = this.version;
+        }
+        if (this.lastSuccessVersion < this.version) {
+            this.lastSuccessVersion = this.version;
+        }
+        if (oldLastFailedVersion < 0 && this.lastFailedVersion > 0) {
+            LOG.info("change replica last failed version from '< 0' to '> 0', replica {}, old last failed version {}",
+                    this, oldLastFailedVersion);
+        } else if (oldLastFailedVersion > 0 && this.lastFailedVersion < 0) {
+            LOG.info("change replica last failed version from '> 0' to '< 0', replica {}, old last failed version {}",
+                    this, oldLastFailedVersion);
+        }
+    }
+
     /* last failed version:  LFV
      * last success version: LSV
      * version:              V
@@ -346,6 +409,8 @@ public class Replica implements Writable {
             return;
         }
 
+        long oldLastFailedVersion = this.lastFailedVersion;
+
         this.version = newVersion;
         this.dataSize = newDataSize;
         this.remoteDataSize = newRemoteDataSize;
@@ -399,6 +464,14 @@ public class Replica implements Writable {
         if (LOG.isDebugEnabled()) {
             LOG.debug("after update {}", this.toString());
         }
+
+        if (oldLastFailedVersion < 0 && this.lastFailedVersion > 0) {
+            LOG.info("change replica last failed version from '< 0' to '> 0', replica {}, old last failed version {}",
+                    this, oldLastFailedVersion);
+        } else if (oldLastFailedVersion > 0 && this.lastFailedVersion < 0) {
+            LOG.info("change replica last failed version from '> 0' to '< 0', replica {}, old last failed version {}",
+                    this, oldLastFailedVersion);
+        }
     }
 
     public synchronized void updateLastFailedVersion(long lastFailedVersion) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
index db658271822..b4667f80696 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
@@ -188,6 +188,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
     private Replica tempSrcReplica = null;
     private long destBackendId = -1;
     private long destPathHash = -1;
+    private long destOldVersion = -1;
     // for disk balance to set migration task's datadir
     private String destPath = null;
     private String errMsg = null;
@@ -912,12 +913,12 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
         // if this is a balance task, or this is a repair task with
         // REPLICA_MISSING/REPLICA_RELOCATING,
         // we create a new replica with state CLONE
-        long replicaId = 0;
+        Replica replica = null;
         if (tabletStatus == TabletStatus.REPLICA_MISSING
                 || tabletStatus == TabletStatus.REPLICA_RELOCATING || type == Type.BALANCE
                 || tabletStatus == TabletStatus.COLOCATE_MISMATCH
                 || tabletStatus == TabletStatus.REPLICA_MISSING_FOR_TAG) {
-            Replica cloneReplica = new Replica(
+            replica = new Replica(
                     Env.getCurrentEnv().getNextId(), destBackendId,
                     -1 /* version */, schemaHash,
                     -1 /* data size */, -1, -1 /* row count */,
@@ -925,15 +926,13 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
                     committedVersion, /* use committed version as last failed version */
                     -1 /* last success version */);
 
-            LOG.info("create clone task to make new replica, tabletId={}, replicaId={}", tabletId,
-                    cloneReplica.getId());
             // addReplica() method will add this replica to tablet inverted index too.
-            tablet.addReplica(cloneReplica);
-            replicaId = cloneReplica.getId();
-        } else if (tabletStatus == TabletStatus.VERSION_INCOMPLETE) {
+            tablet.addReplica(replica);
+        } else {
+            // tabletStatus is VERSION_INCOMPLETE || NEED_FURTHER_REPAIR
             Preconditions.checkState(type == Type.REPAIR, type);
             // double check
-            Replica replica = tablet.getReplicaByBackendId(destBackendId);
+            replica = tablet.getReplicaByBackendId(destBackendId);
             if (replica == null) {
                 throw new SchedException(Status.SCHEDULE_FAILED, "dest replica does not exist on BE " + destBackendId);
             }
@@ -942,17 +941,18 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
                 throw new SchedException(Status.SCHEDULE_FAILED, "dest replica's path hash is changed. "
                         + "current: " + replica.getPathHash() + ", scheduled: " + destPathHash);
             }
-            replicaId = replica.getId();
         }
 
         TBackend tSrcBe = new TBackend(srcBe.getHost(), srcBe.getBePort(), srcBe.getHttpPort());
         TBackend tDestBe = new TBackend(destBe.getHost(), destBe.getBePort(), destBe.getHttpPort());
 
         cloneTask = new CloneTask(tDestBe, destBackendId, dbId, tblId, partitionId, indexId, tabletId,
-                replicaId, schemaHash, Lists.newArrayList(tSrcBe), storageMedium,
+                replica.getId(), schemaHash, Lists.newArrayList(tSrcBe), storageMedium,
                 visibleVersion, (int) (taskTimeoutMs / 1000));
+        destOldVersion = replica.getVersion();
         cloneTask.setPathHash(srcPathHash, destPathHash);
-        LOG.info("create clone task to repair replica, tabletId={}, replicaId={}", tabletId, replicaId);
+        LOG.info("create clone task to repair replica, tabletId={}, replica={}, visible version {}, tablet status {}",
+                tabletId, replica, visibleVersion, tabletStatus);
 
         this.state = State.RUNNING;
         return cloneTask;
@@ -1078,16 +1078,51 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
                 replica.setPathHash(reportedTablet.getPathHash());
             }
 
-            if (this.type == Type.BALANCE) {
-                long partitionVisibleVersion = partition.getVisibleVersion();
-                if (replica.getVersion() < partitionVisibleVersion) {
-                    // see comment 'needFurtherRepair' of Replica for explanation.
-                    // no need to persist this info. If FE restart, just do it again.
-                    replica.setNeedFurtherRepair(true);
+            if (type == Type.BALANCE) {
+                replica.setNeedFurtherRepair(true);
+                try {
+                    long furtherRepairWatermarkTxnTd = Env.getCurrentGlobalTransactionMgr()
+                            .getTransactionIDGenerator().getNextTransactionId();
+                    replica.setFurtherRepairWatermarkTxnTd(furtherRepairWatermarkTxnTd);
+                    LOG.info("new replica {} of tablet {} set further repair watermark id {}",
+                            replica, tabletId, furtherRepairWatermarkTxnTd);
+                } catch (Exception e) {
+                    LOG.warn("new replica {} set further repair watermark id failed", replica, e);
                 }
-            } else {
+            }
+
+            // isCatchup should check the txns during ReplicaState CLONE finished.
+            // Because when replica's state = CLONE, it will not load txns.
+            // Even if this replica version = partition visible version, but later if the txns during CLONE
+            // change from prepare to committed or visible, this replica will be fall behind and be removed
+            // in REDUNDANT detection.
+            //
+            boolean isCatchup = false;
+            if (replica.getVersion() >= partition.getVisibleVersion() && replica.getLastFailedVersion() < 0) {
+                long furtherRepairWatermarkTxnTd = replica.getFurtherRepairWatermarkTxnTd();
+                if (furtherRepairWatermarkTxnTd < 0) {
+                    isCatchup = true;
+                } else {
+                    try {
+                        if (Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(
+                                furtherRepairWatermarkTxnTd, dbId, tblId, partitionId)) {
+                            isCatchup = true;
+                            LOG.info("new replica {} of tablet {} has catchup with further repair watermark id {}",
+                                    replica, tabletId, furtherRepairWatermarkTxnTd);
+                        }
+                    } catch (Exception e) {
+                        isCatchup = true;
+                    }
+                }
+            }
+
+            replica.incrFurtherRepairCount();
+            if (isCatchup || replica.getLeftFurtherRepairCount() <= 0) {
                 replica.setNeedFurtherRepair(false);
             }
+            if (!replica.needFurtherRepair()) {
+                replica.setFurtherRepairWatermarkTxnTd(-1);
+            }
 
             ReplicaPersistInfo info = ReplicaPersistInfo.createForClone(dbId, tblId, partitionId, indexId,
                     tabletId, destBackendId, replica.getId(),
@@ -1109,7 +1144,8 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
             }
 
             state = State.FINISHED;
-            LOG.info("clone finished: {}", this);
+            LOG.info("clone finished: {}, replica {}, replica old version {}, need further repair {}, is catchup {}",
+                    this, replica, destOldVersion, replica.needFurtherRepair(), isCatchup);
         } finally {
             olapTable.writeUnlock();
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to