This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 461c4dfaae [fix](tablet clone) fix single replica load failed during
migration (#22077)
461c4dfaae is described below
commit 461c4dfaae3a6cb4b79e07df9f3bc955a2dc9458
Author: yujun <[email protected]>
AuthorDate: Thu Jul 27 20:38:03 2023 +0800
[fix](tablet clone) fix single replica load failed during migration (#22077)
---
.../java/org/apache/doris/catalog/Replica.java | 29 ++++++++---
.../main/java/org/apache/doris/catalog/Tablet.java | 21 +++-----
.../org/apache/doris/clone/TabletSchedCtx.java | 10 +++-
.../org/apache/doris/clone/TabletScheduler.java | 57 +++++++++++++++-------
.../doris/transaction/DatabaseTransactionMgr.java | 29 +++++++++++
.../doris/transaction/GlobalTransactionMgr.java | 13 +++++
.../apache/doris/transaction/TransactionState.java | 4 --
.../java/org/apache/doris/clone/RebalanceTest.java | 4 +-
8 files changed, 120 insertions(+), 47 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
index ef32ee979b..c444cdf1db 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
@@ -129,9 +129,16 @@ public class Replica implements Writable {
private long furtherRepairSetTime = -1;
private static final long FURTHER_REPAIR_TIMEOUT_MS = 20 * 60 * 1000L; //
20min
- // if this watermarkTxnId is set, which means before deleting a replica,
- // we should ensure that all txns on this replicas are finished.
- private long watermarkTxnId = -1;
+
+ /* Decommission a backend B; the steps are as follows:
+ * 1. wait for peer backends to catch up with B;
+ * 2. set the replica on B to DECOMMISSION and set preWatermarkTxnId. B can still
load data.
+ * 3. wait for txns before preWatermarkTxnId to finish, then set postWatermarkTxnId.
B can no longer load data.
+ * 4. wait for txns before postWatermarkTxnId to finish, then delete B.
+ *
+ */
+ private long preWatermarkTxnId = -1;
+ private long postWatermarkTxnId = -1;
public Replica() {
}
@@ -568,12 +575,20 @@ public class Replica implements Writable {
}
}
- public void setWatermarkTxnId(long watermarkTxnId) {
- this.watermarkTxnId = watermarkTxnId;
+ public void setPreWatermarkTxnId(long preWatermarkTxnId) {
+ this.preWatermarkTxnId = preWatermarkTxnId;
+ }
+
+ public long getPreWatermarkTxnId() {
+ return preWatermarkTxnId;
+ }
+
+ public void setPostWatermarkTxnId(long postWatermarkTxnId) {
+ this.postWatermarkTxnId = postWatermarkTxnId;
}
- public long getWatermarkTxnId() {
- return watermarkTxnId;
+ public long getPostWatermarkTxnId() {
+ return postWatermarkTxnId;
}
public boolean isAlive() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
index 0e93a4dd0f..af83e1ba8a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
@@ -211,19 +211,7 @@ public class Tablet extends MetaObject implements Writable
{
}
public List<Long> getNormalReplicaBackendIds() {
- List<Long> beIds = Lists.newArrayList();
- SystemInfoService infoService = Env.getCurrentSystemInfo();
- for (Replica replica : replicas) {
- if (replica.isBad()) {
- continue;
- }
-
- ReplicaState state = replica.getState();
- if (infoService.checkBackendAlive(replica.getBackendId()) &&
state.canLoad()) {
- beIds.add(replica.getBackendId());
- }
- }
- return beIds;
+ return Lists.newArrayList(getNormalReplicaBackendPathMap().keySet());
}
// return map of (BE id -> path hash) of normal replicas
@@ -232,12 +220,17 @@ public class Tablet extends MetaObject implements
Writable {
Multimap<Long, Long> map = HashMultimap.create();
SystemInfoService infoService = Env.getCurrentSystemInfo();
for (Replica replica : replicas) {
+ if (!infoService.checkBackendAlive(replica.getBackendId())) {
+ continue;
+ }
+
if (replica.isBad()) {
continue;
}
ReplicaState state = replica.getState();
- if (infoService.checkBackendLoadAvailable(replica.getBackendId())
&& state.canLoad()) {
+ if (state.canLoad()
+ || (state == ReplicaState.DECOMMISSION &&
replica.getPostWatermarkTxnId() < 0)) {
map.put(replica.getBackendId(), replica.getPathHash());
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
index a3734dc9e0..876b8d10fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
@@ -748,7 +748,8 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
//
// If we do not reset this replica state to NORMAL, the tablet's
health status will be in VERSION_INCOMPLETE
// forever, because the replica in the DECOMMISSION state will not
receive the load task.
- chosenReplica.setWatermarkTxnId(-1);
+ chosenReplica.setPreWatermarkTxnId(-1);
+ chosenReplica.setPostWatermarkTxnId(-1);
chosenReplica.setState(ReplicaState.NORMAL);
setDecommissionTime(-1);
LOG.info("choose replica {} on backend {} of tablet {} as dest
replica for version incomplete,"
@@ -1142,6 +1143,10 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
value += (Priority.VERY_HIGH.ordinal() - priority.ordinal() + 1) * 60
* 1000L;
value += 5000L * (failedSchedCounter / 10);
+ if (schedFailedCode == SubCode.WAITING_DECOMMISSION) {
+ value += 5 * 1000L;
+ }
+
if (type == Type.BALANCE) {
value += 30 * 60 * 1000L;
}
@@ -1200,7 +1205,8 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
// any intermediate state it set during the scheduling process.
if (replica.getState() == ReplicaState.DECOMMISSION) {
replica.setState(ReplicaState.NORMAL);
- replica.setWatermarkTxnId(-1);
+ replica.setPreWatermarkTxnId(-1);
+ replica.setPostWatermarkTxnId(-1);
LOG.debug("reset replica {} on backend {} of tablet {}
state from DECOMMISSION to NORMAL",
replica.getId(), replica.getBackendId(), tabletId);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index ab50bb095b..de168634e3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -1065,7 +1065,7 @@ public class TabletScheduler extends MasterDaemon {
if (matchupReplicaCount <= 1) {
LOG.info("can not delete only one replica, tabletId = {} replicaId
= {}", tabletCtx.getTabletId(),
replica.getId());
- throw new SchedException(Status.FINISHED, "the only one latest
replia can not be dropped, tabletId = "
+ throw new SchedException(Status.UNRECOVERABLE, "the only one
latest replia can not be dropped, tabletId = "
+ tabletCtx.getTabletId() + ",
replicaId = " + replica.getId());
}
@@ -1080,25 +1080,46 @@ public class TabletScheduler extends MasterDaemon {
* If all are finished, which means this replica is
* safe to be deleted.
*/
- if (!force && !Config.enable_force_drop_redundant_replica &&
replica.getState().canLoad()
- && replica.getWatermarkTxnId() == -1 &&
!FeConstants.runningUnitTest) {
- long nextTxnId = Env.getCurrentGlobalTransactionMgr()
- .getTransactionIDGenerator().getNextTransactionId();
- replica.setWatermarkTxnId(nextTxnId);
- replica.setState(ReplicaState.DECOMMISSION);
- // set priority to normal because it may wait for a long time.
Remain it as VERY_HIGH may block other task.
- tabletCtx.setPriority(Priority.NORMAL);
- LOG.info("set replica {} on backend {} of tablet {} state to
DECOMMISSION due to reason {}",
- replica.getId(), replica.getBackendId(),
tabletCtx.getTabletId(), reason);
- throw new SchedException(Status.SCHEDULE_FAILED,
SubCode.WAITING_DECOMMISSION,
- "set watermark txn " + nextTxnId);
- } else if (replica.getState() == ReplicaState.DECOMMISSION &&
replica.getWatermarkTxnId() != -1) {
- long watermarkTxnId = replica.getWatermarkTxnId();
+ if (!force && !Config.enable_force_drop_redundant_replica
+ && !FeConstants.runningUnitTest
+ && (replica.getState().canLoad() || replica.getState() ==
ReplicaState.DECOMMISSION)) {
+ if (replica.getState() != ReplicaState.DECOMMISSION) {
+ replica.setState(ReplicaState.DECOMMISSION);
+ // set priority to normal because it may wait for a long time.
+ // Leaving it as VERY_HIGH may block other tasks.
+ tabletCtx.setPriority(Priority.NORMAL);
+ LOG.info("set replica {} on backend {} of tablet {} state to
DECOMMISSION due to reason {}",
+ replica.getId(), replica.getBackendId(),
tabletCtx.getTabletId(), reason);
+ }
+
+ long preWatermarkTxnId = replica.getPreWatermarkTxnId();
+ if (preWatermarkTxnId == -1) {
+ preWatermarkTxnId = Env.getCurrentGlobalTransactionMgr()
+ .getTransactionIDGenerator().getNextTransactionId();
+ replica.setPreWatermarkTxnId(preWatermarkTxnId);
+ }
+
+ long postWatermarkTxnId = replica.getPostWatermarkTxnId();
+ if (postWatermarkTxnId == -1) {
+ try {
+ if
(!Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(preWatermarkTxnId,
+ tabletCtx.getDbId(), tabletCtx.getTblId(),
tabletCtx.getPartitionId())) {
+ throw new SchedException(Status.SCHEDULE_FAILED,
SubCode.WAITING_DECOMMISSION,
+ "wait txn before pre watermark txn " +
preWatermarkTxnId + " to be finished");
+ }
+ } catch (AnalysisException e) {
+ throw new SchedException(Status.UNRECOVERABLE,
e.getMessage());
+ }
+ postWatermarkTxnId = Env.getCurrentGlobalTransactionMgr()
+ .getTransactionIDGenerator().getNextTransactionId();
+ replica.setPostWatermarkTxnId(postWatermarkTxnId);
+ }
+
try {
- if
(!Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watermarkTxnId,
- tabletCtx.getDbId(),
Lists.newArrayList(tabletCtx.getTblId()))) {
+ if
(!Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(postWatermarkTxnId,
+ tabletCtx.getDbId(), tabletCtx.getTblId(),
tabletCtx.getPartitionId())) {
throw new SchedException(Status.SCHEDULE_FAILED,
SubCode.WAITING_DECOMMISSION,
- "wait txn before " + watermarkTxnId + " to be
finished");
+ "wait txn before post watermark txn " +
postWatermarkTxnId + " to be finished");
}
} catch (AnalysisException e) {
throw new SchedException(Status.UNRECOVERABLE, e.getMessage());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 9114186162..293b49368a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -1709,6 +1709,35 @@ public class DatabaseTransactionMgr {
return true;
}
+ public boolean isPreviousTransactionsFinished(long endTransactionId, long
tableId, long partitionId) {
+ readLock();
+ try {
+ for (Map.Entry<Long, TransactionState> entry :
idToRunningTransactionState.entrySet()) {
+ TransactionState transactionState = entry.getValue();
+ if (entry.getKey() > endTransactionId
+ ||
transactionState.getTransactionStatus().isFinalStatus()
+ || transactionState.getDbId() != dbId
+ ||
!transactionState.getTableIdList().contains(tableId)) {
+ continue;
+ }
+ // here: non-final txn in this db, on this table, with id <= endTransactionId
+ if (transactionState.getTransactionStatus() ==
TransactionStatus.COMMITTED) {
+ TableCommitInfo tableCommitInfo =
transactionState.getTableCommitInfo(tableId);
+ // committed txn that does not touch this partition cannot block it
+ if (tableCommitInfo != null
+ &&
tableCommitInfo.getIdToPartitionCommitInfo().get(partitionId) == null) {
+ continue;
+ }
+ }
+ // still pending: non-committed txn, or committed one not ruled out above
+ return false;
+ }
+ } finally {
+ readUnlock();
+ }
+ return true;
+ }
+
/**
* check if there exists a intersection between the source tableId list
and target tableId list
* if one of them is null or empty, that means that we don't know related
tables in tableList,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 25b3f30259..d08b1a4981 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -425,6 +425,19 @@ public class GlobalTransactionMgr implements Writable {
}
}
+ /**
+ * Check whether every unfinished transaction with id <= endTransactionId in
+ * database dbId that involves the given table (for committed ones, only those
+ * touching the given partition) has finished.
+ *
+ * @throws AnalysisException if the database does not exist anymore
+ */
+ public boolean isPreviousTransactionsFinished(long endTransactionId, long
dbId, long tableId,
+ long partitionId) throws AnalysisException {
+ DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactionMgr(dbId);
+ return
dbTransactionMgr.isPreviousTransactionsFinished(endTransactionId, tableId,
partitionId);
+ }
+
/**
* The txn cleaner will run at a fixed interval and try to delete expired
and timeout txns:
* expired: txn is in VISIBLE or ABORTED, and is expired.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 10ce3542f6..b15329e6b0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -523,10 +523,6 @@ public class TransactionState implements Writable {
return this.idToTableCommitInfos.get(tableId);
}
- public void removeTable(long tableId) {
- this.idToTableCommitInfos.remove(tableId);
- }
-
public void setTxnCommitAttachment(TxnCommitAttachment
txnCommitAttachment) {
this.txnCommitAttachment = txnCommitAttachment;
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java
index 05929ff8ce..c36ef531c2 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java
@@ -307,8 +307,8 @@ public class RebalanceTest {
Replica decommissionedReplica = replicas.stream()
.filter(r -> r.getState() ==
Replica.ReplicaState.DECOMMISSION)
.collect(MoreCollectors.onlyElement());
- // expected watermarkTxnId is 111
- Assert.assertEquals(111,
decommissionedReplica.getWatermarkTxnId());
+ Assert.assertEquals(111,
decommissionedReplica.getPreWatermarkTxnId());
+ Assert.assertEquals(112,
decommissionedReplica.getPostWatermarkTxnId());
});
// Delete replica should change invertedIndex too
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]