This is an automated email from the ASF dual-hosted git repository.
zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 46181c0 Fix some bugs about load label (#2241)
46181c0 is described below
commit 46181c08800942c26123745688255c70490aa3f9
Author: Mingyu Chen <[email protected]>
AuthorDate: Sat Nov 23 00:04:45 2019 +0800
Fix some bugs about load label (#2241)
---
.../sql-statements/Data Manipulation/insert.md | 10 +-
.../sql-statements/Data Manipulation/insert_EN.md | 10 +-
.../java/org/apache/doris/catalog/Catalog.java | 6 +-
.../doris/common/DuplicatedRequestException.java | 15 ++-
.../apache/doris/load/loadv2/BrokerLoadJob.java | 32 +++--
.../org/apache/doris/load/loadv2/JobState.java | 10 +-
.../java/org/apache/doris/load/loadv2/LoadJob.java | 94 +++++++++-----
.../apache/doris/load/loadv2/LoadJobScheduler.java | 11 +-
.../org/apache/doris/load/loadv2/LoadManager.java | 70 +++++++---
.../doris/load/loadv2/LoadTimeoutChecker.java | 6 +-
.../org/apache/doris/load/loadv2/MiniLoadJob.java | 7 +-
.../load/routineload/RoutineLoadTaskInfo.java | 5 +
.../java/org/apache/doris/persist/EditLog.java | 1 -
.../apache/doris/service/FrontendServiceImpl.java | 5 +
.../doris/transaction/GlobalTransactionMgr.java | 142 +++++++++++++++------
.../apache/doris/transaction/TransactionState.java | 4 +-
.../doris/transaction/TxnStateCallbackFactory.java | 5 +-
.../org/apache/doris/http/DorisHttpTestCase.java | 7 +-
.../org/apache/doris/load/loadv2/LoadJobTest.java | 3 +-
.../transaction/GlobalTransactionMgrTest.java | 7 +-
20 files changed, 308 insertions(+), 142 deletions(-)
diff --git a/docs/documentation/cn/sql-reference/sql-statements/Data
Manipulation/insert.md
b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/insert.md
index 6483c20..98588a6 100644
--- a/docs/documentation/cn/sql-reference/sql-statements/Data
Manipulation/insert.md
+++ b/docs/documentation/cn/sql-reference/sql-statements/Data
Manipulation/insert.md
@@ -23,8 +23,8 @@ under the License.
```
INSERT INTO table_name
+ [ PARTITION (p1, ...) ]
[ WITH LABEL label]
- [ PARTITION (, ...) ]
[ (column [, ...]) ]
[ [ hint [, ...] ] ]
{ VALUES ( { expression | DEFAULT } [, ...] ) [, ...] | query }
@@ -34,9 +34,9 @@ INSERT INTO table_name
> table_name: 导入数据的目的表。可以是 `db_name.table_name` 形式
>
-> label: 为 Insert 任务指定一个 label
+> partitions: 指定待导入的分区,必须是 `table_name` 中存在的分区,多个分区名称用逗号分隔
>
-> partition_names: 指定待导入的分区,必须是 `table_name` 中存在的分区,多个分区名称用逗号分隔
+> label: 为 Insert 任务指定一个 label
>
> column_name: 指定的目的列,必须是 `table_name` 中存在的列
>
@@ -88,10 +88,10 @@ INSERT INTO test SELECT * FROM test2;
INSERT INTO test (c1, c2) SELECT * from test2;
```
-4. 向 `test` 表中导入一个查询语句结果,并指定 label
+4. 向 `test` 表中导入一个查询语句结果,并指定 partition 和 label
```
-INSERT INTO test WITH LABEL `label1` SELECT * FROM test2;
+INSERT INTO test PARTITION(p1, p2) WITH LABEL `label1` SELECT * FROM test2;
INSERT INTO test WITH LABEL `label1` (c1, c2) SELECT * from test2;
```
diff --git a/docs/documentation/en/sql-reference/sql-statements/Data
Manipulation/insert_EN.md
b/docs/documentation/en/sql-reference/sql-statements/Data
Manipulation/insert_EN.md
index 128060f..8226449 100644
--- a/docs/documentation/en/sql-reference/sql-statements/Data
Manipulation/insert_EN.md
+++ b/docs/documentation/en/sql-reference/sql-statements/Data
Manipulation/insert_EN.md
@@ -23,8 +23,8 @@ under the License.
```
INSERT INTO table_name
+[ PARTITION (p1, ...)]
[ WITH LABEL label]
-[ PARTICIPATION [...]
[ (column [, ...]) ]
[ [ hint [, ...] ] ]
{ VALUES ( { expression | DEFAULT } [, ...] ) [, ...] | query }
@@ -34,9 +34,9 @@ INSERT INTO table_name
> table_name: Target table for loading data. It can be in the form of
> `db_name.table_name`.
>
-> label: Specifies a label for Insert job.
+> partitions: Specifies the partitions to be loaded, with multiple partition
names separated by commas. The partitions must exist in `table_name`.
>
-> partition_names: Specifies the partitions to be loaded, with multiple
partition names separated by commas. The partitions must exist in `table_name`,
+> label: Specifies a label for Insert job.
>
> column_name: The specified destination columns must be columns that exists
> in `table_name`.
>
@@ -89,10 +89,10 @@ INSERT INTO test SELECT * FROM test2
INSERT INTO test (c1, c2) SELECT * from test2
```
-4. Insert into table `test` with specified label
+4. Insert into table `test` with specified partition and label
```
-INSERT INTO test WITH LABEL `label1` SELECT * FROM test2;
+INSERT INTO test PARTITION(p1, p2) WITH LABEL `label1` SELECT * FROM test2;
INSERT INTO test WITH LABEL `label1` (c1, c2) SELECT * from test2;
```
diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java
b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
index af506f7..5926411 100644
--- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -1338,14 +1338,13 @@ public class Catalog {
checksum = loadExportJob(dis, checksum);
checksum = loadBackupHandler(dis, checksum);
checksum = loadPaloAuth(dis, checksum);
+ // global transaction must be replayed before load jobs v2
checksum = loadTransactionState(dis, checksum);
checksum = loadColocateTableIndex(dis, checksum);
checksum = loadRoutineLoadJobs(dis, checksum);
checksum = loadLoadJobsV2(dis, checksum);
checksum = loadSmallFiles(dis, checksum);
-
-
long remoteChecksum = dis.readLong();
Preconditions.checkState(remoteChecksum == checksum,
remoteChecksum + " vs. " + checksum);
} finally {
@@ -1720,7 +1719,8 @@ public class Catalog {
public long loadLoadJobsV2(DataInputStream in, long checksum) throws
IOException {
if (Catalog.getCurrentCatalogJournalVersion() >=
FeMetaVersion.VERSION_50) {
- Catalog.getCurrentCatalog().getLoadManager().readFields(in);
+ loadManager.readFields(in);
+ loadManager.transferLoadingStateToCommitted(globalTransactionMgr);
}
return checksum;
}
diff --git
a/fe/src/main/java/org/apache/doris/common/DuplicatedRequestException.java
b/fe/src/main/java/org/apache/doris/common/DuplicatedRequestException.java
index e6aa8d3..f3de2f3 100644
--- a/fe/src/main/java/org/apache/doris/common/DuplicatedRequestException.java
+++ b/fe/src/main/java/org/apache/doris/common/DuplicatedRequestException.java
@@ -17,13 +17,26 @@
package org.apache.doris.common;
+/*
+ * This exception is thrown when a request from a Backend is duplicated.
+ * It is currently used for mini load and stream load's begin txn requests.
+ * Because the request may be a retry request, we throw this
exception
+ * and return the 'already-begun' txn id.
+ */
public class DuplicatedRequestException extends DdlException {
private String duplicatedRequestId;
+ // save exist txn id
+ private long txnId;
- public DuplicatedRequestException(String duplicatedRequestId, String msg) {
+ public DuplicatedRequestException(String duplicatedRequestId, long txnId,
String msg) {
super(msg);
this.duplicatedRequestId = duplicatedRequestId;
+ this.txnId = txnId;
+ }
+
+ public long getTxnId() {
+ return txnId;
}
public String getDuplicatedRequestId() {
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index df2d68c..6945a44 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -32,6 +32,7 @@ import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
+import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MetaNotFoundException;
@@ -201,7 +202,8 @@ public class BrokerLoadJob extends LoadJob {
}
@Override
- public void beginTxn() throws LabelAlreadyUsedException,
BeginTransactionException, AnalysisException {
+ public void beginTxn()
+ throws LabelAlreadyUsedException, BeginTransactionException,
AnalysisException, DuplicatedRequestException {
transactionId = Catalog.getCurrentGlobalTransactionMgr()
.beginTransaction(dbId, label, null, "FE: " +
FrontendOptions.getLocalHostAddress(),
TransactionState.LoadJobSourceType.BATCH_LOAD_JOB, id,
@@ -237,10 +239,10 @@ public class BrokerLoadJob extends LoadJob {
writeLock();
try {
// check if job has been completed
- if (isCompleted()) {
+ if (isTxnDone()) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
.add("state", state)
- .add("error_msg", "this task will be ignored
when job is completed")
+ .add("error_msg", "this task will be ignored
when job is: " + state)
.build());
return;
}
@@ -250,6 +252,8 @@ public class BrokerLoadJob extends LoadJob {
}
if (loadTask.getRetryTime() <= 0) {
unprotectedExecuteCancel(failMsg, true);
+ logFinalOperation();
+ return;
} else {
// retry task
idToTasks.remove(loadTask.getSignature());
@@ -265,8 +269,6 @@ public class BrokerLoadJob extends LoadJob {
} finally {
writeUnlock();
}
-
- logFinalOperation();
}
/**
@@ -298,7 +300,7 @@ public class BrokerLoadJob extends LoadJob {
.add("msg", "The failure happens in analyze, the
load job will be cancelled with error:"
+ e.getMessage())
.build(), e);
- cancelJobWithoutCheck(new
FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), false);
+ cancelJobWithoutCheck(new
FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), false, true);
}
}
@@ -312,13 +314,14 @@ public class BrokerLoadJob extends LoadJob {
writeLock();
try {
// check if job has been cancelled
- if (isCompleted()) {
+ if (isTxnDone()) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
.add("state", state)
- .add("error_msg", "this task will be ignored
when job is completed")
+ .add("error_msg", "this task will be ignored
when job is: " + state)
.build());
return;
}
+
if (finishedTaskIds.contains(attachment.getTaskId())) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
.add("task_id", attachment.getTaskId())
@@ -342,7 +345,7 @@ public class BrokerLoadJob extends LoadJob {
.add("database_id", dbId)
.add("error_msg", "Failed to divide job into
loading task.")
.build(), e);
- cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_RUN_FAIL,
e.getMessage()), true);
+ cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_RUN_FAIL,
e.getMessage()), true, true);
return;
}
@@ -401,10 +404,10 @@ public class BrokerLoadJob extends LoadJob {
writeLock();
try {
// check if job has been cancelled
- if (isCompleted()) {
+ if (isTxnDone()) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
.add("state", state)
- .add("error_msg", "this task will be ignored
when job is completed")
+ .add("error_msg", "this task will be ignored
when job is: " + state)
.build());
return;
}
@@ -437,7 +440,8 @@ public class BrokerLoadJob extends LoadJob {
// check data quality
if (!checkDataQuality()) {
- cancelJobWithoutCheck(new
FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, QUALITY_FAIL_MSG),true);
+ cancelJobWithoutCheck(new
FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, QUALITY_FAIL_MSG),
+ true, true);
return;
}
Database db = null;
@@ -448,7 +452,7 @@ public class BrokerLoadJob extends LoadJob {
.add("database_id", dbId)
.add("error_msg", "db has been deleted when job
is loading")
.build(), e);
- cancelJobWithoutCheck(new
FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), true);
+ cancelJobWithoutCheck(new
FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), true, true);
}
db.writeLock();
try {
@@ -465,7 +469,7 @@ public class BrokerLoadJob extends LoadJob {
.add("database_id", dbId)
.add("error_msg", "Failed to commit txn with
error:" + e.getMessage())
.build(), e);
- cancelJobWithoutCheck(new
FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()),true);
+ cancelJobWithoutCheck(new
FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), true, true);
return;
} finally {
db.writeUnlock();
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/JobState.java
b/fe/src/main/java/org/apache/doris/load/loadv2/JobState.java
index 6370e0a..7779921 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/JobState.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/JobState.java
@@ -17,9 +17,11 @@
package org.apache.doris.load.loadv2;
+// JobState is persisted in meta data by name, so the order of these
states is not important
public enum JobState {
- PENDING,
- LOADING,
- FINISHED,
- CANCELLED
+ PENDING, // init state
+ LOADING, // job is running
+ COMMITTED, // transaction is committed but not visible
+ FINISHED, // transaction is visible and job is finished
+ CANCELLED // transaction is aborted and job is cancelled
}
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index 408d191..93d25a1 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -24,6 +24,7 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
+import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeMetaVersion;
@@ -118,12 +119,6 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
// This param is set true during txn is committing.
// During committing, the load job could not be cancelled.
protected boolean isCommitting = false;
- // This param is set true in mini load.
- // The streaming mini load could not be cancelled by frontend.
- protected boolean isCancellable = true;
-
- // only for persistence param
- private boolean isJobTypeRead = false;
protected ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
@@ -132,6 +127,9 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
protected LoadStatistic loadStatistic = new LoadStatistic();
+ // only for persistence param. see readFields() for usage
+ private boolean isJobTypeRead = false;
+
public static class LoadStatistic {
// number of rows processed on BE, this number will be updated
periodically by query report.
// A load job may has several load tasks(queries), and each task has
several fragments.
@@ -240,12 +238,12 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
return createTimestamp;
}
- public long getDeadlineMs() {
+ protected long getDeadlineMs() {
return createTimestamp + timeoutSecond * 1000;
}
- public long getLeftTimeMs() {
- return getDeadlineMs() - System.currentTimeMillis();
+ private boolean isTimeout() {
+ return System.currentTimeMillis() > getDeadlineMs();
}
public long getFinishTimestamp() {
@@ -290,6 +288,12 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
*/
abstract Set<String> getTableNames() throws MetaNotFoundException;
+ // return true if the corresponding transaction is done(COMMITTED,
FINISHED, CANCELLED)
+ public boolean isTxnDone() {
+ return state == JobState.COMMITTED || state == JobState.FINISHED ||
state == JobState.CANCELLED;
+ }
+
+ // return true if job is done(FINISHED/CANCELLED)
public boolean isCompleted() {
return state == JobState.FINISHED || state == JobState.CANCELLED;
}
@@ -352,7 +356,7 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
isJobTypeRead = jobTypeRead;
}
- public void beginTxn() throws LabelAlreadyUsedException,
BeginTransactionException, AnalysisException {
+ public void beginTxn() throws LabelAlreadyUsedException,
BeginTransactionException, AnalysisException, DuplicatedRequestException {
}
/**
@@ -362,8 +366,9 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
* @throws LabelAlreadyUsedException the job is duplicated
* @throws BeginTransactionException the limit of load job is exceeded
* @throws AnalysisException there are error params in job
+ * @throws DuplicatedRequestException
*/
- public void execute() throws LabelAlreadyUsedException,
BeginTransactionException, AnalysisException {
+ public void execute() throws LabelAlreadyUsedException,
BeginTransactionException, AnalysisException, DuplicatedRequestException {
writeLock();
try {
unprotectedExecute();
@@ -372,7 +377,8 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
}
}
- public void unprotectedExecute() throws LabelAlreadyUsedException,
BeginTransactionException, AnalysisException {
+ public void unprotectedExecute()
+ throws LabelAlreadyUsedException, BeginTransactionException,
AnalysisException, DuplicatedRequestException {
// check if job state is pending
if (state != JobState.PENDING) {
return;
@@ -384,16 +390,23 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
}
public void processTimeout() {
+ // this is only for jobs whose transaction has not started.
+ // if the transaction has started, the global transaction manager will handle
the timeout.
writeLock();
try {
- if (isCompleted() || getDeadlineMs() >= System.currentTimeMillis()
|| isCommitting) {
+ if (state != JobState.PENDING) {
return;
}
- unprotectedExecuteCancel(new FailMsg(FailMsg.CancelType.TIMEOUT,
"loading timeout to cancel"), true);
+
+ if (!isTimeout()) {
+ return;
+ }
+
+ unprotectedExecuteCancel(new FailMsg(FailMsg.CancelType.TIMEOUT,
"loading timeout to cancel"), false);
+ logFinalOperation();
} finally {
writeUnlock();
}
- logFinalOperation();
}
protected void unprotectedExecuteJob() {
@@ -420,8 +433,12 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
case LOADING:
executeLoad();
break;
+ case COMMITTED:
+ executeCommitted();
+ break;
case FINISHED:
executeFinish();
+ break;
default:
break;
}
@@ -432,21 +449,30 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
state = JobState.LOADING;
}
- public void cancelJobWithoutCheck(FailMsg failMsg, boolean abortTxn) {
+ private void executeCommitted() {
+ state = JobState.COMMITTED;
+ }
+
+ // if needLog is false, no need to write edit log.
+ public void cancelJobWithoutCheck(FailMsg failMsg, boolean abortTxn,
boolean needLog) {
writeLock();
try {
unprotectedExecuteCancel(failMsg, abortTxn);
+ if (needLog) {
+ logFinalOperation();
+ }
} finally {
writeUnlock();
}
- logFinalOperation();
}
public void cancelJob(FailMsg failMsg) throws DdlException {
writeLock();
try {
- // check
- if (!isCancellable) {
+ checkAuth("CANCEL LOAD");
+
+ // mini load can not be cancelled by frontend
+ if (jobType == EtlJobType.MINI) {
throw new DdlException("Job could not be cancelled in type " +
jobType.name());
}
if (isCommitting) {
@@ -455,20 +481,19 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
+ "The job could not be cancelled in
this step").build());
throw new DdlException("Job could not be cancelled while txn
is committing");
}
- if (isCompleted()) {
+ if (isTxnDone()) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
.add("state", state)
- .add("error_msg", "Job could not be cancelled
when job is completed")
+ .add("error_msg", "Job could not be cancelled
when job is " + state)
.build());
throw new DdlException("Job could not be cancelled when job is
finished or cancelled");
}
- checkAuth("CANCEL LOAD");
unprotectedExecuteCancel(failMsg, true);
+ logFinalOperation();
} finally {
writeUnlock();
}
- logFinalOperation();
}
private void checkAuth(String command) throws DdlException {
@@ -526,7 +551,8 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
* This method will cancel job without edit log and lock
*
* @param failMsg
- * @param abortTxn true: abort txn when cancel job, false: only change the
state of job and ignore abort txn
+ * @param abortTxn
+ * true: abort txn when cancel job, false: only change the
state of job and ignore abort txn
*/
protected void unprotectedExecuteCancel(FailMsg failMsg, boolean abortTxn)
{
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
@@ -551,8 +577,9 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
this.failMsg = failMsg;
finishTimestamp = System.currentTimeMillis();
- // remove callback
+ // remove callback before abortTransaction(), so that the
afterAborted() callback will not be called again
Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(id);
+
if (abortTxn) {
// abort txn
try {
@@ -563,9 +590,9 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
Catalog.getCurrentGlobalTransactionMgr().abortTransaction(transactionId,
failMsg.getMsg());
} catch (UserException e) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
- .add("transaction_id", transactionId)
- .add("error_msg", "failed to abort txn when
job is cancelled, txn will be aborted later")
- .build());
+ .add("transaction_id", transactionId)
+ .add("error_msg", "failed to abort txn when job is
cancelled. " + e.getMessage())
+ .build());
}
}
@@ -725,8 +752,8 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
public void beforeCommitted(TransactionState txnState) throws
TransactionException {
writeLock();
try {
- if (isCompleted()) {
- throw new TransactionException("txn could not be committed
when job has been cancelled");
+ if (isTxnDone()) {
+ throw new TransactionException("txn could not be committed
because job is: " + state);
}
isCommitting = true;
} finally {
@@ -742,6 +769,7 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
writeLock();
try {
isCommitting = false;
+ state = JobState.COMMITTED;
} finally {
writeUnlock();
}
@@ -752,6 +780,8 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
writeLock();
try {
replayTxnAttachment(txnState);
+ transactionId = txnState.getTransactionId();
+ state = JobState.COMMITTED;
} finally {
writeUnlock();
}
@@ -774,7 +804,7 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
}
writeLock();
try {
- if (isCompleted()) {
+ if (isTxnDone()) {
return;
}
// record attachment in load job
@@ -799,6 +829,7 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
failMsg = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL,
txnState.getReason());
finishTimestamp = txnState.getFinishTime();
state = JobState.CANCELLED;
+
Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(id);
} finally {
writeUnlock();
}
@@ -828,6 +859,7 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
progress = 100;
finishTimestamp = txnState.getFinishTime();
state = JobState.FINISHED;
+
Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(id);
} finally {
writeUnlock();
}
diff --git
a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java
b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java
index eae513d..5f6fa22 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java
@@ -19,6 +19,7 @@ package org.apache.doris.load.loadv2;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
+import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
@@ -73,8 +74,16 @@ public class LoadJobScheduler extends MasterDaemon {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId())
.add("error_msg", "There are error properties
in job. Job will be cancelled")
.build(), e);
- loadJob.cancelJobWithoutCheck(new
FailMsg(FailMsg.CancelType.ETL_SUBMIT_FAIL, e.getMessage()), false);
+ loadJob.cancelJobWithoutCheck(new
FailMsg(FailMsg.CancelType.ETL_SUBMIT_FAIL, e.getMessage()),
+ false, true);
continue;
+ } catch (DuplicatedRequestException e) {
+ // should not happen in load job scheduler, there is no
request id.
+ LOG.warn(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId())
+ .add("error_msg", "Failed to begin txn with duplicate
request. Job will be rescheduled later")
+ .build(), e);
+ needScheduleJobs.put(loadJob);
+ return;
} catch (BeginTransactionException e) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId())
.add("error_msg", "Failed to begin txn when
job is scheduling. "
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 3637952..2edfecf 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -40,6 +40,8 @@ import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TMiniLoadBeginRequest;
import org.apache.doris.thrift.TMiniLoadRequest;
import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.transaction.GlobalTransactionMgr;
+import org.apache.doris.transaction.TransactionState;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
@@ -97,7 +99,7 @@ public class LoadManager implements Writable{
LoadJob loadJob = null;
writeLock();
try {
- checkLabelUsed(dbId, stmt.getLabel().getLabelName(), null);
+ checkLabelUsed(dbId, stmt.getLabel().getLabelName());
if (stmt.getBrokerDesc() == null) {
throw new DdlException("LoadManager only support the broker
load.");
}
@@ -135,23 +137,22 @@ public class LoadManager implements Writable{
LoadJob loadJob = null;
writeLock();
try {
- checkLabelUsed(database.getId(), request.getLabel(),
request.getRequest_id());
loadJob = new MiniLoadJob(database.getId(), request);
- createLoadJob(loadJob);
+ // call unprotectedExecute before adding the load job, so that if the job
fails to start, there is no need to add it.
+ // NOTICE(cmy): this order is only for Mini Load, because mini
load's unprotectedExecute() only does beginTxn().
+ // for other kinds of load job, execute the job after adding it.
// Mini load job must be executed before release write lock.
// Otherwise, the duplicated request maybe get the transaction id
before transaction of mini load is begun.
loadJob.unprotectedExecute();
+ createLoadJob(loadJob);
} catch (DuplicatedRequestException e) {
- LOG.info(new LogBuilder(LogKey.LOAD_JOB,
e.getDuplicatedRequestId())
- .add("msg", "the duplicated request returns the
txn id "
- + "which was created by the same mini
load")
- .build());
- return
dbIdToLabelToLoadJobs.get(database.getId()).get(request.getLabel())
- .stream().filter(entity -> entity.getState() !=
JobState.CANCELLED).findFirst()
- .get().getTransactionId();
+ // this is a duplicate request, just return previous txn id
+ LOG.info("deplicate request for mini load. request id: {}, txn:
{}", e.getDuplicatedRequestId(), e.getTxnId());
+ return e.getTxnId();
} catch (UserException e) {
if (loadJob != null) {
- loadJob.cancelJobWithoutCheck(new FailMsg(LOAD_RUN_FAIL,
e.getMessage()), false);
+ loadJob.cancelJobWithoutCheck(new FailMsg(LOAD_RUN_FAIL,
e.getMessage()), false,
+ false /* no need to write edit log, because
createLoadJob log is not written yet */);
}
throw e;
} finally {
@@ -185,7 +186,7 @@ public class LoadManager implements Writable{
Database database = checkDb(stmt.getLabel().getDbName());
writeLock();
try {
- checkLabelUsed(database.getId(), stmt.getLabel().getLabelName(),
null);
+ checkLabelUsed(database.getId(), stmt.getLabel().getLabelName());
Catalog.getCurrentCatalog().getLoadInstance().addLoadJob(stmt,
jobType, timestamp);
} finally {
writeUnlock();
@@ -211,7 +212,7 @@ public class LoadManager implements Writable{
Database database = checkDb(ClusterNamespace.getFullName(cluster,
request.getDb()));
writeLock();
try {
- checkLabelUsed(database.getId(), request.getLabel(), null);
+ checkLabelUsed(database.getId(), request.getLabel());
return
Catalog.getCurrentCatalog().getLoadInstance().addMiniLoadJob(request);
} finally {
writeUnlock();
@@ -222,7 +223,7 @@ public class LoadManager implements Writable{
Database database = checkDb(fullDbName);
writeLock();
try {
- checkLabelUsed(database.getId(), label, null);
+ checkLabelUsed(database.getId(), label);
Catalog.getCurrentCatalog().getLoadInstance()
.registerMiniLabel(fullDbName, label,
System.currentTimeMillis());
} finally {
@@ -237,6 +238,7 @@ public class LoadManager implements Writable{
.build());
}
+ // add load job and also add it to the callback factory
private void createLoadJob(LoadJob loadJob) {
addLoadJob(loadJob);
// add callback before txn created, because callback will be performed
on replay without txn begin
@@ -296,7 +298,7 @@ public class LoadManager implements Writable{
if (loadJobList == null) {
throw new DdlException("Load job does not exist");
}
- Optional<LoadJob> loadJobOptional =
loadJobList.stream().filter(entity -> !entity.isCompleted()).findFirst();
+ Optional<LoadJob> loadJobOptional =
loadJobList.stream().filter(entity -> !entity.isTxnDone()).findFirst();
if (!loadJobOptional.isPresent()) {
throw new DdlException("There is no uncompleted job which
label is " + stmt.getLabel());
}
@@ -310,6 +312,15 @@ public class LoadManager implements Writable{
public void replayEndLoadJob(LoadJobFinalOperation operation) {
LoadJob job = idToLoadJob.get(operation.getId());
+ if (job == null) {
+ // This should not happen.
+ // Previously, when a user submitted a job with an already used
label, an END_LOAD_JOB edit log
+ // was written but the job was not added to 'idToLoadJob', so
the job looked up here would be null.
+ // And this bug has been fixed.
+ // Just add a log here to observe.
+ LOG.warn("job does not exist when replaying end load job edit log:
{}", operation);
+ return;
+ }
job.unprotectReadEndOperation(operation);
LOG.info(new LogBuilder(LogKey.LOAD_JOB, operation.getId())
.add("operation", operation)
@@ -360,6 +371,7 @@ public class LoadManager implements Writable{
}
}
+ // only for those jobs whose transaction has not started
public void processTimeoutJobs() {
idToLoadJob.values().stream().forEach(entity ->
entity.processTimeout());
}
@@ -515,7 +527,7 @@ public class LoadManager implements Writable{
* @param requestId: the uuid of each txn request from BE
* @throws LabelAlreadyUsedException throw exception when label has been
used by an unfinished job.
*/
- private void checkLabelUsed(long dbId, String label, TUniqueId requestId)
+ private void checkLabelUsed(long dbId, String label)
throws DdlException {
// if label has been used in old load jobs
Catalog.getCurrentCatalog().getLoadInstance().isLabelUsed(dbId, label);
@@ -527,11 +539,6 @@ public class LoadManager implements Writable{
Optional<LoadJob> loadJobOptional =
labelLoadJobs.stream().filter(entity ->
entity.getState() != JobState.CANCELLED).findFirst();
if (loadJobOptional.isPresent()) {
- LoadJob loadJob = loadJobOptional.get();
- if (loadJob.getRequestId() != null && requestId != null &&
loadJob.getRequestId().equals(requestId)) {
- throw new
DuplicatedRequestException(String.valueOf(loadJob.getId()),
- "The request is duplicated with " +
loadJob.getId());
- }
LOG.warn("Failed to add load job when label {} has been
used.", label);
throw new LabelAlreadyUsedException(label);
}
@@ -583,6 +590,27 @@ public class LoadManager implements Writable{
}
}
+ // In the previous implementation, there was a bug: when a job's
corresponding transaction was
+ // COMMITTED but not VISIBLE, the load job's state was still LOADING, so the
job could be CANCELLED
+ // by the timeout checker, which is not right.
+ // So here we check each LOADING load job's txn status and, if it is
COMMITTED, change the load job's
+ // state to COMMITTED.
+ // This method should be removed in the next upgrade.
+ public void transferLoadingStateToCommitted(GlobalTransactionMgr txnMgr) {
+ for (LoadJob job : idToLoadJob.values()) {
+ if (job.getState() == JobState.LOADING) {
+ // unfortunately, transaction id in load job is also not
persisted, so we have to traverse
+ // all transactions to find it.
+ TransactionState txn =
txnMgr.getCommittedTransactionStateByCallbackId(job.getCallbackId());
+ if (txn != null) {
+ job.updateState(JobState.COMMITTED);
+ LOG.info("transfer load job {} state from LOADING to
COMMITTED, because txn {} is COMMITTED",
+ job.getId(), txn.getTransactionId());
+ }
+ }
+ }
+ }
+
@Override
public void write(DataOutput out) throws IOException {
List<LoadJob> loadJobs =
idToLoadJob.values().stream().filter(this::needSave).collect(Collectors.toList());
diff --git
a/fe/src/main/java/org/apache/doris/load/loadv2/LoadTimeoutChecker.java
b/fe/src/main/java/org/apache/doris/load/loadv2/LoadTimeoutChecker.java
index ae9019d..2f75e1f 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadTimeoutChecker.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadTimeoutChecker.java
@@ -24,9 +24,9 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
- * LoadTimeoutChecker is performed to cancel the timeout job.
- * The job which is not finished, not cancelled, not isCommitting will be
checked.
- * The standard of timeout is CurrentTS > (CreateTs + timeoutSeconds * 1000).
+ * LoadTimeoutChecker tries to cancel load jobs that have timed out.
+ * It does not handle jobs whose corresponding transaction has already
started.
+ * For those jobs, the global transaction manager cancels the corresponding job
while aborting the timed-out transaction.
*/
public class LoadTimeoutChecker extends MasterDaemon {
private static final Logger LOG =
LogManager.getLogger(LoadTimeoutChecker.class);
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java
b/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java
index 6566209..9be8021 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
+import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.io.Text;
@@ -60,7 +61,6 @@ public class MiniLoadJob extends LoadJob {
if (request.isSetMax_filter_ratio()) {
this.maxFilterRatio = request.getMax_filter_ratio();
}
- this.isCancellable = false;
this.createTimestamp = request.getCreate_timestamp();
this.loadStartTimestamp = createTimestamp;
this.authorizationInfo = gatherAuthInfo();
@@ -86,9 +86,10 @@ public class MiniLoadJob extends LoadJob {
}
@Override
- public void beginTxn() throws LabelAlreadyUsedException,
BeginTransactionException, AnalysisException {
+ public void beginTxn()
+ throws LabelAlreadyUsedException, BeginTransactionException,
AnalysisException, DuplicatedRequestException {
transactionId = Catalog.getCurrentGlobalTransactionMgr()
- .beginTransaction(dbId, label, null, "FE: " +
FrontendOptions.getLocalHostAddress(),
+ .beginTransaction(dbId, label, requestId, "FE: " +
FrontendOptions.getLocalHostAddress(),
TransactionState.LoadJobSourceType.BACKEND_STREAMING, id,
timeoutSecond);
}
diff --git
a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 8ebeb84..b7fb7a5 100644
---
a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++
b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -20,6 +20,7 @@ package org.apache.doris.load.routineload;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
@@ -148,6 +149,10 @@ public abstract class RoutineLoadTaskInfo {
routineLoadJob.getDbId(), DebugUtil.printId(id), null,
"FE: " + FrontendOptions.getLocalHostAddress(),
TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK,
routineLoadJob.getId(),
timeoutMs / 1000);
+ } catch (DuplicatedRequestException e) {
+ // should not happen, because we do not pass a request id when
beginning the transaction
+ LOG.warn("failed to begin txn for routine load task: {}, {}",
DebugUtil.printId(id), e.getMessage());
+ return false;
} catch (LabelAlreadyUsedException e) {
// this should not happen for a routine load task, throw it out
throw e;
diff --git a/fe/src/main/java/org/apache/doris/persist/EditLog.java
b/fe/src/main/java/org/apache/doris/persist/EditLog.java
index 81f5743..8a29838 100644
--- a/fe/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/src/main/java/org/apache/doris/persist/EditLog.java
@@ -564,7 +564,6 @@ public class EditLog {
final TransactionState state = (TransactionState)
journal.getData();
Catalog.getCurrentGlobalTransactionMgr().replayUpsertTransactionState(state);
LOG.debug("opcode: {}, tid: {}", opCode,
state.getTransactionId());
-
break;
}
case OperationType.OP_DELETE_TRANSACTION_STATE: {
diff --git a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 0b64d80..d7450be 100644
--- a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -31,6 +31,7 @@ import org.apache.doris.common.AuditLog;
import org.apache.doris.common.AuthenticationException;
import org.apache.doris.common.CaseSensibility;
import org.apache.doris.common.Config;
+import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.ThriftServerContext;
@@ -609,6 +610,10 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
result.setStatus(status);
try {
result.setTxnId(loadTxnBeginImpl(request, clientAddr));
+ } catch (DuplicatedRequestException e) {
+ // this is a duplicate request, just return previous txn id
+ LOG.info("deplicate request for stream load. request id: {}, txn:
{}", e.getDuplicatedRequestId(), e.getTxnId());
+ result.setTxnId(e.getTxnId());
} catch (LabelAlreadyUsedException e) {
status.setStatus_code(TStatusCode.LABEL_ALREADY_EXISTS);
status.addToError_msgs(e.getMessage());
diff --git
a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 15d3e19..3b537a2 100644
--- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -30,11 +30,14 @@ import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
+import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.metric.MetricRepo;
@@ -62,6 +65,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -77,7 +81,7 @@ import java.util.stream.Collectors;
* 3. abort
* Attention: all api in txn manager should get db lock or load lock first,
then get txn manager's lock, or there will be dead lock
*/
-public class GlobalTransactionMgr {
+public class GlobalTransactionMgr implements Writable {
private static final Logger LOG =
LogManager.getLogger(GlobalTransactionMgr.class);
// the lock is used to control the access to transaction states
@@ -87,8 +91,12 @@ public class GlobalTransactionMgr {
// transactionId -> TransactionState
private Map<Long, TransactionState> idToTransactionState =
Maps.newConcurrentMap();
- // db id -> (label -> txn id)
- private com.google.common.collect.Table<Long, String, Long>
dbIdToTxnLabels = HashBasedTable.create();
+ // db id -> (label -> txn ids)
+ // This is used for checking whether a label is already used. A label may correspond
to multiple txns,
+ // and only one of them can succeed.
+ // This member should be consistent with idToTransactionState, which means
if a txn exists in idToTransactionState,
+ // it must also exist in dbIdToTxnLabels, and vice versa.
+ private com.google.common.collect.Table<Long, String, Set<Long>>
dbIdToTxnLabels = HashBasedTable.create();
// count the number of running txns of each database, except for the
routine load txn
private Map<Long, Integer> runningTxnNums = Maps.newHashMap();
// count only the number of running routine load txns of each database
@@ -109,7 +117,8 @@ public class GlobalTransactionMgr {
}
public long beginTransaction(long dbId, String label, String coordinator,
LoadJobSourceType sourceType,
- long timeoutSecond) throws AnalysisException,
LabelAlreadyUsedException, BeginTransactionException {
+ long timeoutSecond)
+ throws AnalysisException, LabelAlreadyUsedException,
BeginTransactionException, DuplicatedRequestException {
return beginTransaction(dbId, label, null, coordinator, sourceType,
-1, timeoutSecond);
}
@@ -123,11 +132,12 @@ public class GlobalTransactionMgr {
*
* @param coordinator
* @throws BeginTransactionException
+ * @throws DuplicatedRequestException
* @throws IllegalTransactionParameterException
*/
public long beginTransaction(long dbId, String label, TUniqueId requestId,
String coordinator, LoadJobSourceType sourceType, long listenerId,
long timeoutSecond)
- throws AnalysisException, LabelAlreadyUsedException,
BeginTransactionException {
+ throws AnalysisException, LabelAlreadyUsedException,
BeginTransactionException, DuplicatedRequestException {
if (Config.disable_load_job) {
throw new AnalysisException("disable_load_job is set to true, all
load jobs are prevented");
@@ -145,20 +155,38 @@ public class GlobalTransactionMgr {
Preconditions.checkNotNull(coordinator);
Preconditions.checkNotNull(label);
FeNameFormat.checkLabel(label);
- Map<String, Long> txnLabels = dbIdToTxnLabels.row(dbId);
- if (txnLabels != null && txnLabels.containsKey(label)) {
- TransactionState existTxn =
getTransactionState(txnLabels.get(label));
- // check timestamp
- if (requestId != null) {
- if (existTxn != null && existTxn.getTransactionStatus() ==
TransactionStatus.PREPARE
- && existTxn.getRequsetId() != null &&
existTxn.getRequsetId().equals(requestId)) {
+
+ /*
+ * Check whether the label is already used, by the following steps:
+ * 1. get all existing transactions with this label
+ * 2. if there is a PREPARE transaction, check if this is a retry
request. If yes, throw a
+ * DuplicatedRequestException that carries the existing txn id.
+ * 3. if there is any other non-aborted transaction, throw a label already
used exception.
+ */
+ Set<Long> existingTxnIds = dbIdToTxnLabels.get(dbId, label);
+ if (existingTxnIds != null && !existingTxnIds.isEmpty()) {
+ List<TransactionState> notAbortedTxns = Lists.newArrayList();
+ for (long txnId : existingTxnIds) {
+ TransactionState txn = idToTransactionState.get(txnId);
+ Preconditions.checkNotNull(txn);
+ if (txn.getTransactionStatus() !=
TransactionStatus.ABORTED) {
+ notAbortedTxns.add(txn);
+ }
+ }
+ // there should be at most 1 txn in PREPARE/COMMITTED/VISIBLE
status
+ Preconditions.checkState(notAbortedTxns.size() <= 1,
notAbortedTxns);
+ if (!notAbortedTxns.isEmpty()) {
+ TransactionState notAbortedTxn = notAbortedTxns.get(0);
+ if (requestId != null &&
notAbortedTxn.getTransactionStatus() == TransactionStatus.PREPARE
+ && notAbortedTxn.getRequsetId() != null &&
notAbortedTxn.getRequsetId().equals(requestId)) {
// this may be a retry request for same job, just
return existing txn id.
- return txnLabels.get(label);
+ throw new
DuplicatedRequestException(DebugUtil.printId(requestId),
+ notAbortedTxn.getTransactionId(), "");
}
+ throw new LabelAlreadyUsedException(label,
notAbortedTxn.getTransactionStatus());
}
- throw new LabelAlreadyUsedException(label,
existTxn.getTransactionStatus());
}
-
+
checkRunningTxnExceedLimit(dbId, sourceType);
long tid = idGenerator.getNextTransactionId();
@@ -173,6 +201,8 @@ public class GlobalTransactionMgr {
}
return tid;
+ } catch (DuplicatedRequestException e) {
+ throw e;
} catch (Exception e) {
if (MetricRepo.isInit.get()) {
MetricRepo.COUNTER_TXN_REJECT.increase(1L);
@@ -203,15 +233,13 @@ public class GlobalTransactionMgr {
public TransactionStatus getLabelState(long dbId, String label) {
readLock();
try {
- Map<String, Long> txnLabels = dbIdToTxnLabels.row(dbId);
- if (txnLabels == null) {
+ Set<Long> existingTxnIds = dbIdToTxnLabels.get(dbId, label);
+ if (existingTxnIds == null || existingTxnIds.isEmpty()) {
return TransactionStatus.UNKNOWN;
}
- Long transactionId = txnLabels.get(label);
- if (transactionId == null) {
- return TransactionStatus.UNKNOWN;
- }
- return
idToTransactionState.get(transactionId).getTransactionStatus();
+ // find the latest txn (the one with the largest id)
+ long maxTxnId =
existingTxnIds.stream().max(Comparator.comparingLong(Long::valueOf)).get();
+ return idToTransactionState.get(maxTxnId).getTransactionStatus();
} finally {
readUnlock();
}
@@ -224,8 +252,8 @@ public class GlobalTransactionMgr {
if (state == null) {
return;
}
- editLog.logDeleteTransactionState(state);
replayDeleteTransactionState(state);
+ editLog.logDeleteTransactionState(state);
} finally {
writeUnlock();
}
@@ -493,18 +521,29 @@ public class GlobalTransactionMgr {
public void abortTransaction(Long dbId, String label, String reason)
throws UserException {
Preconditions.checkNotNull(label);
Long transactionId = null;
- writeLock();
+ readLock();
try {
- Map<String, Long> dbTxns = dbIdToTxnLabels.row(dbId);
- if (dbTxns == null) {
+ Set<Long> existingTxns = dbIdToTxnLabels.get(dbId, label);
+ if (existingTxns == null || existingTxns.isEmpty()) {
throw new UserException("transaction not found, label=" +
label);
}
- transactionId = dbTxns.get(label);
- if (transactionId == null) {
+ // find PREPARE txn. For one load label, there should be only one
PREPARE txn.
+ TransactionState prepareTxn = null;
+ for (Long txnId : existingTxns) {
+ TransactionState txn = idToTransactionState.get(txnId);
+ if (txn.getTransactionStatus() == TransactionStatus.PREPARE) {
+ prepareTxn = txn;
+ break;
+ }
+ }
+
+ if (prepareTxn == null) {
throw new UserException("transaction not found, label=" +
label);
}
+
+ transactionId = prepareTxn.getTransactionId();
} finally {
- writeUnlock();
+ readUnlock();
}
abortTransaction(transactionId, reason);
}
@@ -837,7 +876,7 @@ public class GlobalTransactionMgr {
List<Long> timeoutTxns = Lists.newArrayList();
List<Long> expiredTxns = Lists.newArrayList();
- writeLock();
+ readLock();
try {
for (TransactionState transactionState :
idToTransactionState.values()) {
if (transactionState.isExpired(currentMillis)) {
@@ -849,7 +888,7 @@ public class GlobalTransactionMgr {
}
}
} finally {
- writeUnlock();
+ readUnlock();
}
// delete expired txns
@@ -959,7 +998,8 @@ public class GlobalTransactionMgr {
}
if (transactionState.getTransactionStatus() ==
TransactionStatus.COMMITTED
|| transactionState.getTransactionStatus() ==
TransactionStatus.VISIBLE) {
- throw new UserException("transaction's state is already committed
or visible, could not abort");
+ throw new UserException("transaction's state is already "
+ + transactionState.getTransactionStatus() + ", could not
abort");
}
transactionState.setFinishTime(System.currentTimeMillis());
transactionState.setReason(reason);
@@ -980,10 +1020,10 @@ public class GlobalTransactionMgr {
transactionState.replaySetTransactionStatus();
Database db = catalog.getDb(transactionState.getDbId());
if (transactionState.getTransactionStatus() ==
TransactionStatus.COMMITTED) {
- LOG.debug("replay a committed transaction {}",
transactionState);
+ LOG.info("replay a committed transaction {}",
transactionState);
updateCatalogAfterCommitted(transactionState, db);
} else if (transactionState.getTransactionStatus() ==
TransactionStatus.VISIBLE) {
- LOG.debug("replay a visible transaction {}", transactionState);
+ LOG.info("replay a visible transaction {}", transactionState);
updateCatalogAfterVisible(transactionState, db);
}
TransactionState preTxnState =
idToTransactionState.get(transactionState.getTransactionId());
@@ -1000,7 +1040,11 @@ public class GlobalTransactionMgr {
writeLock();
try {
idToTransactionState.remove(transactionState.getTransactionId());
- dbIdToTxnLabels.remove(transactionState.getDbId(),
transactionState.getLabel());
+ Set<Long> txnIds = dbIdToTxnLabels.get(transactionState.getDbId(),
transactionState.getLabel());
+ txnIds.remove(transactionState.getTransactionId());
+ if (txnIds.isEmpty()) {
+ dbIdToTxnLabels.remove(transactionState.getDbId(),
transactionState.getLabel());
+ }
} finally {
writeUnlock();
}
@@ -1112,13 +1156,12 @@ public class GlobalTransactionMgr {
}
private void updateTxnLabels(TransactionState transactionState) {
- // if the transaction is aborted, then its label could be reused
- if (transactionState.getTransactionStatus() ==
TransactionStatus.ABORTED) {
- dbIdToTxnLabels.remove(transactionState.getDbId(),
transactionState.getLabel());
- } else {
- dbIdToTxnLabels.put(transactionState.getDbId(),
transactionState.getLabel(),
- transactionState.getTransactionId());
+ Set<Long> txnIds = dbIdToTxnLabels.get(transactionState.getDbId(),
transactionState.getLabel());
+ if (txnIds == null) {
+ txnIds = Sets.newHashSet();
+ dbIdToTxnLabels.put(transactionState.getDbId(),
transactionState.getLabel(), txnIds);
}
+ txnIds.add(transactionState.getTransactionId());
}
private void updateDBRunningTxnNum(TransactionStatus preStatus,
TransactionState curTxnState) {
@@ -1278,7 +1321,7 @@ public class GlobalTransactionMgr {
return this.idGenerator;
}
- // this two function used to read snapshot or write snapshot
+ @Override
public void write(DataOutput out) throws IOException {
int numTransactions = idToTransactionState.size();
out.writeInt(numTransactions);
@@ -1288,6 +1331,7 @@ public class GlobalTransactionMgr {
idGenerator.write(out);
}
+ @Override
public void readFields(DataInput in) throws IOException {
int numTransactions = in.readInt();
for (int i = 0; i < numTransactions; ++i) {
@@ -1301,4 +1345,18 @@ public class GlobalTransactionMgr {
}
idGenerator.readFields(in);
}
+
+ public TransactionState getCommittedTransactionStateByCallbackId(long
callbackId) {
+ readLock();
+ try {
+ for (TransactionState txn : idToTransactionState.values()) {
+ if (txn.getCallbackId() == callbackId &&
txn.getTransactionStatus() == TransactionStatus.COMMITTED) {
+ return txn;
+ }
+ }
+ } finally {
+ readUnlock();
+ }
+ return null;
+ }
}
diff --git
a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java
b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java
index ef7fcbd..0324fda 100644
--- a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -336,7 +336,8 @@ public class TransactionState implements Writable {
switch (transactionStatus) {
case COMMITTED:
// Maybe listener has been deleted. The txn need to be
aborted later.
- throw new TransactionException("Failed to commit txn when
callback could not be found");
+ throw new TransactionException(
+ "Failed to commit txn when callback " + callbackId
+ "could not be found");
default:
break;
}
@@ -469,6 +470,7 @@ public class TransactionState implements Writable {
sb.append("transaction id: ").append(transactionId);
sb.append(", label: ").append(label);
sb.append(", db id: ").append(dbId);
+ sb.append(", callback id: ").append(callbackId);
sb.append(", coordinator: ").append(coordinator);
sb.append(", transaction status: ").append(transactionStatus);
sb.append(", error replicas num: ").append(errorReplicas.size());
diff --git
a/fe/src/main/java/org/apache/doris/transaction/TxnStateCallbackFactory.java
b/fe/src/main/java/org/apache/doris/transaction/TxnStateCallbackFactory.java
index 25430c6..28accde 100644
--- a/fe/src/main/java/org/apache/doris/transaction/TxnStateCallbackFactory.java
+++ b/fe/src/main/java/org/apache/doris/transaction/TxnStateCallbackFactory.java
@@ -40,8 +40,9 @@ public class TxnStateCallbackFactory {
}
public synchronized void removeCallback(long id) {
- callbacks.remove(id);
- LOG.info("remove callback of txn state : {}", id);
+ if (callbacks.remove(id) != null) {
+ LOG.info("remove callback of txn state : {}", id);
+ }
}
public synchronized TxnStateChangeCallback getCallback(long id) {
diff --git a/fe/src/test/java/org/apache/doris/http/DorisHttpTestCase.java
b/fe/src/test/java/org/apache/doris/http/DorisHttpTestCase.java
index 1ec3944..392005f 100644
--- a/fe/src/test/java/org/apache/doris/http/DorisHttpTestCase.java
+++ b/fe/src/test/java/org/apache/doris/http/DorisHttpTestCase.java
@@ -61,6 +61,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.concurrent.TimeUnit;
import junit.framework.AssertionFailedError;
@@ -106,7 +107,11 @@ abstract public class DorisHttpTestCase {
public static long testPartitionCurrentVersionHash = 12312;
public static long testPartitionNextVersionHash = 123123123;
- public static final int HTTP_PORT = 32474;
+ public static final int HTTP_PORT;
+ static {
+ Random r = new Random(System.currentTimeMillis());
+ HTTP_PORT = 30000 + r.nextInt(10000);
+ }
protected static final String URI = "http://localhost:" + HTTP_PORT +
"/api/" + DB_NAME + "/" + TABLE_NAME;
diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
b/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
index 0f32c13..df1f8b8 100644
--- a/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
+++ b/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
@@ -22,6 +22,7 @@ import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
+import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.metric.LongCounterMetric;
@@ -100,7 +101,7 @@ public class LoadJobTest {
@Test
public void testExecute(@Mocked GlobalTransactionMgr globalTransactionMgr,
@Mocked MasterTaskExecutor masterTaskExecutor)
- throws LabelAlreadyUsedException, BeginTransactionException,
AnalysisException {
+ throws LabelAlreadyUsedException, BeginTransactionException,
AnalysisException, DuplicatedRequestException {
LoadJob loadJob = new BrokerLoadJob();
new Expectations() {
{
diff --git
a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index 943b6b2..91b1682 100644
---
a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++
b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -31,6 +31,7 @@ import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
+import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.UserException;
@@ -103,7 +104,7 @@ public class GlobalTransactionMgrTest {
@Test
public void testBeginTransaction() throws LabelAlreadyUsedException,
AnalysisException,
- BeginTransactionException {
+ BeginTransactionException, DuplicatedRequestException {
FakeCatalog.setCatalog(masterCatalog);
long transactionId =
masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
CatalogTestUtil.testTxnLable1,
@@ -119,7 +120,7 @@ public class GlobalTransactionMgrTest {
@Test
public void testBeginTransactionWithSameLabel() throws
LabelAlreadyUsedException, AnalysisException,
- BeginTransactionException {
+ BeginTransactionException, DuplicatedRequestException {
FakeCatalog.setCatalog(masterCatalog);
long transactionId = 0;
try {
@@ -599,7 +600,7 @@ public class GlobalTransactionMgrTest {
@Test
public void testDeleteTransaction() throws LabelAlreadyUsedException,
- AnalysisException, BeginTransactionException {
+ AnalysisException, BeginTransactionException,
DuplicatedRequestException {
long transactionId =
masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
CatalogTestUtil.testTxnLable1,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]