This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 08c9cba6394 [improvement](transaction) reduce publish txn fail log #28277 (#35060)
08c9cba6394 is described below
commit 08c9cba6394b42b35bbe12c8a5fcab7eb02cb16d
Author: yujun <[email protected]>
AuthorDate: Tue Jun 11 14:46:03 2024 +0800
[improvement](transaction) reduce publish txn fail log #28277 (#35060)
cherry pick from #28277
---
.../main/java/org/apache/doris/common/Config.java | 4 +
.../doris/transaction/DatabaseTransactionMgr.java | 317 ++++++++++++---------
.../apache/doris/transaction/TransactionState.java | 19 ++
3 files changed, 202 insertions(+), 138 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index c219a6f9656..4968231afd4 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -466,6 +466,10 @@ public class Config extends ConfigBase {
+ "then the load task will be successful." })
public static int publish_wait_time_second = 300;
+ @ConfField(mutable = true, masterOnly = true, description = {"单个事务 publish
失败打日志间隔",
+ "print log interval for publish transaction failed interval"})
+ public static long publish_fail_log_interval_second = 5 * 60;
+
@ConfField(mutable = true, masterOnly = true, description =
{"提交事务的最大超时时间,单位是秒。"
+ "该参数仅用于事务型 insert 操作中。",
"Maximal waiting time for all data inserted before one transaction
to be committed, in seconds. "
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index f5e77563545..36b32f01069 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -919,19 +919,8 @@ public class DatabaseTransactionMgr {
// add all commit errors and publish errors to a single set
Set<Long> errorReplicaIds = transactionState.getErrorReplicas();
- Map<Long, PublishVersionTask> publishTasks =
transactionState.getPublishVersionTasks();
-
- long now = System.currentTimeMillis();
- long firstPublishVersionTime =
transactionState.getFirstPublishVersionTime();
- boolean allowPublishOneSucc = false;
- if (Config.publish_wait_time_second > 0 && firstPublishVersionTime > 0
- && now >= firstPublishVersionTime +
Config.publish_wait_time_second * 1000L) {
- allowPublishOneSucc = true;
- }
- List<Replica> tabletSuccReplicas = Lists.newArrayList();
- List<Replica> tabletWriteFailedReplicas = Lists.newArrayList();
- List<Replica> tabletVersionFailedReplicas = Lists.newArrayList();
+ List<Pair<OlapTable, Partition>> relatedTblPartitions =
Lists.newArrayList();
// case 1 If database is dropped, then we just throw
MetaNotFoundException, because all related tables are
// already force dropped, we just ignore the transaction with all
tables been force dropped.
@@ -946,133 +935,12 @@ public class DatabaseTransactionMgr {
LOG.debug("finish transaction {} with tables {}", transactionId,
tableIdList);
List<? extends TableIf> tableList =
db.getTablesOnIdOrderIfExist(tableIdList);
tableList = MetaLockUtils.writeLockTablesIfExist(tableList);
- PublishResult publishResult = PublishResult.QUORUM_SUCC;
+ PublishResult publishResult;
try {
- Iterator<TableCommitInfo> tableCommitInfoIterator
- =
transactionState.getIdToTableCommitInfos().values().iterator();
- while (tableCommitInfoIterator.hasNext()) {
- TableCommitInfo tableCommitInfo =
tableCommitInfoIterator.next();
- long tableId = tableCommitInfo.getTableId();
- OlapTable table = (OlapTable) db.getTableNullable(tableId);
- // table maybe dropped between commit and publish, ignore this
error
- if (table == null) {
- tableCommitInfoIterator.remove();
- LOG.warn("table {} is dropped, skip version check and
remove it from transaction state {}",
- tableId,
- transactionState);
- continue;
- }
- PartitionInfo partitionInfo = table.getPartitionInfo();
- Iterator<PartitionCommitInfo> partitionCommitInfoIterator
- =
tableCommitInfo.getIdToPartitionCommitInfo().values().iterator();
- while (partitionCommitInfoIterator.hasNext()) {
- PartitionCommitInfo partitionCommitInfo =
partitionCommitInfoIterator.next();
- long partitionId = partitionCommitInfo.getPartitionId();
- Partition partition = table.getPartition(partitionId);
- // partition maybe dropped between commit and publish
version, ignore this error
- if (partition == null) {
- partitionCommitInfoIterator.remove();
- LOG.warn("partition {} is dropped, skip version check"
- + " and remove it from transaction
state {}", partitionId, transactionState);
- continue;
- }
- if (partition.getVisibleVersion() !=
partitionCommitInfo.getVersion() - 1) {
- LOG.debug("transactionId {} partition commitInfo
version {} is not equal with "
- + "partition visible version {} plus
one, need wait",
- transactionId,
- partitionCommitInfo.getVersion(),
- partition.getVisibleVersion());
- String errMsg = String.format("wait for publishing
partition %d version %d."
- + " self version: %d. table %d",
partitionId, partition.getVisibleVersion() + 1,
- partitionCommitInfo.getVersion(), tableId);
- transactionState.setErrorMsg(errMsg);
- return;
- }
- int quorumReplicaNum =
partitionInfo.getReplicaAllocation(partitionId).getTotalReplicaNum() / 2 + 1;
-
- List<MaterializedIndex> allIndices;
- if (transactionState.getLoadedTblIndexes().isEmpty()) {
- allIndices =
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
- } else {
- allIndices = Lists.newArrayList();
- for (long indexId :
transactionState.getLoadedTblIndexes().get(tableId)) {
- MaterializedIndex index =
partition.getIndex(indexId);
- if (index != null) {
- allIndices.add(index);
- }
- }
- }
-
- // check success replica number for each tablet.
- // a success replica means:
- // 1. Not in errorReplicaIds: succeed in both commit and
publish phase
- // 2. last failed version < 0: is a health replica before
- // 3. version catch up: not with a stale version
- // Here we only check number, the replica version will be
updated in updateCatalogAfterVisible()
- for (MaterializedIndex index : allIndices) {
- for (Tablet tablet : index.getTablets()) {
- tabletSuccReplicas.clear();
- tabletWriteFailedReplicas.clear();
- tabletVersionFailedReplicas.clear();
- for (Replica replica : tablet.getReplicas()) {
-
checkReplicaContinuousVersionSucc(tablet.getId(), replica,
- partitionCommitInfo.getVersion(),
publishTasks.get(replica.getBackendId()),
- errorReplicaIds, tabletSuccReplicas,
tabletWriteFailedReplicas,
- tabletVersionFailedReplicas);
- }
-
- int healthReplicaNum = tabletSuccReplicas.size();
- if (healthReplicaNum >= quorumReplicaNum) {
- if (!tabletWriteFailedReplicas.isEmpty() ||
!tabletVersionFailedReplicas.isEmpty()) {
- String writeDetail =
getTabletWriteDetail(tabletSuccReplicas,
- tabletWriteFailedReplicas,
tabletVersionFailedReplicas);
- LOG.info("publish version quorum succ for
transaction {} on tablet {} with version"
- + " {}, and has failed replicas,
quorum num {}. table {}, partition {},"
- + " tablet detail: {}",
- transactionState, tablet.getId(),
partitionCommitInfo.getVersion(),
- quorumReplicaNum, tableId,
partitionId, writeDetail);
- }
- continue;
- }
-
- String writeDetail =
getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas,
- tabletVersionFailedReplicas);
- if (allowPublishOneSucc && healthReplicaNum > 0) {
- if (publishResult ==
PublishResult.QUORUM_SUCC) {
- publishResult = PublishResult.TIMEOUT_SUCC;
- }
- // We can not do any thing except retrying,
- // because publish task is assigned a version,
- // and doris does not permit discontinuous
- // versions.
- //
- // If a timeout happens, it means that the
rowset
- // that are being publised exists on a few
replicas we should go
- // ahead, otherwise data may be lost and thre
- // publish task hangs forever.
- LOG.info("publish version timeout succ for
transaction {} on tablet {} with version"
- + " {}, and has failed replicas,
quorum num {}. table {}, partition {},"
- + " tablet detail: {}",
- transactionState, tablet.getId(),
partitionCommitInfo.getVersion(),
- quorumReplicaNum, tableId,
partitionId, writeDetail);
- } else {
- publishResult = PublishResult.FAILED;
- String errMsg = String.format("publish on
tablet %d failed."
- + " succeed replica num %d
less than quorum %d."
- + " table: %d, partition: %d,
publish version: %d",
- tablet.getId(), healthReplicaNum,
quorumReplicaNum, tableId,
- partitionId,
partition.getVisibleVersion() + 1);
- transactionState.setErrorMsg(errMsg);
- LOG.info("publish version failed for
transaction {} on tablet {} with version"
- + " {}, and has failed replicas,
quorum num {}. table {}, partition {},"
- + " tablet detail: {}",
- transactionState, tablet.getId(),
partitionCommitInfo.getVersion(),
- quorumReplicaNum, tableId,
partitionId, writeDetail);
- }
- }
- }
- }
+ if (!finishCheckPartitionVersion(transactionState, db,
relatedTblPartitions)) {
+ return;
}
+ publishResult = finishCheckQuorumReplicas(transactionState,
relatedTblPartitions, errorReplicaIds);
if (publishResult == PublishResult.FAILED) {
return;
}
@@ -1107,7 +975,180 @@ public class DatabaseTransactionMgr {
// Otherwise, there is no way for stream load to query the result
right after loading finished,
// even if we call "sync" before querying.
transactionState.countdownVisibleLatch();
- LOG.info("finish transaction {} successfully, publish result: {}",
transactionState, publishResult.name());
+ LOG.info("finish transaction {} successfully, publish times {},
publish result {}",
+ transactionState, transactionState.getPublishCount(),
publishResult.name());
+ }
+
+ private boolean finishCheckPartitionVersion(TransactionState
transactionState, Database db,
+ List<Pair<OlapTable, Partition>> relatedTblPartitions) {
+ Iterator<TableCommitInfo> tableCommitInfoIterator
+ =
transactionState.getIdToTableCommitInfos().values().iterator();
+ while (tableCommitInfoIterator.hasNext()) {
+ TableCommitInfo tableCommitInfo = tableCommitInfoIterator.next();
+ long tableId = tableCommitInfo.getTableId();
+ OlapTable table = (OlapTable) db.getTableNullable(tableId);
+ // table maybe dropped between commit and publish, ignore this
error
+ if (table == null) {
+ tableCommitInfoIterator.remove();
+ LOG.warn("table {} is dropped, skip version check and remove
it from transaction state {}",
+ tableId,
+ transactionState);
+ continue;
+ }
+
+ Iterator<PartitionCommitInfo> partitionCommitInfoIterator
+ =
tableCommitInfo.getIdToPartitionCommitInfo().values().iterator();
+ while (partitionCommitInfoIterator.hasNext()) {
+ PartitionCommitInfo partitionCommitInfo =
partitionCommitInfoIterator.next();
+ long partitionId = partitionCommitInfo.getPartitionId();
+ Partition partition = table.getPartition(partitionId);
+ // partition maybe dropped between commit and publish version,
ignore this error
+ if (partition == null) {
+ partitionCommitInfoIterator.remove();
+ LOG.warn("partition {} is dropped, skip version check"
+ + " and remove it from transaction state
{}", partitionId, transactionState);
+ continue;
+ }
+ if (partition.getVisibleVersion() !=
partitionCommitInfo.getVersion() - 1) {
+ LOG.debug("for table {} partition {}, transactionId {}
partition commitInfo version {} is not"
+ + " equal with partition visible version {} plus
one, need wait",
+ table.getId(), partition.getId(),
transactionState.getTransactionId(),
+ partitionCommitInfo.getVersion(),
partition.getVisibleVersion());
+ String errMsg = String.format("wait for publishing
partition %d version %d."
+ + " self version: %d. table %d",
partitionId, partition.getVisibleVersion() + 1,
+ partitionCommitInfo.getVersion(), tableId);
+ transactionState.setErrorMsg(errMsg);
+ return false;
+ }
+
+ relatedTblPartitions.add(Pair.of(table, partition));
+ }
+ }
+
+ return true;
+ }
+
+ private PublishResult finishCheckQuorumReplicas(TransactionState
transactionState,
+ List<Pair<OlapTable, Partition>> relatedTblPartitions,
+ Set<Long> errorReplicaIds) {
+ long now = System.currentTimeMillis();
+ long firstPublishVersionTime =
transactionState.getFirstPublishVersionTime();
+ boolean allowPublishOneSucc = false;
+ if (Config.publish_wait_time_second > 0 && firstPublishVersionTime > 0
+ && now >= firstPublishVersionTime +
Config.publish_wait_time_second * 1000L) {
+ allowPublishOneSucc = true;
+ }
+
+ List<Replica> tabletSuccReplicas = Lists.newArrayList();
+ List<Replica> tabletWriteFailedReplicas = Lists.newArrayList();
+ List<Replica> tabletVersionFailedReplicas = Lists.newArrayList();
+ List<String> logs = Lists.newArrayList();
+
+ Map<Long, PublishVersionTask> publishTasks =
transactionState.getPublishVersionTasks();
+ PublishResult publishResult = PublishResult.QUORUM_SUCC;
+ for (Pair<OlapTable, Partition> pair : relatedTblPartitions) {
+ OlapTable table = pair.key();
+ Partition partition = pair.value();
+ long tableId = table.getId();
+ long partitionId = partition.getId();
+ long newVersion = partition.getVisibleVersion() + 1;
+ int loadRequiredReplicaNum = table.getPartitionInfo()
+ .getReplicaAllocation(partitionId).getTotalReplicaNum() /
2 + 1;
+ List<MaterializedIndex> allIndices;
+ if (transactionState.getLoadedTblIndexes().isEmpty()) {
+ allIndices =
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
+ } else {
+ allIndices = Lists.newArrayList();
+ for (long indexId :
transactionState.getLoadedTblIndexes().get(tableId)) {
+ MaterializedIndex index = partition.getIndex(indexId);
+ if (index != null) {
+ allIndices.add(index);
+ }
+ }
+ }
+
+ // check success replica number for each tablet.
+ // a success replica means:
+ // 1. Not in errorReplicaIds: succeed in both commit and publish
phase
+ // 2. last failed version < 0: is a health replica before
+ // 3. version catch up: not with a stale version
+ // Here we only check number, the replica version will be updated
in updateCatalogAfterVisible()
+ for (MaterializedIndex index : allIndices) {
+ for (Tablet tablet : index.getTablets()) {
+ tabletSuccReplicas.clear();
+ tabletWriteFailedReplicas.clear();
+ tabletVersionFailedReplicas.clear();
+ for (Replica replica : tablet.getReplicas()) {
+ checkReplicaContinuousVersionSucc(tablet.getId(),
replica,
+ newVersion,
publishTasks.get(replica.getBackendId()),
+ errorReplicaIds, tabletSuccReplicas,
tabletWriteFailedReplicas,
+ tabletVersionFailedReplicas);
+ }
+
+ int healthReplicaNum = tabletSuccReplicas.size();
+ if (healthReplicaNum >= loadRequiredReplicaNum) {
+ boolean hasFailedReplica =
!tabletWriteFailedReplicas.isEmpty()
+ || !tabletVersionFailedReplicas.isEmpty();
+ if (hasFailedReplica) {
+ String writeDetail =
getTabletWriteDetail(tabletSuccReplicas,
+ tabletWriteFailedReplicas,
tabletVersionFailedReplicas);
+ logs.add(String.format("publish version quorum
succ for transaction %s on tablet %s"
+ + " with version %s, and has failed
replicas, load require replica num %s. "
+ + "table %s, partition %s, tablet detail:
%s",
+ transactionState, tablet.getId(),
newVersion,
+ loadRequiredReplicaNum, tableId,
partitionId, writeDetail));
+ }
+ continue;
+ }
+
+ String writeDetail =
getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas,
+ tabletVersionFailedReplicas);
+ if (allowPublishOneSucc && healthReplicaNum > 0) {
+ if (publishResult == PublishResult.QUORUM_SUCC) {
+ publishResult = PublishResult.TIMEOUT_SUCC;
+ }
+ // We can not do any thing except retrying,
+ // because publish task is assigned a version,
+ // and doris does not permit discontinuous
+ // versions.
+ //
+ // If a timeout happens, it means that the rowset
+ // that are being publised exists on a few replicas we
should go
+ // ahead, otherwise data may be lost and thre
+ // publish task hangs forever.
+ logs.add(String.format("publish version timeout succ
for transaction %s on tablet %s "
+ + "with version %s, and has failed replicas,
load require replica num %s. "
+ + "table %s, partition %s, tablet detail: %s",
+ transactionState, tablet.getId(), newVersion,
+ loadRequiredReplicaNum, tableId, partitionId,
writeDetail));
+ } else {
+ publishResult = PublishResult.FAILED;
+ String errMsg = String.format("publish on tablet %d
failed."
+ + " succeed replica num %d < load
required replica num %d."
+ + " table: %d, partition: %d, publish
version: %d",
+ tablet.getId(), healthReplicaNum,
loadRequiredReplicaNum, tableId,
+ partitionId, newVersion);
+ transactionState.setErrorMsg(errMsg);
+ logs.add(String.format("publish version failed for
transaction %s on tablet %s with version"
+ + " %s, and has failed replicas, load required
replica num %s. table %s, "
+ + "partition %s, tablet detail: %s",
+ transactionState, tablet.getId(), newVersion,
+ loadRequiredReplicaNum, tableId, partitionId,
writeDetail));
+ }
+ }
+ }
+ }
+
+ boolean needLog = publishResult != PublishResult.FAILED
+ || now - transactionState.getLastPublishLogTime() >
Config.publish_fail_log_interval_second * 1000L;
+ if (needLog) {
+ transactionState.setLastPublishLogTime(now);
+ for (String log : logs) {
+ LOG.info("{}. publish times {}", log,
transactionState.getPublishCount());
+ }
+ }
+
+ return publishResult;
}
private void checkReplicaContinuousVersionSucc(long tabletId, Replica
replica, long version,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 5eed8c655c9..f9a094eceb9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -230,6 +230,12 @@ public class TransactionState implements Writable {
private long lastPublishVersionTime = -1;
+ private long publishCount = 0;
+
+ // txn may try finish many times and generate a lot of log.
+ // use lastPublishLogTime to reduce log.
+ private long lastPublishLogTime = 0;
+
@SerializedName(value = "callbackId")
private long callbackId = -1;
@@ -347,6 +353,7 @@ public class TransactionState implements Writable {
}
public void updateSendTaskTime() {
+ this.publishCount++;
this.lastPublishVersionTime = System.currentTimeMillis();
if (this.firstPublishVersionTime <= 0) {
this.firstPublishVersionTime = lastPublishVersionTime;
@@ -361,6 +368,10 @@ public class TransactionState implements Writable {
return this.lastPublishVersionTime;
}
+ public long getPublishCount() {
+ return publishCount;
+ }
+
public boolean hasSendTask() {
return this.hasSendTask;
}
@@ -429,6 +440,14 @@ public class TransactionState implements Writable {
return errorLogUrl;
}
+ public long getLastPublishLogTime() {
+ return lastPublishLogTime;
+ }
+
+ public void setLastPublishLogTime(long lastPublishLogTime) {
+ this.lastPublishLogTime = lastPublishLogTime;
+ }
+
public void setTransactionStatus(TransactionStatus transactionStatus) {
// status changed
this.preStatus = this.transactionStatus;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]