This is an automated email from the ASF dual-hosted git repository.

zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 46181c0  Fix some bugs about load label (#2241)
46181c0 is described below

commit 46181c08800942c26123745688255c70490aa3f9
Author: Mingyu Chen <[email protected]>
AuthorDate: Sat Nov 23 00:04:45 2019 +0800

    Fix some bugs about load label (#2241)
---
 .../sql-statements/Data Manipulation/insert.md     |  10 +-
 .../sql-statements/Data Manipulation/insert_EN.md  |  10 +-
 .../java/org/apache/doris/catalog/Catalog.java     |   6 +-
 .../doris/common/DuplicatedRequestException.java   |  15 ++-
 .../apache/doris/load/loadv2/BrokerLoadJob.java    |  32 +++--
 .../org/apache/doris/load/loadv2/JobState.java     |  10 +-
 .../java/org/apache/doris/load/loadv2/LoadJob.java |  94 +++++++++-----
 .../apache/doris/load/loadv2/LoadJobScheduler.java |  11 +-
 .../org/apache/doris/load/loadv2/LoadManager.java  |  70 +++++++---
 .../doris/load/loadv2/LoadTimeoutChecker.java      |   6 +-
 .../org/apache/doris/load/loadv2/MiniLoadJob.java  |   7 +-
 .../load/routineload/RoutineLoadTaskInfo.java      |   5 +
 .../java/org/apache/doris/persist/EditLog.java     |   1 -
 .../apache/doris/service/FrontendServiceImpl.java  |   5 +
 .../doris/transaction/GlobalTransactionMgr.java    | 142 +++++++++++++++------
 .../apache/doris/transaction/TransactionState.java |   4 +-
 .../doris/transaction/TxnStateCallbackFactory.java |   5 +-
 .../org/apache/doris/http/DorisHttpTestCase.java   |   7 +-
 .../org/apache/doris/load/loadv2/LoadJobTest.java  |   3 +-
 .../transaction/GlobalTransactionMgrTest.java      |   7 +-
 20 files changed, 308 insertions(+), 142 deletions(-)

diff --git a/docs/documentation/cn/sql-reference/sql-statements/Data 
Manipulation/insert.md 
b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/insert.md
index 6483c20..98588a6 100644
--- a/docs/documentation/cn/sql-reference/sql-statements/Data 
Manipulation/insert.md    
+++ b/docs/documentation/cn/sql-reference/sql-statements/Data 
Manipulation/insert.md    
@@ -23,8 +23,8 @@ under the License.
 
 ```
 INSERT INTO table_name
+    [ PARTITION (p1, ...) ]
     [ WITH LABEL label]
-    [ PARTITION (, ...) ]
     [ (column [, ...]) ]
     [ [ hint [, ...] ] ]
     { VALUES ( { expression | DEFAULT } [, ...] ) [, ...] | query }
@@ -34,9 +34,9 @@ INSERT INTO table_name
 
 > tablet_name: 导入数据的目的表。可以是 `db_name.table_name` 形式
 >
-> label: 为 Insert 任务指定一个 label
+> partitions: 指定待导入的分区,必须是 `table_name` 中存在的分区,多个分区名称用逗号分隔
 >
-> partition_names: 指定待导入的分区,必须是 `table_name` 中存在的分区,多个分区名称用逗号分隔
+> label: 为 Insert 任务指定一个 label
 >
 > column_name: 指定的目的列,必须是 `table_name` 中存在的列
 >
@@ -88,10 +88,10 @@ INSERT INTO test SELECT * FROM test2;
 INSERT INTO test (c1, c2) SELECT * from test2;
 ```
 
-4. 向 `test` 表中导入一个查询语句结果,并指定 label
+4. 向 `test` 表中导入一个查询语句结果,并指定 partition 和 label
 
 ```
-INSERT INTO test WITH LABEL `label1` SELECT * FROM test2;
+INSERT INTO test PARTITION(p1, p2) WITH LABEL `label1` SELECT * FROM test2;
 INSERT INTO test WITH LABEL `label1` (c1, c2) SELECT * from test2;
 ```
 
diff --git a/docs/documentation/en/sql-reference/sql-statements/Data 
Manipulation/insert_EN.md 
b/docs/documentation/en/sql-reference/sql-statements/Data 
Manipulation/insert_EN.md
index 128060f..8226449 100644
--- a/docs/documentation/en/sql-reference/sql-statements/Data 
Manipulation/insert_EN.md 
+++ b/docs/documentation/en/sql-reference/sql-statements/Data 
Manipulation/insert_EN.md 
@@ -23,8 +23,8 @@ under the License.
 
 ```
 INSERT INTO table_name
+[ PARTITION (p1, ...)]
 [ WITH LABEL label]
-[ PARTICIPATION [...]
 [ (column [, ...]) ]
 [ [ hint [, ...] ] ]
 { VALUES ( { expression | DEFAULT } [, ...] ) [, ...] | query }
@@ -34,9 +34,9 @@ INSERT INTO table_name
 
 > tablet_name: Target table for loading data. It can be in the form of 
 > `db_name.table_name`.
 >
-> label: Specifies a label for Insert job.
+> partitions: Specifies the partitions to be loaded, with multiple partition names separated by commas. The partitions must exist in `table_name`.
 >
-> partition_names: Specifies the partitions to be loaded, with multiple 
partition names separated by commas. The partitions must exist in `table_name`, 
+> label: Specifies a label for Insert job.
 >
 > column_name: The specified destination columns must be columns that exists 
 > in `table_name`.
 >
@@ -89,10 +89,10 @@ INSERT INTO test SELECT * FROM test2
 INSERT INTO test (c1, c2) SELECT * from test2
 ```
 
-4. Insert into table `test` with specified label
+4. Insert into table `test` with specified partition and label
 
 ```
-INSERT INTO test WITH LABEL `label1` SELECT * FROM test2;
+INSERT INTO test PARTITION(p1, p2) WITH LABEL `label1` SELECT * FROM test2;
 INSERT INTO test WITH LABEL `label1` (c1, c2) SELECT * from test2;
 ```
 
diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java 
b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
index af506f7..5926411 100644
--- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -1338,14 +1338,13 @@ public class Catalog {
             checksum = loadExportJob(dis, checksum);
             checksum = loadBackupHandler(dis, checksum);
             checksum = loadPaloAuth(dis, checksum);
+            // global transaction must be replayed before load jobs v2
             checksum = loadTransactionState(dis, checksum);
             checksum = loadColocateTableIndex(dis, checksum);
             checksum = loadRoutineLoadJobs(dis, checksum);
             checksum = loadLoadJobsV2(dis, checksum);
             checksum = loadSmallFiles(dis, checksum);
 
-
-
             long remoteChecksum = dis.readLong();
             Preconditions.checkState(remoteChecksum == checksum, 
remoteChecksum + " vs. " + checksum);
         } finally {
@@ -1720,7 +1719,8 @@ public class Catalog {
 
     public long loadLoadJobsV2(DataInputStream in, long checksum) throws 
IOException {
         if (Catalog.getCurrentCatalogJournalVersion() >= 
FeMetaVersion.VERSION_50) {
-            Catalog.getCurrentCatalog().getLoadManager().readFields(in);
+            loadManager.readFields(in);
+            loadManager.transferLoadingStateToCommitted(globalTransactionMgr);
         }
         return checksum;
     }
diff --git 
a/fe/src/main/java/org/apache/doris/common/DuplicatedRequestException.java 
b/fe/src/main/java/org/apache/doris/common/DuplicatedRequestException.java
index e6aa8d3..f3de2f3 100644
--- a/fe/src/main/java/org/apache/doris/common/DuplicatedRequestException.java
+++ b/fe/src/main/java/org/apache/doris/common/DuplicatedRequestException.java
@@ -17,13 +17,26 @@
 
 package org.apache.doris.common;
 
+/*
+ * This exception is thrown when a request from a Backend is duplicated.
+ * It is currently used for the begin-txn requests of mini load and stream load.
+ * Because such a request may be a retry, we throw this exception
+ * and return the id of the already-begun txn.
+ */
 public class DuplicatedRequestException extends DdlException {
 
     private String duplicatedRequestId;
+    // id of the existing (already-begun) txn
+    private long txnId;
 
-    public DuplicatedRequestException(String duplicatedRequestId, String msg) {
+    public DuplicatedRequestException(String duplicatedRequestId, long txnId, 
String msg) {
         super(msg);
         this.duplicatedRequestId = duplicatedRequestId;
+        this.txnId = txnId;
+    }
+
+    public long getTxnId() {
+        return txnId;
     }
 
     public String getDuplicatedRequestId() {
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java 
b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index df2d68c..6945a44 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -32,6 +32,7 @@ import org.apache.doris.catalog.Table;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.DuplicatedRequestException;
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.MetaNotFoundException;
@@ -201,7 +202,8 @@ public class BrokerLoadJob extends LoadJob {
     }
 
     @Override
-    public void beginTxn() throws LabelAlreadyUsedException, 
BeginTransactionException, AnalysisException {
+    public void beginTxn()
+            throws LabelAlreadyUsedException, BeginTransactionException, 
AnalysisException, DuplicatedRequestException {
         transactionId = Catalog.getCurrentGlobalTransactionMgr()
                 .beginTransaction(dbId, label, null, "FE: " + 
FrontendOptions.getLocalHostAddress(),
                                   
TransactionState.LoadJobSourceType.BATCH_LOAD_JOB, id,
@@ -237,10 +239,10 @@ public class BrokerLoadJob extends LoadJob {
         writeLock();
         try {
             // check if job has been completed
-            if (isCompleted()) {
+            if (isTxnDone()) {
                 LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
                                  .add("state", state)
-                                 .add("error_msg", "this task will be ignored 
when job is completed")
+                                 .add("error_msg", "this task will be ignored 
when job is: " + state)
                                  .build());
                 return;
             }
@@ -250,6 +252,8 @@ public class BrokerLoadJob extends LoadJob {
             }
             if (loadTask.getRetryTime() <= 0) {
                 unprotectedExecuteCancel(failMsg, true);
+                logFinalOperation();
+                return;
             } else {
                 // retry task
                 idToTasks.remove(loadTask.getSignature());
@@ -265,8 +269,6 @@ public class BrokerLoadJob extends LoadJob {
         } finally {
             writeUnlock();
         }
-
-        logFinalOperation();
     }
 
     /**
@@ -298,7 +300,7 @@ public class BrokerLoadJob extends LoadJob {
                              .add("msg", "The failure happens in analyze, the 
load job will be cancelled with error:"
                                      + e.getMessage())
                              .build(), e);
-            cancelJobWithoutCheck(new 
FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), false);
+            cancelJobWithoutCheck(new 
FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), false, true);
         }
     }
 
@@ -312,13 +314,14 @@ public class BrokerLoadJob extends LoadJob {
         writeLock();
         try {
             // check if job has been cancelled
-            if (isCompleted()) {
+            if (isTxnDone()) {
                 LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
                                  .add("state", state)
-                                 .add("error_msg", "this task will be ignored 
when job is completed")
+                                 .add("error_msg", "this task will be ignored 
when job is: " + state)
                                  .build());
                 return;
             }
+
             if (finishedTaskIds.contains(attachment.getTaskId())) {
                 LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
                                  .add("task_id", attachment.getTaskId())
@@ -342,7 +345,7 @@ public class BrokerLoadJob extends LoadJob {
                              .add("database_id", dbId)
                              .add("error_msg", "Failed to divide job into 
loading task.")
                              .build(), e);
-            cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_RUN_FAIL, 
e.getMessage()), true);
+            cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_RUN_FAIL, 
e.getMessage()), true, true);
             return;
         }
 
@@ -401,10 +404,10 @@ public class BrokerLoadJob extends LoadJob {
         writeLock();
         try {
             // check if job has been cancelled
-            if (isCompleted()) {
+            if (isTxnDone()) {
                 LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
                                  .add("state", state)
-                                 .add("error_msg", "this task will be ignored 
when job is completed")
+                                 .add("error_msg", "this task will be ignored 
when job is: " + state)
                                  .build());
                 return;
             }
@@ -437,7 +440,8 @@ public class BrokerLoadJob extends LoadJob {
 
         // check data quality
         if (!checkDataQuality()) {
-            cancelJobWithoutCheck(new 
FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, QUALITY_FAIL_MSG),true);
+            cancelJobWithoutCheck(new 
FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, QUALITY_FAIL_MSG),
+                    true, true);
             return;
         }
         Database db = null;
@@ -448,7 +452,7 @@ public class BrokerLoadJob extends LoadJob {
                              .add("database_id", dbId)
                              .add("error_msg", "db has been deleted when job 
is loading")
                              .build(), e);
-            cancelJobWithoutCheck(new 
FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), true);
+            cancelJobWithoutCheck(new 
FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), true, true);
         }
         db.writeLock();
         try {
@@ -465,7 +469,7 @@ public class BrokerLoadJob extends LoadJob {
                              .add("database_id", dbId)
                              .add("error_msg", "Failed to commit txn with 
error:" + e.getMessage())
                              .build(), e);
-            cancelJobWithoutCheck(new 
FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()),true);
+            cancelJobWithoutCheck(new 
FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), true, true);
             return;
         } finally {
             db.writeUnlock();
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/JobState.java 
b/fe/src/main/java/org/apache/doris/load/loadv2/JobState.java
index 6370e0a..7779921 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/JobState.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/JobState.java
@@ -17,9 +17,11 @@
 
 package org.apache.doris.load.loadv2;
 
+// JobState is persisted in metadata by name, so the order of these states is not important
 public enum JobState {
-    PENDING,
-    LOADING,
-    FINISHED,
-    CANCELLED
+    PENDING, // init state
+    LOADING, // job is running
+    COMMITTED, // transaction is committed but not visible
+    FINISHED, // transaction is visible and job is finished
+    CANCELLED // transaction is aborted and job is cancelled
 }
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java 
b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index 408d191..93d25a1 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -24,6 +24,7 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.DuplicatedRequestException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.FeMetaVersion;
@@ -118,12 +119,6 @@ public abstract class LoadJob extends 
AbstractTxnStateChangeCallback implements
     // This param is set true during txn is committing.
     // During committing, the load job could not be cancelled.
     protected boolean isCommitting = false;
-    // This param is set true in mini load.
-    // The streaming mini load could not be cancelled by frontend.
-    protected boolean isCancellable = true;
-
-    // only for persistence param
-    private boolean isJobTypeRead = false;
 
     protected ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
 
@@ -132,6 +127,9 @@ public abstract class LoadJob extends 
AbstractTxnStateChangeCallback implements
 
     protected LoadStatistic loadStatistic = new LoadStatistic();
 
+    // only for persistence param. see readFields() for usage
+    private boolean isJobTypeRead = false;
+
     public static class LoadStatistic {
         // number of rows processed on BE, this number will be updated 
periodically by query report.
         // A load job may has several load tasks(queries), and each task has 
several fragments.
@@ -240,12 +238,12 @@ public abstract class LoadJob extends 
AbstractTxnStateChangeCallback implements
         return createTimestamp;
     }
 
-    public long getDeadlineMs() {
+    protected long getDeadlineMs() {
         return createTimestamp + timeoutSecond * 1000;
     }
 
-    public long getLeftTimeMs() {
-        return getDeadlineMs() - System.currentTimeMillis();
+    private boolean isTimeout() {
+        return System.currentTimeMillis() > getDeadlineMs();
     }
 
     public long getFinishTimestamp() {
@@ -290,6 +288,12 @@ public abstract class LoadJob extends 
AbstractTxnStateChangeCallback implements
      */
     abstract Set<String> getTableNames() throws MetaNotFoundException;
 
+    // return true if the corresponding transaction is done(COMMITTED, 
FINISHED, CANCELLED)
+    public boolean isTxnDone() {
+        return state == JobState.COMMITTED || state == JobState.FINISHED || 
state == JobState.CANCELLED;
+    }
+
+    // return true if job is done(FINISHED/CANCELLED)
     public boolean isCompleted() {
         return state == JobState.FINISHED || state == JobState.CANCELLED;
     }
@@ -352,7 +356,7 @@ public abstract class LoadJob extends 
AbstractTxnStateChangeCallback implements
         isJobTypeRead = jobTypeRead;
     }
 
-    public void beginTxn() throws LabelAlreadyUsedException, 
BeginTransactionException, AnalysisException {
+    public void beginTxn() throws LabelAlreadyUsedException, 
BeginTransactionException, AnalysisException, DuplicatedRequestException {
     }
 
     /**
@@ -362,8 +366,9 @@ public abstract class LoadJob extends 
AbstractTxnStateChangeCallback implements
      * @throws LabelAlreadyUsedException the job is duplicated
      * @throws BeginTransactionException the limit of load job is exceeded
      * @throws AnalysisException there are error params in job
+     * @throws DuplicatedRequestException the begin txn request is duplicated
      */
-    public void execute() throws LabelAlreadyUsedException, 
BeginTransactionException, AnalysisException {
+    public void execute() throws LabelAlreadyUsedException, 
BeginTransactionException, AnalysisException, DuplicatedRequestException {
         writeLock();
         try {
             unprotectedExecute();
@@ -372,7 +377,8 @@ public abstract class LoadJob extends 
AbstractTxnStateChangeCallback implements
         }
     }
 
-    public void unprotectedExecute() throws LabelAlreadyUsedException, 
BeginTransactionException, AnalysisException {
+    public void unprotectedExecute()
+            throws LabelAlreadyUsedException, BeginTransactionException, 
AnalysisException, DuplicatedRequestException {
         // check if job state is pending
         if (state != JobState.PENDING) {
             return;
@@ -384,16 +390,23 @@ public abstract class LoadJob extends 
AbstractTxnStateChangeCallback implements
     }
 
     public void processTimeout() {
+        // This is only for jobs whose transaction has not been started.
+        // Once the transaction is started, the global transaction manager handles the timeout.
         writeLock();
         try {
-            if (isCompleted() || getDeadlineMs() >= System.currentTimeMillis() 
|| isCommitting) {
+            if (state != JobState.PENDING) {
                 return;
             }
-            unprotectedExecuteCancel(new FailMsg(FailMsg.CancelType.TIMEOUT, 
"loading timeout to cancel"), true);
+
+            if (!isTimeout()) {
+                return;
+            }
+
+            unprotectedExecuteCancel(new FailMsg(FailMsg.CancelType.TIMEOUT, 
"loading timeout to cancel"), false);
+            logFinalOperation();
         } finally {
             writeUnlock();
         }
-        logFinalOperation();
     }
 
     protected void unprotectedExecuteJob() {
@@ -420,8 +433,12 @@ public abstract class LoadJob extends 
AbstractTxnStateChangeCallback implements
             case LOADING:
                 executeLoad();
                 break;
+            case COMMITTED:
+                executeCommitted();
+                break;
             case FINISHED:
                 executeFinish();
+                break;
             default:
                 break;
         }
@@ -432,21 +449,30 @@ public abstract class LoadJob extends 
AbstractTxnStateChangeCallback implements
         state = JobState.LOADING;
     }
 
-    public void cancelJobWithoutCheck(FailMsg failMsg, boolean abortTxn) {
+    private void executeCommitted() {
+        state = JobState.COMMITTED;
+    }
+
+    // if needLog is false, no need to write edit log.
+    public void cancelJobWithoutCheck(FailMsg failMsg, boolean abortTxn, 
boolean needLog) {
         writeLock();
         try {
             unprotectedExecuteCancel(failMsg, abortTxn);
+            if (needLog) {
+                logFinalOperation();
+            }
         } finally {
             writeUnlock();
         }
-        logFinalOperation();
     }
 
     public void cancelJob(FailMsg failMsg) throws DdlException {
         writeLock();
         try {
-            // check
-            if (!isCancellable) {
+            checkAuth("CANCEL LOAD");
+
+            // mini load can not be cancelled by frontend
+            if (jobType == EtlJobType.MINI) {
                 throw new DdlException("Job could not be cancelled in type " + 
jobType.name());
             }
             if (isCommitting) {
@@ -455,20 +481,19 @@ public abstract class LoadJob extends 
AbstractTxnStateChangeCallback implements
                                          + "The job could not be cancelled in 
this step").build());
                 throw new DdlException("Job could not be cancelled while txn 
is committing");
             }
-            if (isCompleted()) {
+            if (isTxnDone()) {
                 LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
                                  .add("state", state)
-                                 .add("error_msg", "Job could not be cancelled 
when job is completed")
+                                 .add("error_msg", "Job could not be cancelled 
when job is " + state)
                                  .build());
                 throw new DdlException("Job could not be cancelled when job is 
finished or cancelled");
             }
 
-            checkAuth("CANCEL LOAD");
             unprotectedExecuteCancel(failMsg, true);
+            logFinalOperation();
         } finally {
             writeUnlock();
         }
-        logFinalOperation();
     }
 
     private void checkAuth(String command) throws DdlException {
@@ -526,7 +551,8 @@ public abstract class LoadJob extends 
AbstractTxnStateChangeCallback implements
      * This method will cancel job without edit log and lock
      *
      * @param failMsg
-     * @param abortTxn true: abort txn when cancel job, false: only change the 
state of job and ignore abort txn
+     * @param abortTxn
+     *            true: abort txn when cancel job, false: only change the 
state of job and ignore abort txn
      */
     protected void unprotectedExecuteCancel(FailMsg failMsg, boolean abortTxn) 
{
         LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
@@ -551,8 +577,9 @@ public abstract class LoadJob extends 
AbstractTxnStateChangeCallback implements
         this.failMsg = failMsg;
         finishTimestamp = System.currentTimeMillis();
 
-        // remove callback
+        // remove callback before abortTransaction(), so that the 
afterAborted() callback will not be called again
         
Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(id);
+
         if (abortTxn) {
             // abort txn
             try {
@@ -563,9 +590,9 @@ public abstract class LoadJob extends 
AbstractTxnStateChangeCallback implements
                 
Catalog.getCurrentGlobalTransactionMgr().abortTransaction(transactionId, 
failMsg.getMsg());
             } catch (UserException e) {
                 LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
-                                 .add("transaction_id", transactionId)
-                                 .add("error_msg", "failed to abort txn when 
job is cancelled, txn will be aborted later")
-                                 .build());
+                        .add("transaction_id", transactionId)
+                        .add("error_msg", "failed to abort txn when job is 
cancelled. " + e.getMessage())
+                        .build());
             }
         }
         
@@ -725,8 +752,8 @@ public abstract class LoadJob extends 
AbstractTxnStateChangeCallback implements
     public void beforeCommitted(TransactionState txnState) throws 
TransactionException {
         writeLock();
         try {
-            if (isCompleted()) {
-                throw new TransactionException("txn could not be committed 
when job has been cancelled");
+            if (isTxnDone()) {
+                throw new TransactionException("txn could not be committed 
because job is: " + state);
             }
             isCommitting = true;
         } finally {
@@ -742,6 +769,7 @@ public abstract class LoadJob extends 
AbstractTxnStateChangeCallback implements
         writeLock();
         try {
             isCommitting = false;
+            state = JobState.COMMITTED;
         } finally {
             writeUnlock();
         }
@@ -752,6 +780,8 @@ public abstract class LoadJob extends 
AbstractTxnStateChangeCallback implements
         writeLock();
         try {
             replayTxnAttachment(txnState);
+            transactionId = txnState.getTransactionId();
+            state = JobState.COMMITTED;
         } finally {
             writeUnlock();
         }
@@ -774,7 +804,7 @@ public abstract class LoadJob extends 
AbstractTxnStateChangeCallback implements
         }
         writeLock();
         try {
-            if (isCompleted()) {
+            if (isTxnDone()) {
                 return;
             }
             // record attachment in load job
@@ -799,6 +829,7 @@ public abstract class LoadJob extends 
AbstractTxnStateChangeCallback implements
             failMsg = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, 
txnState.getReason());
             finishTimestamp = txnState.getFinishTime();
             state = JobState.CANCELLED;
+            
Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(id);
         } finally {
             writeUnlock();
         }
@@ -828,6 +859,7 @@ public abstract class LoadJob extends 
AbstractTxnStateChangeCallback implements
             progress = 100;
             finishTimestamp = txnState.getFinishTime();
             state = JobState.FINISHED;
+            
Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(id);
         } finally {
             writeUnlock();
         }
diff --git 
a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java 
b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java
index eae513d..5f6fa22 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java
@@ -19,6 +19,7 @@ package org.apache.doris.load.loadv2;
 
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.DuplicatedRequestException;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
@@ -73,8 +74,16 @@ public class LoadJobScheduler extends MasterDaemon {
                 LOG.warn(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId())
                                  .add("error_msg", "There are error properties 
in job. Job will be cancelled")
                                  .build(), e);
-                loadJob.cancelJobWithoutCheck(new 
FailMsg(FailMsg.CancelType.ETL_SUBMIT_FAIL, e.getMessage()), false);
+                loadJob.cancelJobWithoutCheck(new 
FailMsg(FailMsg.CancelType.ETL_SUBMIT_FAIL, e.getMessage()),
+                        false, true);
                 continue;
+            } catch (DuplicatedRequestException e) {
+                // should not happen in the load job scheduler, because there is no request id.
+                LOG.warn(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId())
+                        .add("error_msg", "Failed to begin txn with duplicate 
request. Job will be rescheduled later")
+                        .build(), e);
+                needScheduleJobs.put(loadJob);
+                return;
             } catch (BeginTransactionException e) {
                 LOG.warn(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId())
                                  .add("error_msg", "Failed to begin txn when 
job is scheduling. "
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java 
b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 3637952..2edfecf 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -40,6 +40,8 @@ import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TMiniLoadBeginRequest;
 import org.apache.doris.thrift.TMiniLoadRequest;
 import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.transaction.GlobalTransactionMgr;
+import org.apache.doris.transaction.TransactionState;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
@@ -97,7 +99,7 @@ public class LoadManager implements Writable{
         LoadJob loadJob = null;
         writeLock();
         try {
-            checkLabelUsed(dbId, stmt.getLabel().getLabelName(), null);
+            checkLabelUsed(dbId, stmt.getLabel().getLabelName());
             if (stmt.getBrokerDesc() == null) {
                 throw new DdlException("LoadManager only support the broker 
load.");
             }
@@ -135,23 +137,22 @@ public class LoadManager implements Writable{
         LoadJob loadJob = null;
         writeLock();
         try {
-            checkLabelUsed(database.getId(), request.getLabel(), 
request.getRequest_id());
             loadJob = new MiniLoadJob(database.getId(), request);
-            createLoadJob(loadJob);
+            // Call unprotectedExecute() before adding the load job, so that if the job fails to start, it never needs to be added.
+            // NOTICE(cmy): this order applies only to Mini Load, because mini load's unprotectedExecute() only does beginTxn().
+            // For other kinds of load jobs, execute the job after adding it.
             // Mini load job must be executed before release write lock.
             // Otherwise, the duplicated request maybe get the transaction id 
before transaction of mini load is begun.
             loadJob.unprotectedExecute();
+            createLoadJob(loadJob);
         } catch (DuplicatedRequestException e) {
-            LOG.info(new LogBuilder(LogKey.LOAD_JOB, 
e.getDuplicatedRequestId())
-                             .add("msg", "the duplicated request returns the 
txn id "
-                                     + "which was created by the same mini 
load")
-                             .build());
-            return 
dbIdToLabelToLoadJobs.get(database.getId()).get(request.getLabel())
-                    .stream().filter(entity -> entity.getState() != 
JobState.CANCELLED).findFirst()
-                    .get().getTransactionId();
+            // this is a duplicate request, just return previous txn id
+            LOG.info("deplicate request for mini load. request id: {}, txn: 
{}", e.getDuplicatedRequestId(), e.getTxnId());
+            return e.getTxnId();
         } catch (UserException e) {
             if (loadJob != null) {
-                loadJob.cancelJobWithoutCheck(new FailMsg(LOAD_RUN_FAIL, 
e.getMessage()), false);
+                loadJob.cancelJobWithoutCheck(new FailMsg(LOAD_RUN_FAIL, 
e.getMessage()), false,
+                        false /* no need to write edit log, because 
createLoadJob log is not wrote yet */);
             }
             throw e;
         } finally {
@@ -185,7 +186,7 @@ public class LoadManager implements Writable{
         Database database = checkDb(stmt.getLabel().getDbName());
         writeLock();
         try {
-            checkLabelUsed(database.getId(), stmt.getLabel().getLabelName(), 
null);
+            checkLabelUsed(database.getId(), stmt.getLabel().getLabelName());
             Catalog.getCurrentCatalog().getLoadInstance().addLoadJob(stmt, 
jobType, timestamp);
         } finally {
             writeUnlock();
@@ -211,7 +212,7 @@ public class LoadManager implements Writable{
         Database database = checkDb(ClusterNamespace.getFullName(cluster, 
request.getDb()));
         writeLock();
         try {
-            checkLabelUsed(database.getId(), request.getLabel(), null);
+            checkLabelUsed(database.getId(), request.getLabel());
             return 
Catalog.getCurrentCatalog().getLoadInstance().addMiniLoadJob(request);
         } finally {
             writeUnlock();
@@ -222,7 +223,7 @@ public class LoadManager implements Writable{
         Database database = checkDb(fullDbName);
         writeLock();
         try {
-            checkLabelUsed(database.getId(), label, null);
+            checkLabelUsed(database.getId(), label);
             Catalog.getCurrentCatalog().getLoadInstance()
                     .registerMiniLabel(fullDbName, label, 
System.currentTimeMillis());
         } finally {
@@ -237,6 +238,7 @@ public class LoadManager implements Writable{
                          .build());
     }
 
+    // add load job and also add it to callback factory
     private void createLoadJob(LoadJob loadJob) {
         addLoadJob(loadJob);
         // add callback before txn created, because callback will be performed 
on replay without txn begin
@@ -296,7 +298,7 @@ public class LoadManager implements Writable{
             if (loadJobList == null) {
                 throw new DdlException("Load job does not exist");
             }
-            Optional<LoadJob> loadJobOptional = 
loadJobList.stream().filter(entity -> !entity.isCompleted()).findFirst();
+            Optional<LoadJob> loadJobOptional = 
loadJobList.stream().filter(entity -> !entity.isTxnDone()).findFirst();
             if (!loadJobOptional.isPresent()) {
                 throw new DdlException("There is no uncompleted job which 
label is " + stmt.getLabel());
             }
@@ -310,6 +312,15 @@ public class LoadManager implements Writable{
 
     public void replayEndLoadJob(LoadJobFinalOperation operation) {
         LoadJob job = idToLoadJob.get(operation.getId());
+        if (job == null) {
+            // This should not happen.
+            // Previously, when a user submitted a job with an already used 
label, an END_LOAD_JOB edit log
+            // would be written but the job was not added to 'idToLoadJob', so 
the job fetched here would be null.
+            // And this bug has been fixed.
+            // Just add a log here to observe.
+            LOG.warn("job does not exist when replaying end load job edit log: 
{}", operation);
+            return;
+        }
         job.unprotectReadEndOperation(operation);
         LOG.info(new LogBuilder(LogKey.LOAD_JOB, operation.getId())
                          .add("operation", operation)
@@ -360,6 +371,7 @@ public class LoadManager implements Writable{
         }
     }
 
+    // only for those jobs whose transaction has not started
     public void processTimeoutJobs() {
         idToLoadJob.values().stream().forEach(entity -> 
entity.processTimeout());
     }
@@ -515,7 +527,7 @@ public class LoadManager implements Writable{
      * @param requestId: the uuid of each txn request from BE
      * @throws LabelAlreadyUsedException throw exception when label has been 
used by an unfinished job.
      */
-    private void checkLabelUsed(long dbId, String label, TUniqueId requestId)
+    private void checkLabelUsed(long dbId, String label)
             throws DdlException {
         // if label has been used in old load jobs
         Catalog.getCurrentCatalog().getLoadInstance().isLabelUsed(dbId, label);
@@ -527,11 +539,6 @@ public class LoadManager implements Writable{
                 Optional<LoadJob> loadJobOptional =
                         labelLoadJobs.stream().filter(entity -> 
entity.getState() != JobState.CANCELLED).findFirst();
                 if (loadJobOptional.isPresent()) {
-                    LoadJob loadJob = loadJobOptional.get();
-                    if (loadJob.getRequestId() != null && requestId != null && 
loadJob.getRequestId().equals(requestId)) {
-                        throw new 
DuplicatedRequestException(String.valueOf(loadJob.getId()),
-                                "The request is duplicated with " + 
loadJob.getId());
-                    }
                     LOG.warn("Failed to add load job when label {} has been 
used.", label);
                     throw new LabelAlreadyUsedException(label);
                 }
@@ -583,6 +590,27 @@ public class LoadManager implements Writable{
         }
     }
 
+    // in previous implementation, there is a bug that when the job's 
corresponding transaction is
+    // COMMITTED but not VISIBLE, the load job's state is LOADING, so that the 
job may be CANCELLED
+    // by timeout checker, which is not right.
+    // So here we will check each LOADING load jobs' txn status, if it is 
COMMITTED, change load job's
+    // state to COMMITTED.
+    // this method should be removed at next upgrading.
+    public void transferLoadingStateToCommitted(GlobalTransactionMgr txnMgr) {
+        for (LoadJob job : idToLoadJob.values()) {
+            if (job.getState() == JobState.LOADING) {
+                // unfortunately, transaction id in load job is also not 
persisted, so we have to traverse
+                // all transactions to find it.
+                TransactionState txn = 
txnMgr.getCommittedTransactionStateByCallbackId(job.getCallbackId());
+                if (txn != null) {
+                    job.updateState(JobState.COMMITTED);
+                    LOG.info("transfer load job {} state from LOADING to 
COMMITTED, because txn {} is COMMITTED",
+                            job.getId(), txn.getTransactionId());
+                }
+            }
+        }
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
         List<LoadJob> loadJobs = 
idToLoadJob.values().stream().filter(this::needSave).collect(Collectors.toList());
diff --git 
a/fe/src/main/java/org/apache/doris/load/loadv2/LoadTimeoutChecker.java 
b/fe/src/main/java/org/apache/doris/load/loadv2/LoadTimeoutChecker.java
index ae9019d..2f75e1f 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadTimeoutChecker.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadTimeoutChecker.java
@@ -24,9 +24,9 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 /**
- * LoadTimeoutChecker is performed to cancel the timeout job.
- * The job which is not finished, not cancelled, not isCommitting will be 
checked.
- * The standard of timeout is CurrentTS > (CreateTs + timeoutSeconds * 1000).
+ * LoadTimeoutChecker will try to cancel the timed-out load jobs.
+ * It does not handle jobs whose corresponding transaction has already 
started.
+ * For those jobs, the global transaction manager cancels the corresponding job 
while aborting the timed-out transaction.
  */
 public class LoadTimeoutChecker extends MasterDaemon {
     private static final Logger LOG = 
LogManager.getLogger(LoadTimeoutChecker.class);
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java 
b/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java
index 6566209..9be8021 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.DuplicatedRequestException;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.io.Text;
@@ -60,7 +61,6 @@ public class MiniLoadJob extends LoadJob {
         if (request.isSetMax_filter_ratio()) {
             this.maxFilterRatio = request.getMax_filter_ratio();
         }
-        this.isCancellable = false;
         this.createTimestamp = request.getCreate_timestamp();
         this.loadStartTimestamp = createTimestamp;
         this.authorizationInfo = gatherAuthInfo();
@@ -86,9 +86,10 @@ public class MiniLoadJob extends LoadJob {
     }
 
     @Override
-    public void beginTxn() throws LabelAlreadyUsedException, 
BeginTransactionException, AnalysisException {
+    public void beginTxn()
+            throws LabelAlreadyUsedException, BeginTransactionException, 
AnalysisException, DuplicatedRequestException {
         transactionId = Catalog.getCurrentGlobalTransactionMgr()
-                .beginTransaction(dbId, label, null, "FE: " + 
FrontendOptions.getLocalHostAddress(),
+                .beginTransaction(dbId, label, requestId, "FE: " + 
FrontendOptions.getLocalHostAddress(),
                                   
TransactionState.LoadJobSourceType.BACKEND_STREAMING, id,
                                   timeoutSecond);
     }
diff --git 
a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java 
b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 8ebeb84..b7fb7a5 100644
--- 
a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ 
b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -20,6 +20,7 @@ package org.apache.doris.load.routineload;
 
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DuplicatedRequestException;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
@@ -148,6 +149,10 @@ public abstract class RoutineLoadTaskInfo {
                     routineLoadJob.getDbId(), DebugUtil.printId(id), null, 
"FE: " + FrontendOptions.getLocalHostAddress(),
                     TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK, 
routineLoadJob.getId(),
                     timeoutMs / 1000);
+        } catch (DuplicatedRequestException e) {
+            // should not happen, because no request id is passed in when 
beginning the transaction
+            LOG.warn("failed to begin txn for routine load task: {}, {}", 
DebugUtil.printId(id), e.getMessage());
+            return false;
         } catch (LabelAlreadyUsedException e) {
             // this should not happen for a routine load task, throw it out
             throw e;
diff --git a/fe/src/main/java/org/apache/doris/persist/EditLog.java 
b/fe/src/main/java/org/apache/doris/persist/EditLog.java
index 81f5743..8a29838 100644
--- a/fe/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/src/main/java/org/apache/doris/persist/EditLog.java
@@ -564,7 +564,6 @@ public class EditLog {
                     final TransactionState state = (TransactionState) 
journal.getData();
                     
Catalog.getCurrentGlobalTransactionMgr().replayUpsertTransactionState(state);
                     LOG.debug("opcode: {}, tid: {}", opCode, 
state.getTransactionId());
-
                     break;
                 }
                 case OperationType.OP_DELETE_TRANSACTION_STATE: {
diff --git a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 0b64d80..d7450be 100644
--- a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -31,6 +31,7 @@ import org.apache.doris.common.AuditLog;
 import org.apache.doris.common.AuthenticationException;
 import org.apache.doris.common.CaseSensibility;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.DuplicatedRequestException;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.PatternMatcher;
 import org.apache.doris.common.ThriftServerContext;
@@ -609,6 +610,10 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         result.setStatus(status);
         try {
             result.setTxnId(loadTxnBeginImpl(request, clientAddr));
+        } catch (DuplicatedRequestException e) {
+            // this is a duplicate request, just return previous txn id
+            LOG.info("duplicate request for stream load. request id: {}, txn: 
{}", e.getDuplicatedRequestId(), e.getTxnId());
+            result.setTxnId(e.getTxnId());
         } catch (LabelAlreadyUsedException e) {
             status.setStatus_code(TStatusCode.LABEL_ALREADY_EXISTS);
             status.addToError_msgs(e.getMessage());
diff --git 
a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java 
b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 15d3e19..3b537a2 100644
--- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -30,11 +30,14 @@ import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.DuplicatedRequestException;
 import org.apache.doris.common.FeNameFormat;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.metric.MetricRepo;
@@ -62,6 +65,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -77,7 +81,7 @@ import java.util.stream.Collectors;
  * 3. abort
  * Attention: all api in txn manager should get db lock or load lock first, 
then get txn manager's lock, or there will be dead lock
  */
-public class GlobalTransactionMgr {
+public class GlobalTransactionMgr implements Writable {
     private static final Logger LOG = 
LogManager.getLogger(GlobalTransactionMgr.class);
     
     // the lock is used to control the access to transaction states
@@ -87,8 +91,12 @@ public class GlobalTransactionMgr {
     
     // transactionId -> TransactionState
     private Map<Long, TransactionState> idToTransactionState = 
Maps.newConcurrentMap();
-    // db id -> (label -> txn id)
-    private com.google.common.collect.Table<Long, String, Long> 
dbIdToTxnLabels = HashBasedTable.create();
+    // db id -> (label -> txn ids)
+    // this is used for checking whether a label is already used. A label may correspond 
to multiple txns,
+    // but at most one of them is not ABORTED.
+    // this member should be consistent with idToTransactionState, which means 
if a txn exists in idToTransactionState,
+    // it must exist in dbIdToTxnLabels, and vice versa.
+    private com.google.common.collect.Table<Long, String, Set<Long>> 
dbIdToTxnLabels = HashBasedTable.create();
     // count the number of running txns of each database, except for the 
routine load txn
     private Map<Long, Integer> runningTxnNums = Maps.newHashMap();
     // count only the number of running routine load txns of each database
@@ -109,7 +117,8 @@ public class GlobalTransactionMgr {
     }
 
     public long beginTransaction(long dbId, String label, String coordinator, 
LoadJobSourceType sourceType,
-            long timeoutSecond) throws AnalysisException, 
LabelAlreadyUsedException, BeginTransactionException {
+            long timeoutSecond)
+            throws AnalysisException, LabelAlreadyUsedException, 
BeginTransactionException, DuplicatedRequestException {
         return beginTransaction(dbId, label, null, coordinator, sourceType, 
-1, timeoutSecond);
     }
     
@@ -123,11 +132,12 @@ public class GlobalTransactionMgr {
      *
      * @param coordinator
      * @throws BeginTransactionException
+     * @throws DuplicatedRequestException
      * @throws IllegalTransactionParameterException
      */
     public long beginTransaction(long dbId, String label, TUniqueId requestId,
             String coordinator, LoadJobSourceType sourceType, long listenerId, 
long timeoutSecond)
-            throws AnalysisException, LabelAlreadyUsedException, 
BeginTransactionException {
+            throws AnalysisException, LabelAlreadyUsedException, 
BeginTransactionException, DuplicatedRequestException {
         
         if (Config.disable_load_job) {
             throw new AnalysisException("disable_load_job is set to true, all 
load jobs are prevented");
@@ -145,20 +155,38 @@ public class GlobalTransactionMgr {
             Preconditions.checkNotNull(coordinator);
             Preconditions.checkNotNull(label);
             FeNameFormat.checkLabel(label);
-            Map<String, Long> txnLabels = dbIdToTxnLabels.row(dbId);
-            if (txnLabels != null && txnLabels.containsKey(label)) {
-                TransactionState existTxn = 
getTransactionState(txnLabels.get(label));
-                // check timestamp
-                if (requestId != null) {
-                    if (existTxn != null && existTxn.getTransactionStatus() == 
TransactionStatus.PREPARE
-                            && existTxn.getRequsetId() != null && 
existTxn.getRequsetId().equals(requestId)) {
+
+            /*
+             * Check if the label is already used, with the following steps:
+             * 1. get all existing transactions of this label.
+             * 2. if there is a PREPARE transaction, check if this is a retry 
request. If yes, throw DuplicatedRequestException carrying the
+             *    existing txn id.
+             * 3. otherwise, if there is a non-aborted transaction, throw 
LabelAlreadyUsedException.
+             */
+            Set<Long> existingTxnIds = dbIdToTxnLabels.get(dbId, label);
+            if (existingTxnIds != null && !existingTxnIds.isEmpty()) {
+                List<TransactionState> notAbortedTxns = Lists.newArrayList();
+                for (long txnId : existingTxnIds) {
+                    TransactionState txn = idToTransactionState.get(txnId);
+                    Preconditions.checkNotNull(txn);
+                    if (txn.getTransactionStatus() != 
TransactionStatus.ABORTED) {
+                        notAbortedTxns.add(txn);
+                    }
+                }
+                // there should be at most 1 txn in PREPARE/COMMITTED/VISIBLE 
status
+                Preconditions.checkState(notAbortedTxns.size() <= 1, 
notAbortedTxns);
+                if (!notAbortedTxns.isEmpty()) {
+                    TransactionState notAbortedTxn = notAbortedTxns.get(0);
+                    if (requestId != null && 
notAbortedTxn.getTransactionStatus() == TransactionStatus.PREPARE
+                            && notAbortedTxn.getRequsetId() != null && 
notAbortedTxn.getRequsetId().equals(requestId)) {
                         // this may be a retry request for same job, just 
return existing txn id.
-                        return txnLabels.get(label);
+                        throw new 
DuplicatedRequestException(DebugUtil.printId(requestId),
+                                notAbortedTxn.getTransactionId(), "");
                     }
+                    throw new LabelAlreadyUsedException(label, 
notAbortedTxn.getTransactionStatus());
                 }
-                throw new LabelAlreadyUsedException(label, 
existTxn.getTransactionStatus());
             }
-            
+
             checkRunningTxnExceedLimit(dbId, sourceType);
           
             long tid = idGenerator.getNextTransactionId();
@@ -173,6 +201,8 @@ public class GlobalTransactionMgr {
             }
 
             return tid;
+        } catch (DuplicatedRequestException e) {
+            throw e;
         } catch (Exception e) {
             if (MetricRepo.isInit.get()) {
                 MetricRepo.COUNTER_TXN_REJECT.increase(1L);
@@ -203,15 +233,13 @@ public class GlobalTransactionMgr {
     public TransactionStatus getLabelState(long dbId, String label) {
         readLock();
         try {
-            Map<String, Long> txnLabels = dbIdToTxnLabels.row(dbId);
-            if (txnLabels == null) {
+            Set<Long> existingTxnIds = dbIdToTxnLabels.get(dbId, label);
+            if (existingTxnIds == null || existingTxnIds.isEmpty()) {
                 return TransactionStatus.UNKNOWN;
             }
-            Long transactionId = txnLabels.get(label);
-            if (transactionId == null) {
-                return TransactionStatus.UNKNOWN;
-            }
-            return 
idToTransactionState.get(transactionId).getTransactionStatus();
+            // find the latest txn (which id is largest)
+            long maxTxnId = 
existingTxnIds.stream().max(Comparator.comparingLong(Long::valueOf)).get();
+            return idToTransactionState.get(maxTxnId).getTransactionStatus();
         } finally {
             readUnlock();
         }
@@ -224,8 +252,8 @@ public class GlobalTransactionMgr {
             if (state == null) {
                 return;
             }
-            editLog.logDeleteTransactionState(state);
             replayDeleteTransactionState(state);
+            editLog.logDeleteTransactionState(state);
         } finally {
             writeUnlock();
         }
@@ -493,18 +521,29 @@ public class GlobalTransactionMgr {
     public void abortTransaction(Long dbId, String label, String reason) 
throws UserException {
         Preconditions.checkNotNull(label);
         Long transactionId = null;
-        writeLock();
+        readLock();
         try {
-            Map<String, Long> dbTxns = dbIdToTxnLabels.row(dbId);
-            if (dbTxns == null) {
+            Set<Long> existingTxns = dbIdToTxnLabels.get(dbId, label);
+            if (existingTxns == null || existingTxns.isEmpty()) {
                 throw new UserException("transaction not found, label=" + 
label);
             }
-            transactionId = dbTxns.get(label);
-            if (transactionId == null) {
+            // find PREPARE txn. For one load label, there should be only one 
PREPARE txn.
+            TransactionState prepareTxn = null;
+            for (Long txnId : existingTxns) {
+                TransactionState txn = idToTransactionState.get(txnId);
+                if (txn.getTransactionStatus() == TransactionStatus.PREPARE) {
+                    prepareTxn = txn;
+                    break;
+                }
+            }
+
+            if (prepareTxn == null) {
                 throw new UserException("transaction not found, label=" + 
label);
             }
+
+            transactionId = prepareTxn.getTransactionId();
         } finally {
-            writeUnlock();
+            readUnlock();
         }
         abortTransaction(transactionId, reason);
     }
@@ -837,7 +876,7 @@ public class GlobalTransactionMgr {
 
         List<Long> timeoutTxns = Lists.newArrayList();
         List<Long> expiredTxns = Lists.newArrayList();
-        writeLock();
+        readLock();
         try {
             for (TransactionState transactionState : 
idToTransactionState.values()) {
                 if (transactionState.isExpired(currentMillis)) {
@@ -849,7 +888,7 @@ public class GlobalTransactionMgr {
                 }
             }
         } finally {
-            writeUnlock();
+            readUnlock();
         }
 
         // delete expired txns
@@ -959,7 +998,8 @@ public class GlobalTransactionMgr {
         }
         if (transactionState.getTransactionStatus() == 
TransactionStatus.COMMITTED
                 || transactionState.getTransactionStatus() == 
TransactionStatus.VISIBLE) {
-            throw new UserException("transaction's state is already committed 
or visible, could not abort");
+            throw new UserException("transaction's state is already "
+                    + transactionState.getTransactionStatus() + ", could not 
abort");
         }
         transactionState.setFinishTime(System.currentTimeMillis());
         transactionState.setReason(reason);
@@ -980,10 +1020,10 @@ public class GlobalTransactionMgr {
             transactionState.replaySetTransactionStatus();
             Database db = catalog.getDb(transactionState.getDbId());
             if (transactionState.getTransactionStatus() == 
TransactionStatus.COMMITTED) {
-                LOG.debug("replay a committed transaction {}", 
transactionState);
+                LOG.info("replay a committed transaction {}", 
transactionState);
                 updateCatalogAfterCommitted(transactionState, db);
             } else if (transactionState.getTransactionStatus() == 
TransactionStatus.VISIBLE) {
-                LOG.debug("replay a visible transaction {}", transactionState);
+                LOG.info("replay a visible transaction {}", transactionState);
                 updateCatalogAfterVisible(transactionState, db);
             }
             TransactionState preTxnState = 
idToTransactionState.get(transactionState.getTransactionId());
@@ -1000,7 +1040,11 @@ public class GlobalTransactionMgr {
         writeLock();
         try {
             idToTransactionState.remove(transactionState.getTransactionId());
-            dbIdToTxnLabels.remove(transactionState.getDbId(), 
transactionState.getLabel());
+            Set<Long> txnIds = dbIdToTxnLabels.get(transactionState.getDbId(), 
transactionState.getLabel());
+            txnIds.remove(transactionState.getTransactionId());
+            if (txnIds.isEmpty()) {
+                dbIdToTxnLabels.remove(transactionState.getDbId(), 
transactionState.getLabel());
+            }
         } finally {
             writeUnlock();
         }
@@ -1112,13 +1156,12 @@ public class GlobalTransactionMgr {
     }
     
     private void updateTxnLabels(TransactionState transactionState) {
-        // if the transaction is aborted, then its label could be reused
-        if (transactionState.getTransactionStatus() == 
TransactionStatus.ABORTED) {
-            dbIdToTxnLabels.remove(transactionState.getDbId(), 
transactionState.getLabel());
-        } else {
-            dbIdToTxnLabels.put(transactionState.getDbId(), 
transactionState.getLabel(),
-                                transactionState.getTransactionId());
+        Set<Long> txnIds = dbIdToTxnLabels.get(transactionState.getDbId(), 
transactionState.getLabel());
+        if (txnIds == null) {
+            txnIds = Sets.newHashSet();
+            dbIdToTxnLabels.put(transactionState.getDbId(), 
transactionState.getLabel(), txnIds);
         }
+        txnIds.add(transactionState.getTransactionId());
     }
     
     private void updateDBRunningTxnNum(TransactionStatus preStatus, 
TransactionState curTxnState) {
@@ -1278,7 +1321,7 @@ public class GlobalTransactionMgr {
         return this.idGenerator;
     }
 
-    // this two function used to read snapshot or write snapshot
+    @Override
     public void write(DataOutput out) throws IOException {
         int numTransactions = idToTransactionState.size();
         out.writeInt(numTransactions);
@@ -1288,6 +1331,7 @@ public class GlobalTransactionMgr {
         idGenerator.write(out);
     }
     
+    @Override
     public void readFields(DataInput in) throws IOException {
         int numTransactions = in.readInt();
         for (int i = 0; i < numTransactions; ++i) {
@@ -1301,4 +1345,18 @@ public class GlobalTransactionMgr {
         }
         idGenerator.readFields(in);
     }
+
+    public TransactionState getCommittedTransactionStateByCallbackId(long 
callbackId) {
+        readLock();
+        try {
+            for (TransactionState txn : idToTransactionState.values()) {
+                if (txn.getCallbackId() == callbackId && 
txn.getTransactionStatus() == TransactionStatus.COMMITTED) {
+                    return txn;
+                }
+            }
+        } finally {
+            readUnlock();
+        }
+        return null;
+    }
 }
diff --git 
a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java 
b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java
index ef7fcbd..0324fda 100644
--- a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -336,7 +336,8 @@ public class TransactionState implements Writable {
             switch (transactionStatus) {
                 case COMMITTED:
                     // Maybe listener has been deleted. The txn need to be 
aborted later.
-                    throw new TransactionException("Failed to commit txn when 
callback could not be found");
+                    throw new TransactionException(
+                            "Failed to commit txn when callback " + callbackId 
+ " could not be found");
                 default:
                     break;
             }
@@ -469,6 +470,7 @@ public class TransactionState implements Writable {
         sb.append("transaction id: ").append(transactionId);
         sb.append(", label: ").append(label);
         sb.append(", db id: ").append(dbId);
+        sb.append(", callback id: ").append(callbackId);
         sb.append(", coordinator: ").append(coordinator);
         sb.append(", transaction status: ").append(transactionStatus);
         sb.append(", error replicas num: ").append(errorReplicas.size());
diff --git 
a/fe/src/main/java/org/apache/doris/transaction/TxnStateCallbackFactory.java 
b/fe/src/main/java/org/apache/doris/transaction/TxnStateCallbackFactory.java
index 25430c6..28accde 100644
--- a/fe/src/main/java/org/apache/doris/transaction/TxnStateCallbackFactory.java
+++ b/fe/src/main/java/org/apache/doris/transaction/TxnStateCallbackFactory.java
@@ -40,8 +40,9 @@ public class TxnStateCallbackFactory {
     }
 
     public synchronized void removeCallback(long id) {
-        callbacks.remove(id);
-        LOG.info("remove callback of txn state : {}", id);
+        if (callbacks.remove(id) != null) {
+            LOG.info("remove callback of txn state : {}", id);
+        }
     }
 
     public synchronized TxnStateChangeCallback getCallback(long id) {
diff --git a/fe/src/test/java/org/apache/doris/http/DorisHttpTestCase.java 
b/fe/src/test/java/org/apache/doris/http/DorisHttpTestCase.java
index 1ec3944..392005f 100644
--- a/fe/src/test/java/org/apache/doris/http/DorisHttpTestCase.java
+++ b/fe/src/test/java/org/apache/doris/http/DorisHttpTestCase.java
@@ -61,6 +61,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
 import junit.framework.AssertionFailedError;
@@ -106,7 +107,11 @@ abstract public class DorisHttpTestCase {
     public static long testPartitionCurrentVersionHash = 12312;
     public static long testPartitionNextVersionHash = 123123123;
 
-    public static final int HTTP_PORT = 32474;
+    public static final int HTTP_PORT;
+    static {
+        Random r = new Random(System.currentTimeMillis());
+        HTTP_PORT = 30000 + r.nextInt(10000);
+    }
 
     protected static final String URI = "http://localhost:" + HTTP_PORT + 
"/api/" + DB_NAME + "/" + TABLE_NAME;
 
diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java 
b/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
index 0f32c13..df1f8b8 100644
--- a/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
+++ b/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
@@ -22,6 +22,7 @@ import org.apache.doris.analysis.LoadStmt;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.DuplicatedRequestException;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.metric.LongCounterMetric;
@@ -100,7 +101,7 @@ public class LoadJobTest {
     @Test
     public void testExecute(@Mocked GlobalTransactionMgr globalTransactionMgr,
                             @Mocked MasterTaskExecutor masterTaskExecutor)
-            throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException {
+            throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException, DuplicatedRequestException {
         LoadJob loadJob = new BrokerLoadJob();
         new Expectations() {
             {
diff --git a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index 943b6b2..91b1682 100644
--- a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -31,6 +31,7 @@ import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.Tablet;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.DuplicatedRequestException;
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.UserException;
@@ -103,7 +104,7 @@ public class GlobalTransactionMgrTest {
 
     @Test
     public void testBeginTransaction() throws LabelAlreadyUsedException, AnalysisException,
-            BeginTransactionException {
+            BeginTransactionException, DuplicatedRequestException {
         FakeCatalog.setCatalog(masterCatalog);
         long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
                 CatalogTestUtil.testTxnLable1,
@@ -119,7 +120,7 @@ public class GlobalTransactionMgrTest {
 
     @Test
     public void testBeginTransactionWithSameLabel() throws LabelAlreadyUsedException, AnalysisException,
-            BeginTransactionException {
+            BeginTransactionException, DuplicatedRequestException {
         FakeCatalog.setCatalog(masterCatalog);
         long transactionId = 0;
         try {
@@ -599,7 +600,7 @@ public class GlobalTransactionMgrTest {
 
     @Test
     public void testDeleteTransaction() throws LabelAlreadyUsedException,
-            AnalysisException, BeginTransactionException {
+            AnalysisException, BeginTransactionException, DuplicatedRequestException {
 
         long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
                 CatalogTestUtil.testTxnLable1,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to