This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 21026a50edb [improvement](tablet clone) fix balanced new replica will
be removed when load txn continuously (#25061)
21026a50edb is described below
commit 21026a50edb6d3db4aa2db205eac295f02db380e
Author: yujun <[email protected]>
AuthorDate: Sun Oct 8 19:03:29 2023 +0800
[improvement](tablet clone) fix balanced new replica will be removed when
load txn continuously (#25061)
---
.../main/java/org/apache/doris/common/Config.java | 12 +++
.../java/org/apache/doris/catalog/Replica.java | 93 +++++++++++++++++++---
.../org/apache/doris/clone/TabletSchedCtx.java | 74 ++++++++++++-----
3 files changed, 150 insertions(+), 29 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index bffa3e55977..b9de274eada 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -932,6 +932,18 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static long tablet_repair_delay_factor_second = 60;
+ /**
+ * Seconds a cloned replica may stay marked as needing further repair.
+ */
+ @ConfField(mutable = true, masterOnly = true)
+ public static long tablet_further_repair_timeout_second = 20 * 60;
+
+ /**
+ * Maximum number of further repair attempts for a cloned replica.
+ */
+ @ConfField(mutable = true, masterOnly = true)
+ public static int tablet_further_repair_max_times = 5;
+
/**
* the default slot number per path for hdd in tablet scheduler
* TODO(cmy): remove this config and dynamically adjust it by clone task
statistic
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
index c76dc5e0ee3..e55eab89392 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
@@ -17,6 +17,7 @@
package org.apache.doris.catalog;
+import org.apache.doris.common.Config;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.thrift.TUniqueId;
@@ -114,21 +115,25 @@ public class Replica implements Writable {
private long cooldownTerm = -1;
/*
- * If set to true, with means this replica need to be repaired. explicitly.
* This can happen when this replica is created by a balance clone task,
and
* when task finished, the version of this replica is behind the
partition's visible version.
* So this replica need a further repair.
* If we do not do this, this replica will be treated as version stale,
and will be removed,
* so that the balance task is failed, which is unexpected.
*
- * furtherRepairSetTime set alone with needFurtherRepair.
+ * furtherRepairSetTime and leftFurtherRepairCount are set along with
needFurtherRepair.
* This is an insurance, in case that further repair task always fail. If
20 min passed
* since we set needFurtherRepair to true, the 'needFurtherRepair' will be
set to false.
*/
- private boolean needFurtherRepair = false;
private long furtherRepairSetTime = -1;
- private static final long FURTHER_REPAIR_TIMEOUT_MS = 20 * 60 * 1000L; //
20min
+ private int leftFurtherRepairCount = 0;
+ // During a full clone the replica's state is CLONE, so it does not load data.
+ // Even if its version equals the partition's visible version once the clone finishes,
+ // txns that ran during the clone may still commit later and leave it behind.
+ // notice: furtherRepairWatermarkTxnTd is used when cloning a replica, to protect it from being removed.
+ //
+ private long furtherRepairWatermarkTxnTd = -1;
/* Decommission a backend B, steps are as follow:
* 1. wait peer backends catchup with B;
@@ -136,6 +141,8 @@ public class Replica implements Writable {
* 3. wait txn before preWatermarkTxnId finished, set postWatermarkTxnId.
B can't load data now.
* 4. wait txn before postWatermarkTxnId finished, delete B.
*
+ * notice: preWatermarkTxnId and postWatermarkTxnId are used to delete
this replica.
+ *
*/
private long preWatermarkTxnId = -1;
private long postWatermarkTxnId = -1;
@@ -263,15 +270,35 @@ public class Replica implements Writable {
}
public boolean needFurtherRepair() {
- if (needFurtherRepair && System.currentTimeMillis() -
this.furtherRepairSetTime < FURTHER_REPAIR_TIMEOUT_MS) {
- return true;
- }
- return false;
+ return leftFurtherRepairCount > 0
+ && System.currentTimeMillis() < furtherRepairSetTime
+ + Config.tablet_further_repair_timeout_second * 1000;
}
public void setNeedFurtherRepair(boolean needFurtherRepair) {
- this.needFurtherRepair = needFurtherRepair;
- this.furtherRepairSetTime = System.currentTimeMillis();
+ if (needFurtherRepair) {
+ furtherRepairSetTime = System.currentTimeMillis();
+ leftFurtherRepairCount = Config.tablet_further_repair_max_times;
+ } else {
+ leftFurtherRepairCount = 0;
+ furtherRepairSetTime = -1;
+ }
+ }
+
+ public void incrFurtherRepairCount() {
+ leftFurtherRepairCount--;
+ }
+
+ public int getLeftFurtherRepairCount() {
+ return leftFurtherRepairCount;
+ }
+
+ public long getFurtherRepairWatermarkTxnTd() {
+ return furtherRepairWatermarkTxnTd;
+ }
+
+ public void setFurtherRepairWatermarkTxnTd(long
furtherRepairWatermarkTxnTd) {
+ this.furtherRepairWatermarkTxnTd = furtherRepairWatermarkTxnTd;
}
// for compatibility
@@ -298,6 +325,42 @@ public class Replica implements Writable {
updateReplicaInfo(newVersion, lastFailedVersion, lastSuccessVersion,
dataSize, remoteDataSize, rowCount);
}
+ public synchronized void adminUpdateVersionInfo(Long version, Long
lastFailedVersion, Long lastSuccessVersion,
+ long updateTime) {
+ long oldLastFailedVersion = this.lastFailedVersion;
+ if (version != null) {
+ this.version = version;
+ }
+ if (lastSuccessVersion != null) {
+ this.lastSuccessVersion = lastSuccessVersion;
+ }
+ if (lastFailedVersion != null) {
+ if (this.lastFailedVersion < lastFailedVersion) {
+ this.lastFailedTimestamp = updateTime;
+ }
+ this.lastFailedVersion = lastFailedVersion;
+ }
+ if (this.lastFailedVersion < this.version) {
+ this.lastFailedVersion = -1;
+ this.lastFailedTimestamp = -1;
+ this.lastFailedVersionHash = 0;
+ }
+ if (this.lastFailedVersion > 0
+ && this.lastSuccessVersion > this.lastFailedVersion) {
+ this.lastSuccessVersion = this.version;
+ }
+ if (this.lastSuccessVersion < this.version) {
+ this.lastSuccessVersion = this.version;
+ }
+ if (oldLastFailedVersion < 0 && this.lastFailedVersion > 0) {
+ LOG.info("change replica last failed version from '< 0' to '> 0',
replica {}, old last failed version {}",
+ this, oldLastFailedVersion);
+ } else if (oldLastFailedVersion > 0 && this.lastFailedVersion < 0) {
+ LOG.info("change replica last failed version from '> 0' to '< 0',
replica {}, old last failed version {}",
+ this, oldLastFailedVersion);
+ }
+ }
+
/* last failed version: LFV
* last success version: LSV
* version: V
@@ -346,6 +409,8 @@ public class Replica implements Writable {
return;
}
+ long oldLastFailedVersion = this.lastFailedVersion;
+
this.version = newVersion;
this.dataSize = newDataSize;
this.remoteDataSize = newRemoteDataSize;
@@ -399,6 +464,14 @@ public class Replica implements Writable {
if (LOG.isDebugEnabled()) {
LOG.debug("after update {}", this.toString());
}
+
+ if (oldLastFailedVersion < 0 && this.lastFailedVersion > 0) {
+ LOG.info("change replica last failed version from '< 0' to '> 0',
replica {}, old last failed version {}",
+ this, oldLastFailedVersion);
+ } else if (oldLastFailedVersion > 0 && this.lastFailedVersion < 0) {
+ LOG.info("change replica last failed version from '> 0' to '< 0',
replica {}, old last failed version {}",
+ this, oldLastFailedVersion);
+ }
}
public synchronized void updateLastFailedVersion(long lastFailedVersion) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
index db658271822..b4667f80696 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
@@ -188,6 +188,7 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
private Replica tempSrcReplica = null;
private long destBackendId = -1;
private long destPathHash = -1;
+ private long destOldVersion = -1;
// for disk balance to set migration task's datadir
private String destPath = null;
private String errMsg = null;
@@ -912,12 +913,12 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
// if this is a balance task, or this is a repair task with
// REPLICA_MISSING/REPLICA_RELOCATING,
// we create a new replica with state CLONE
- long replicaId = 0;
+ Replica replica = null;
if (tabletStatus == TabletStatus.REPLICA_MISSING
|| tabletStatus == TabletStatus.REPLICA_RELOCATING || type ==
Type.BALANCE
|| tabletStatus == TabletStatus.COLOCATE_MISMATCH
|| tabletStatus == TabletStatus.REPLICA_MISSING_FOR_TAG) {
- Replica cloneReplica = new Replica(
+ replica = new Replica(
Env.getCurrentEnv().getNextId(), destBackendId,
-1 /* version */, schemaHash,
-1 /* data size */, -1, -1 /* row count */,
@@ -925,15 +926,13 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
committedVersion, /* use committed version as last failed
version */
-1 /* last success version */);
- LOG.info("create clone task to make new replica, tabletId={},
replicaId={}", tabletId,
- cloneReplica.getId());
// addReplica() method will add this replica to tablet inverted
index too.
- tablet.addReplica(cloneReplica);
- replicaId = cloneReplica.getId();
- } else if (tabletStatus == TabletStatus.VERSION_INCOMPLETE) {
+ tablet.addReplica(replica);
+ } else {
+ // tabletStatus is VERSION_INCOMPLETE || NEED_FURTHER_REPAIR
Preconditions.checkState(type == Type.REPAIR, type);
// double check
- Replica replica = tablet.getReplicaByBackendId(destBackendId);
+ replica = tablet.getReplicaByBackendId(destBackendId);
if (replica == null) {
throw new SchedException(Status.SCHEDULE_FAILED, "dest replica
does not exist on BE " + destBackendId);
}
@@ -942,17 +941,18 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
throw new SchedException(Status.SCHEDULE_FAILED, "dest
replica's path hash is changed. "
+ "current: " + replica.getPathHash() + ", scheduled:
" + destPathHash);
}
- replicaId = replica.getId();
}
TBackend tSrcBe = new TBackend(srcBe.getHost(), srcBe.getBePort(),
srcBe.getHttpPort());
TBackend tDestBe = new TBackend(destBe.getHost(), destBe.getBePort(),
destBe.getHttpPort());
cloneTask = new CloneTask(tDestBe, destBackendId, dbId, tblId,
partitionId, indexId, tabletId,
- replicaId, schemaHash, Lists.newArrayList(tSrcBe),
storageMedium,
+ replica.getId(), schemaHash, Lists.newArrayList(tSrcBe),
storageMedium,
visibleVersion, (int) (taskTimeoutMs / 1000));
+ destOldVersion = replica.getVersion();
cloneTask.setPathHash(srcPathHash, destPathHash);
- LOG.info("create clone task to repair replica, tabletId={},
replicaId={}", tabletId, replicaId);
+ LOG.info("create clone task to repair replica, tabletId={},
replica={}, visible version {}, tablet status {}",
+ tabletId, replica, visibleVersion, tabletStatus);
this.state = State.RUNNING;
return cloneTask;
@@ -1078,16 +1078,51 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
replica.setPathHash(reportedTablet.getPathHash());
}
- if (this.type == Type.BALANCE) {
- long partitionVisibleVersion = partition.getVisibleVersion();
- if (replica.getVersion() < partitionVisibleVersion) {
- // see comment 'needFurtherRepair' of Replica for
explanation.
- // no need to persist this info. If FE restart, just do it
again.
- replica.setNeedFurtherRepair(true);
+ if (type == Type.BALANCE) {
+ replica.setNeedFurtherRepair(true);
+ try {
+ long furtherRepairWatermarkTxnTd =
Env.getCurrentGlobalTransactionMgr()
+
.getTransactionIDGenerator().getNextTransactionId();
+
replica.setFurtherRepairWatermarkTxnTd(furtherRepairWatermarkTxnTd);
+ LOG.info("new replica {} of tablet {} set further repair
watermark id {}",
+ replica, tabletId, furtherRepairWatermarkTxnTd);
+ } catch (Exception e) {
+ LOG.warn("new replica {} set further repair watermark id
failed", replica, e);
}
- } else {
+ }
+
+ // isCatchup must also wait for the txns that were running while the replica was in CLONE state.
+ // While a replica's state is CLONE, it does not load txns.
+ // So even if this replica's version equals the partition's visible version, once the txns
+ // from the CLONE period change from prepare to committed or visible, this replica will
+ // fall behind and be removed in REDUNDANT detection.
+ //
+ boolean isCatchup = false;
+ if (replica.getVersion() >= partition.getVisibleVersion() &&
replica.getLastFailedVersion() < 0) {
+ long furtherRepairWatermarkTxnTd =
replica.getFurtherRepairWatermarkTxnTd();
+ if (furtherRepairWatermarkTxnTd < 0) {
+ isCatchup = true;
+ } else {
+ try {
+ if
(Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(
+ furtherRepairWatermarkTxnTd, dbId, tblId,
partitionId)) {
+ isCatchup = true;
+ LOG.info("new replica {} of tablet {} has catchup
with further repair watermark id {}",
+ replica, tabletId,
furtherRepairWatermarkTxnTd);
+ }
+ } catch (Exception e) {
+ isCatchup = true;
+ }
+ }
+ }
+
+ replica.incrFurtherRepairCount();
+ if (isCatchup || replica.getLeftFurtherRepairCount() <= 0) {
replica.setNeedFurtherRepair(false);
}
+ if (!replica.needFurtherRepair()) {
+ replica.setFurtherRepairWatermarkTxnTd(-1);
+ }
ReplicaPersistInfo info = ReplicaPersistInfo.createForClone(dbId,
tblId, partitionId, indexId,
tabletId, destBackendId, replica.getId(),
@@ -1109,7 +1144,8 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
}
state = State.FINISHED;
- LOG.info("clone finished: {}", this);
+ LOG.info("clone finished: {}, replica {}, replica old version {},
need further repair {}, is catchup {}",
+ this, replica, destOldVersion,
replica.needFurtherRepair(), isCatchup);
} finally {
olapTable.writeUnlock();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]