morningman closed pull request #350: Optimize the publish logic of streaming 
load
URL: https://github.com/apache/incubator-doris/pull/350
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, since GitHub does not display the original diff once the pull request has been merged:

diff --git a/be/src/http/action/stream_load.cpp 
b/be/src/http/action/stream_load.cpp
index c608de08..a2902daa 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -330,6 +330,7 @@ int StreamLoadAction::on_header(HttpRequest* req) {
         }
         auto str = ctx->to_json();
         HttpChannel::send_reply(req, str);
+        k_streaming_load_current_processing.increment(-1);
         return -1;
     }
     return 0;
diff --git a/be/test/olap/file_helper_test.cpp 
b/be/test/olap/file_helper_test.cpp
index ed21e001..90d8e46a 100644
--- a/be/test/olap/file_helper_test.cpp
+++ b/be/test/olap/file_helper_test.cpp
@@ -91,10 +91,10 @@ TEST_F(FileHandlerTest, TestWrite) {
     ASSERT_EQ(22, length);
 
     
-    char* large_bytes2[(1 << 12)];
+    char* large_bytes2[(1 << 10)];
     memset(large_bytes2, 0, sizeof(char)*((1 << 12)));
     int i = 1;
-    while (i < 1 << 20) {
+    while (i < 1 << 17) {
         file_handler.write(large_bytes2, ((1 << 12)));
         ++i;
     }
diff --git a/fe/src/main/java/org/apache/doris/catalog/Replica.java 
b/fe/src/main/java/org/apache/doris/catalog/Replica.java
index f6046057..a51f1995 100644
--- a/fe/src/main/java/org/apache/doris/catalog/Replica.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Replica.java
@@ -20,6 +20,7 @@
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -178,6 +179,25 @@ public synchronized void updateVersionInfo(long 
newVersion, long newVersionHash,
                 lastSuccessVersion, lastSuccessVersionHash, dataSize, 
rowCount);
     }
     
+    /* last failed version:  LFV
+     * last success version: LSV
+     * version:              V
+     * 
+     * Case 1:
+     *      If LFV > LSV, set LSV back to V, which indicates that version 
between LSV and LFV is invalid.
+     *      Clone task will clone the version between LSV and LFV
+     *      
+     * Case 2:
+     *      LFV changed, set LSV back to V. This is just same as Case 1. Cause 
LFV must large than LSV.
+     * 
+     * Case 3:
+     *      LFV remains unchanged, just update LSV, and then check if it falls 
into Case 1.
+     *      
+     * Case 4:
+     *      V is larger or equal to LFV, reset LFV. And if V is less than LSV, 
just set V to LSV. This may
+     *      happen when a clone task finished and report version V, but the 
LSV is already larger than V,
+     *      And we know that version between V and LSV is valid, so move V 
forward to LSV.
+     */
     private void updateReplicaInfo(long newVersion, long newVersionHash, 
             long lastFailedVersion, long lastFailedVersionHash, 
             long lastSuccessVersion, long lastSuccessVersionHash, 
@@ -196,11 +216,14 @@ private void updateReplicaInfo(long newVersion, long 
newVersionHash,
             lastSuccessVersion = this.version;
             lastSuccessVersionHash = this.versionHash;
         }
+
+        // case 1:
         if (this.lastSuccessVersion <= this.lastFailedVersion) {
             this.lastSuccessVersion = this.version;
             this.lastSuccessVersionHash = this.versionHash;
         }
         
+        // TODO: this case is unknown, add log to observe
         if (this.version > lastFailedVersion && lastFailedVersion > 0) {
             LOG.info("current version {} is larger than last failed version {} 
, " 
                         + "last failed version hash {}, maybe a fatal error or 
be report version, print a stack here ", 
@@ -209,15 +232,17 @@ private void updateReplicaInfo(long newVersion, long 
newVersionHash,
         
         if (lastFailedVersion != this.lastFailedVersion
                 || this.lastFailedVersionHash != lastFailedVersionHash) {
-            // if last failed version changed, then set last success version 
to invalid version
+            // Case 2:
             if (lastFailedVersion > this.lastFailedVersion) {
                 this.lastFailedVersion = lastFailedVersion;
                 this.lastFailedVersionHash = lastFailedVersionHash;
                 this.lastFailedTimestamp = System.currentTimeMillis();
             }
+
             this.lastSuccessVersion = this.version;
             this.lastSuccessVersionHash = this.versionHash;
         } else {
+            // Case 3:
             if (lastSuccessVersion >= this.lastSuccessVersion) {
                 this.lastSuccessVersion = lastSuccessVersion;
                 this.lastSuccessVersionHash = lastSuccessVersionHash;
@@ -228,9 +253,7 @@ private void updateReplicaInfo(long newVersion, long 
newVersionHash,
             }
         }
         
-        // if last failed version <= version, then last failed version is 
invalid
-        // version xxxx | last failed version  xxxx | last success version xxx
-        // if current version == last failed version and version hash != last 
failed version hash, it means the version report from be is not valid
+        // Case 4:
         if (this.version > this.lastFailedVersion 
                 || this.version == this.lastFailedVersion && this.versionHash 
== this.lastFailedVersionHash
                 || this.version == this.lastFailedVersion && 
this.lastFailedVersionHash == 0 && this.versionHash != 0) {
@@ -242,7 +265,7 @@ private void updateReplicaInfo(long newVersion, long 
newVersionHash,
                 this.versionHash = this.lastSuccessVersionHash;
             }
         }
-        // TODO yiguolei use info log here, there maybe a lot of logs, change 
it to debug when concurrent load is stable
+
         LOG.debug("update {}", this.toString()); 
     }
     
diff --git a/fe/src/main/java/org/apache/doris/common/Config.java 
b/fe/src/main/java/org/apache/doris/common/Config.java
index 75ac8579..fa13718a 100644
--- a/fe/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/src/main/java/org/apache/doris/common/Config.java
@@ -241,6 +241,9 @@
      *      if (current_time - t1) > 300s, then palo will treat C as a failure 
node
      *      will call transaction manager to commit the transaction and tell 
transaction manager 
      *      that C is failed
+     * 
+     * This is also used when waiting for publish tasks
+     * 
      * TODO this parameter is the default value for all job and the DBA could 
specify it for separate job
      */
     @ConfField public static int load_straggler_wait_second = 300;
diff --git a/fe/src/main/java/org/apache/doris/master/MasterImpl.java 
b/fe/src/main/java/org/apache/doris/master/MasterImpl.java
index 3a9b7ac4..b96d4a03 100644
--- a/fe/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -46,9 +46,9 @@
 import org.apache.doris.task.CloneTask;
 import org.apache.doris.task.CreateReplicaTask;
 import org.apache.doris.task.CreateRollupTask;
-import org.apache.doris.task.PublishVersionTask;
 import org.apache.doris.task.DirMoveTask;
 import org.apache.doris.task.DownloadTask;
+import org.apache.doris.task.PublishVersionTask;
 import org.apache.doris.task.PushTask;
 import org.apache.doris.task.SchemaChangeTask;
 import org.apache.doris.task.SnapshotTask;
@@ -555,7 +555,7 @@ private void finishPublishVersion(AgentTask task, 
TFinishTaskRequest request) {
         if (request.isSetError_tablet_ids()) {
             errorTabletIds = request.getError_tablet_ids();
         }
-        PublishVersionTask publishVersionTask = (PublishVersionTask)task;
+        PublishVersionTask publishVersionTask = (PublishVersionTask) task;
         publishVersionTask.addErrorTablets(errorTabletIds);
         publishVersionTask.setIsFinished(true);
         AgentTaskQueue.removeTask(publishVersionTask.getBackendId(), 
diff --git a/fe/src/main/java/org/apache/doris/metric/MetricRepo.java 
b/fe/src/main/java/org/apache/doris/metric/MetricRepo.java
index 0b98e108..111c4df8 100644
--- a/fe/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -17,9 +17,6 @@
 
 package org.apache.doris.metric;
 
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.MetricRegistry;
-
 import org.apache.doris.alter.Alter;
 import org.apache.doris.alter.AlterJob.JobType;
 import org.apache.doris.catalog.Catalog;
@@ -33,6 +30,10 @@
 import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
+
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.MetricRegistry;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -57,6 +58,8 @@
     public static LongCounterMetric COUNTER_EDIT_LOG_READ;
     public static LongCounterMetric COUNTER_IMAGE_WRITE;
     public static LongCounterMetric COUNTER_IMAGE_PUSH;
+    public static LongCounterMetric COUNTER_TXN_FAILED;
+    public static LongCounterMetric COUNTER_TXN_SUCCESS;
     public static Histogram HISTO_QUERY_LATENCY;
     public static Histogram HISTO_EDIT_LOG_WRITE_LATENCY;
 
@@ -161,6 +164,12 @@ public Long getValue() {
         COUNTER_IMAGE_PUSH = new LongCounterMetric("image_push",
                 "counter of image succeeded in pushing to other frontends");
         PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_IMAGE_PUSH);
+        COUNTER_TXN_SUCCESS = new LongCounterMetric("txn_success",
+                "counter of success transactions");
+        PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_TXN_SUCCESS);
+        COUNTER_TXN_FAILED = new LongCounterMetric("txn_failed",
+                "counter of failed transactions");
+        PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_TXN_FAILED);
 
         // 3. histogram
         HISTO_QUERY_LATENCY = 
METRIC_REGISTER.histogram(MetricRegistry.name("query", "latency", "ms"));
diff --git a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index ffce942b..9c6d2668 100644
--- a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -634,7 +634,7 @@ public TLoadTxnCommitResult 
loadTxnCommit(TLoadTxnCommitRequest request) throws
         return result;
     }
 
-    // return true if commit success and publish success, return false if 
publish timout
+    // return true if commit success and publish success, return false if 
publish timeout
     private boolean loadTxnCommitImpl(TLoadTxnCommitRequest request) throws 
UserException {
         String cluster = request.getCluster();
         if (Strings.isNullOrEmpty(cluster)) {
@@ -655,6 +655,7 @@ private boolean loadTxnCommitImpl(TLoadTxnCommitRequest 
request) throws UserExce
             }
             throw new UserException("unknown database, database=" + dbName);
         }
+
         return 
Catalog.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(
                 db, request.getTxnId(),
                 TabletCommitInfo.fromThrift(request.getCommitInfos()),
diff --git a/fe/src/main/java/org/apache/doris/task/PublishVersionTask.java 
b/fe/src/main/java/org/apache/doris/task/PublishVersionTask.java
index 97e31a6d..c65cfe05 100644
--- a/fe/src/main/java/org/apache/doris/task/PublishVersionTask.java
+++ b/fe/src/main/java/org/apache/doris/task/PublishVersionTask.java
@@ -17,15 +17,15 @@
 
 package org.apache.doris.task;
 
-import java.util.ArrayList;
-import java.util.List;
+import org.apache.doris.thrift.TPartitionVersionInfo;
+import org.apache.doris.thrift.TPublishVersionRequest;
+import org.apache.doris.thrift.TTaskType;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import org.apache.doris.thrift.TPartitionVersionInfo;
-import org.apache.doris.thrift.TPublishVersionRequest;
-import org.apache.doris.thrift.TTaskType;
+import java.util.ArrayList;
+import java.util.List;
 
 public class PublishVersionTask extends AgentTask {
     private static final Logger LOG = 
LogManager.getLogger(PublishVersionTask.class);
diff --git 
a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java 
b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index d3da6a0b..35bd20c8 100644
--- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -17,12 +17,6 @@
 
 package org.apache.doris.transaction;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.HashBasedTable;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
 import org.apache.doris.alter.RollupJob;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
@@ -49,6 +43,13 @@
 import org.apache.doris.task.PublishVersionTask;
 import org.apache.doris.thrift.TTaskType;
 import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -82,6 +83,7 @@
 
     // transactionId -> TransactionState
     private Map<Long, TransactionState> idToTransactionState;
+    // db id -> (label -> txn id)
     private com.google.common.collect.Table<Long, String, Long> 
dbIdToTxnLabels; 
     private Map<Long, Integer> runningTxnNums;
     private TransactionIdGenerator idGenerator;
@@ -107,7 +109,7 @@ public long beginTransaction(long dbId, String label, 
String coordinator, LoadJo
             throws AnalysisException, LabelAlreadyExistsException, 
BeginTransactionException {
 
         if (Config.disable_load_job) {
-            throw new BeginTransactionException("disable_load_job is set to 
true, all load job is prevented");
+            throw new BeginTransactionException("disable_load_job is set to 
true, all load jobs are prevented");
         }
         
         writeLock();
@@ -185,12 +187,11 @@ public void deleteTransaction(long transactionId) {
      */
     public void commitTransaction(long dbId, long transactionId, 
List<TabletCommitInfo> tabletCommitInfos)
             throws MetaNotFoundException, TransactionCommitFailedException {
-
         if (Config.disable_load_job) {
-            throw new TransactionCommitFailedException("disable_load_job is 
set to true, all load job is prevented");
+            throw new TransactionCommitFailedException("disable_load_job is 
set to true, all load jobs are prevented");
         }
         
-        LOG.debug("try to commit transaction:[{}]", transactionId);
+        LOG.debug("try to commit transaction: {}", transactionId);
         if (tabletCommitInfos == null || tabletCommitInfos.isEmpty()) {
             throw new TransactionCommitFailedException("all partitions have no 
load data");
         }
@@ -260,7 +261,8 @@ public void commitTransaction(long dbId, long 
transactionId, List<TabletCommitIn
                 }
                 // the rolling up index should also be taken care
                 // if the rollup index failed during load, then set its last 
failed version
-                // if rollup task finished, it should compare version and last 
failed version, if version < last failed version, then the replica is failed
+                // if rollup task finished, it should compare version and last 
failed version,
+                // if version < last failed version, then the replica is failed
                 if (rollingUpIndex != null) {
                     allIndices.add(rollingUpIndex);
                 }
@@ -287,11 +289,12 @@ public void commitTransaction(long dbId, long 
transactionId, List<TabletCommitIn
                                 // ignore it but not log it
                                 // for example, a replica is in clone state
                                 if (replica.getLastFailedVersion() < 0) {
-                                    ++ successReplicaNum;
+                                    ++successReplicaNum;
                                 } else {
                                     // if this error replica is a base replica 
and it is under rollup
                                     // then remove the rollup task and rollup 
job will remove the rollup replica automatically
-                                    // should remove here, because the error 
replicas not contains this base replica, but it have errors in the past
+                                    // should remove here, because the error 
replicas not contains this base replica,
+                                    // but it has errors in the past
                                     if (index.getId() == baseIndex.getId() && 
rollupJob != null) {
                                         LOG.info("the base replica [{}] has 
error, remove the related rollup replica from rollupjob [{}]", 
                                                 replica, rollupJob);
@@ -340,12 +343,16 @@ public void commitTransaction(long dbId, long 
transactionId, List<TabletCommitIn
             }
             // 5. persistent transactionState
             unprotectUpsertTransactionState(transactionState);
+
+            // add publish version tasks. set task to null as a placeholder.
+            // tasks will be created when publishing version.
             for (long backendId : totalInvolvedBackends) {
                 transactionState.addPublishVersionTask(backendId, null);
             }
         } finally {
             writeUnlock();
         }
+
         // 6. update nextVersion because of the failure of persistent 
transaction resulting in error version
         updateCatalogAfterCommitted(transactionState, db);
         LOG.info("transaction:[{}] successfully committed", transactionState);
@@ -385,7 +392,6 @@ public boolean commitAndPublishTransaction(Database db, 
long transactionId,
     }
     
     public void abortTransaction(long transactionId, String reason) throws 
UserException {
-
         if (transactionId < 0) {
             LOG.info("transaction id is {}, less than 0, maybe this is an old 
type load job, ignore abort operation", transactionId);
             return;
@@ -897,12 +903,14 @@ private void updateCatalogAfterCommitted(TransactionState 
transactionState, Data
                     }
                 }
                 partition.setNextVersion(partition.getNextVersion() + 1);
-                // the partition's current version hash should be set from 
partition commit info
-                // for example, fe master's partition current version hash is 
123123, fe followers partition current version hash is 3333
-                // they are different, fe master changed, the follower is 
master now, but its current version hash is 333, if clone happened,
-                // clone finished but its finished version hash != partition's 
current version hash, then clone is failed
-                // because clone depend on partition's current version to clone
-                partition.setNextVersionHash(Util.generateVersionHash(), 
partitionCommitInfo.getVersionHash());
+                // Although committed version(hash) is not visible to user,
+                // but they need to be synchronized among Frontends.
+                // because we use committed version(hash) to create clone 
task, if the first Master FE
+                // send clone task with committed version hash X, and than 
Master changed, the new Master FE
+                // received the clone task report with version hash X, which 
not equals to it own committed
+                // version hash, than the clone task is failed.
+                partition.setNextVersionHash(Util.generateVersionHash() /* 
next version hash */,
+                                             
partitionCommitInfo.getVersionHash() /* committed version hash*/);
             }
         }
     }
@@ -1015,14 +1023,14 @@ private void updateDBRunningTxnNum(TransactionStatus 
preStatus, TransactionState
         if (preStatus == null 
                 && (curTxnState.getTransactionStatus() == 
TransactionStatus.PREPARE
                     || curTxnState.getTransactionStatus() == 
TransactionStatus.COMMITTED)) {
-            ++ dbRunningTxnNum;
+            ++dbRunningTxnNum;
             runningTxnNums.put(curTxnState.getDbId(), dbRunningTxnNum);
         } else if (preStatus != null
                 && (preStatus == TransactionStatus.PREPARE
                     || preStatus == TransactionStatus.COMMITTED)
                 && (curTxnState.getTransactionStatus() == 
TransactionStatus.VISIBLE
                     || curTxnState.getTransactionStatus() == 
TransactionStatus.ABORTED)) {
-            -- dbRunningTxnNum;
+            --dbRunningTxnNum;
             if (dbRunningTxnNum < 1) {
                 runningTxnNums.remove(curTxnState.getDbId());
             } else {
diff --git 
a/fe/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java 
b/fe/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
index 324ff9d3..270504a7 100644
--- a/fe/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
+++ b/fe/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
@@ -17,11 +17,8 @@
 
 package org.apache.doris.transaction;
 
-import com.google.common.collect.Sets;
-
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Replica;
-import org.apache.doris.catalog.Replica.ReplicaState;
 import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.util.Daemon;
@@ -31,6 +28,10 @@
 import org.apache.doris.task.PublishVersionTask;
 import org.apache.doris.thrift.TPartitionVersionInfo;
 import org.apache.doris.thrift.TTaskType;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -38,14 +39,14 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 public class PublishVersionDaemon extends Daemon {
     
     private static final Logger LOG = 
LogManager.getLogger(PublishVersionDaemon.class);
     
     public PublishVersionDaemon() {
-        super("PUBLISH_VERSION");
-        setInterval(Config.publish_version_interval_millis);
+        super("PUBLISH_VERSION", Config.publish_version_interval_millis);
     }
     
     protected void runOneCycle() {
@@ -64,7 +65,7 @@ private void publishVersion() {
         }
         // TODO yiguolei: could publish transaction state according to 
multi-tenant cluster info
         // but should do more work. for example, if a table is migrate from 
one cluster to another cluster
-        // should pulish to two clusters. 
+        // should publish to two clusters.
         // attention here, we publish transaction state to all backends 
including dead backend, if not publish to dead backend
         // then transaction manager will treat it as success
         List<Long> allBackends = 
Catalog.getCurrentSystemInfo().getBackendIds(false);
@@ -97,13 +98,14 @@ private void publishVersion() {
                 }
             }
             Set<Long> publishBackends = 
transactionState.getPublishVersionTasks().keySet();
+            // public version tasks are not persisted in catalog, so 
publishBackends may be empty.
+            // so we have to try publish to all backends;
             if (publishBackends.isEmpty()) {
                 // could not just add to it, should new a new object, or the 
back map will destroyed
                 publishBackends = Sets.newHashSet();
-                // this is useful if fe master transfer to another master, 
because publish version task is not
-                // persistent to edit log, then it should publish to all 
backends
                 publishBackends.addAll(allBackends);
             }
+
             for (long backendId : publishBackends) {
                 PublishVersionTask task = new PublishVersionTask(backendId, 
                                                                  
transactionState.getTransactionId(), 
@@ -130,6 +132,7 @@ private void publishVersion() {
             }
             Map<Long, PublishVersionTask> transTasks = 
transactionState.getPublishVersionTasks();
             Set<Replica> transErrorReplicas = Sets.newHashSet();
+            List<PublishVersionTask> unfinishedTasks = Lists.newArrayList();
             for (PublishVersionTask publishVersionTask : transTasks.values()) {
                 if (publishVersionTask.isFinished()) {
                     // sometimes backend finish publish version task, but it 
maybe failed to change transactionid to version for some tablets
@@ -145,44 +148,48 @@ private void publishVersion() {
                         }
                     }
                 } else {
-                    // if task is not finished in time, then set all replica 
in the backend to error state
-                    List<TPartitionVersionInfo> versionInfos = 
publishVersionTask.getPartitionVersionInfos();
-                    Set<Long> errorPartitionIds = Sets.newHashSet();
-                    for (TPartitionVersionInfo versionInfo : versionInfos) {
-                        errorPartitionIds.add(versionInfo.getPartition_id());
-                    }
-                    if (errorPartitionIds.isEmpty()) {
-                        continue;
-                    }
-                    List<Long> tabletIds = 
tabletInvertedIndex.getTabletIdsByBackendId(publishVersionTask.getBackendId());
-                    for (long tabletId : tabletIds) {
-                        long partitionId = 
tabletInvertedIndex.getPartitionId(tabletId);
-                        if (errorPartitionIds.contains(partitionId)) {
-                            Replica replica = 
tabletInvertedIndex.getReplica(tabletId, publishVersionTask.getBackendId());
-                            transErrorReplicas.add(replica);
+                    unfinishedTasks.add(publishVersionTask);
+                }
+            }
+
+            boolean shouldFinishTxn = false;
+            if (!unfinishedTasks.isEmpty()) {
+                if (transactionState.isPublishTimeout()) {
+                    // transaction's publish is timeout, but there still has 
unfinished tasks.
+                    // we need to collect all error replicas, and try to 
finish this txn.
+                    for (PublishVersionTask unfinishedTask : unfinishedTasks) {
+                        // set all replica in the backend to error state
+                        List<TPartitionVersionInfo> versionInfos = 
unfinishedTask.getPartitionVersionInfos();
+                        Set<Long> errorPartitionIds = Sets.newHashSet();
+                        for (TPartitionVersionInfo versionInfo : versionInfos) 
{
+                            
errorPartitionIds.add(versionInfo.getPartition_id());
+                        }
+                        if (errorPartitionIds.isEmpty()) {
+                            continue;
+                        }
+
+                        // TODO(cmy): this is inefficient, but just keep it 
simple. will change it later.
+                        List<Long> tabletIds = 
tabletInvertedIndex.getTabletIdsByBackendId(unfinishedTask.getBackendId());
+                        for (long tabletId : tabletIds) {
+                            long partitionId = 
tabletInvertedIndex.getPartitionId(tabletId);
+                            if (errorPartitionIds.contains(partitionId)) {
+                                Replica replica = 
tabletInvertedIndex.getReplica(tabletId,
+                                                                               
  unfinishedTask.getBackendId());
+                                transErrorReplicas.add(replica);
+                            }
                         }
                     }
+
+                    shouldFinishTxn = true;
                 }
+                // transaction's publish is not timeout, waiting next round.
+            } else {
+                // all publish tasks are finished, try to finish this txn.
+                shouldFinishTxn = true;
             }
-            // the timeout value is related with backend num
-            long timeoutMillis = 
Math.min(Config.publish_version_timeout_second * transTasks.size() * 1000, 
10000);
-            // the minimal internal should be 3s
-            timeoutMillis = Math.max(timeoutMillis, 3000);
             
-            // should not wait clone replica or replica's that with last 
failed version > 0
-            // if wait for them, the publish process will be very slow
-            int normalReplicasNotRespond = 0;
-            Set<Long> allErrorReplicas = Sets.newHashSet();
-            for (Replica replica : transErrorReplicas) {
-                allErrorReplicas.add(replica.getId());
-                if (replica.getState() != ReplicaState.CLONE 
-                        && replica.getLastFailedVersion() < 1) {
-                    ++normalReplicasNotRespond;
-                }
-            }
-            if (normalReplicasNotRespond == 0 
-                    || System.currentTimeMillis() - 
transactionState.getPublishVersionTime() > timeoutMillis) {
-                LOG.debug("transTask num {}, error replica id num {}", 
transTasks.size(), transErrorReplicas.size());
+            if (shouldFinishTxn) {
+                Set<Long> allErrorReplicas = transErrorReplicas.stream().map(v 
-> v.getId()).collect(Collectors.toSet());
                 
globalTransactionMgr.finishTransaction(transactionState.getTransactionId(), 
allErrorReplicas);
                 if (transactionState.getTransactionStatus() != 
TransactionStatus.VISIBLE) {
                     // if finish transaction state failed, then update publish 
version time, should check 
@@ -192,11 +199,12 @@ private void publishVersion() {
                             transactionState, transErrorReplicas.size());
                 }
             }
+
             if (transactionState.getTransactionStatus() == 
TransactionStatus.VISIBLE) {
                 for (PublishVersionTask task : 
transactionState.getPublishVersionTasks().values()) {
                     AgentTaskQueue.removeTask(task.getBackendId(), 
TTaskType.PUBLISH_VERSION, task.getSignature());
                 }
             }
-        }
+        } // end for readyTransactionStates
     }
 }
diff --git 
a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java 
b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java
index f8763490..a5dda349 100644
--- a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -17,8 +17,10 @@
 
 package org.apache.doris.transaction;
 
+import org.apache.doris.common.Config;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
+import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.task.PublishVersionTask;
 
 import com.google.common.collect.Maps;
@@ -253,6 +255,13 @@ public void setTransactionStatus(TransactionStatus 
transactionStatus) {
         this.transactionStatus = transactionStatus;
         if (transactionStatus == TransactionStatus.VISIBLE) {
             this.latch.countDown();
+            if (MetricRepo.isInit.get()) {
+                MetricRepo.COUNTER_TXN_SUCCESS.increase(1L);
+            }
+        } else if (transactionStatus == TransactionStatus.ABORTED) {
+            if (MetricRepo.isInit.get()) {
+                MetricRepo.COUNTER_TXN_FAILED.increase(1L);
+            }
         }
     }
     
@@ -321,4 +330,12 @@ public LoadJobSourceType getSourceType() {
     public Map<Long, PublishVersionTask> getPublishVersionTasks() {
         return publishVersionTasks;
     }
+
+    public boolean isPublishTimeout() {
+        // timeout is between 3 to Config.max_txn_publish_waiting_time_ms 
seconds.
+        long timeoutMillis = Math.min(Config.publish_version_timeout_second * 
publishVersionTasks.size() * 1000,
+                                      Config.load_straggler_wait_second * 
1000);
+        timeoutMillis = Math.max(timeoutMillis, 3000);
+        return System.currentTimeMillis() - publishVersionTime > timeoutMillis;
+    }
 }
diff --git 
a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
 
b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
index 8f6e22ec..f5f862bb 100644
--- 
a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
+++ 
b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
@@ -17,32 +17,26 @@
 
 package org.apache.doris.load.routineload;
 
-import com.google.common.collect.Lists;
-import mockit.Deencapsulation;
-import mockit.Expectations;
-import mockit.Injectable;
-import mockit.Mock;
-import mockit.MockUp;
-import mockit.Mocked;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.persist.EditLog;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TResourceInfo;
-import org.easymock.EasyMock;
+
+import com.google.common.collect.Lists;
+
 import org.junit.Assert;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
+import mockit.Deencapsulation;
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mocked;
+
 public class RoutineLoadSchedulerTest {
 
     @Test
@@ -72,18 +66,6 @@ public void testNormalRunOneCycle(@Mocked Catalog catalog,
         Deencapsulation.setField(routineLoadJob, "kafkaPartitions", 
partitions);
         Deencapsulation.setField(routineLoadJob, "desireTaskConcurrentNum", 3);
 
-        new MockUp<Catalog>() {
-            @Mock
-            public SystemInfoService getCurrentSystemInfo() {
-                return systemInfoService;
-            }
-
-            @Mock
-            public Catalog getCurrentCatalog() {
-                return catalog;
-            }
-        };
-
         new Expectations() {
             {
                 catalog.getRoutineLoadInstance();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to