This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7cfce63 [fix](mini-load) Remove mini load in LOADING and PENDING
state (#8649)
7cfce63 is described below
commit 7cfce63a133b0d755dac109c8895f9c6566806cb
Author: Mingyu Chen <[email protected]>
AuthorDate: Mon Mar 28 10:22:17 2022 +0800
[fix](mini-load) Remove mini load in LOADING and PENDING state (#8649)
1. Remove some unused code.
2. Handle mini load jobs that are in a wrong state:
1. For some historical reasons, some mini load jobs in LOADING state
have not been cleared.
As a result, new load jobs cannot be committed.
2. If a mini load job is created right before an FE restart, the mini load
job will stay in PENDING state forever.
Such a job should eventually be removed.
---
.../org/apache/doris/load/loadv2/LoadManager.java | 146 +++------------------
1 file changed, 21 insertions(+), 125 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index ae0f97c..64e0973 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -36,7 +36,6 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
-import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.FailMsg;
import org.apache.doris.load.FailMsg.CancelType;
@@ -45,9 +44,7 @@ import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TMiniLoadBeginRequest;
import org.apache.doris.thrift.TMiniLoadRequest;
import org.apache.doris.thrift.TUniqueId;
-import org.apache.doris.transaction.GlobalTransactionMgr;
import org.apache.doris.transaction.TransactionState;
-import org.apache.doris.transaction.TransactionStatus;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
@@ -359,32 +356,6 @@ public class LoadManager implements Writable {
}
}
- public void cancelLoadJob(CancelLoadStmt stmt) throws DdlException {
- Database db =
Catalog.getCurrentCatalog().getDbOrDdlException(stmt.getDbName());
-
- LoadJob loadJob = null;
- readLock();
- try {
- Map<String, List<LoadJob>> labelToLoadJobs =
dbIdToLabelToLoadJobs.get(db.getId());
- if (labelToLoadJobs == null) {
- throw new DdlException("Load job does not exist");
- }
- List<LoadJob> loadJobList = labelToLoadJobs.get(stmt.getLabel());
- if (loadJobList == null) {
- throw new DdlException("Load job does not exist");
- }
- Optional<LoadJob> loadJobOptional =
loadJobList.stream().filter(entity -> !entity.isTxnDone()).findFirst();
- if (!loadJobOptional.isPresent()) {
- throw new DdlException("There is no uncompleted job which
label is " + stmt.getLabel());
- }
- loadJob = loadJobOptional.get();
- } finally {
- readUnlock();
- }
-
- loadJob.cancelJob(new FailMsg(FailMsg.CancelType.USER_CANCEL, "user
cancel"));
- }
-
public void replayEndLoadJob(LoadJobFinalOperation operation) {
LoadJob job = idToLoadJob.get(operation.getId());
if (job == null) {
@@ -683,102 +654,6 @@ public class LoadManager implements Writable {
}
}
- @Deprecated
- // Deprecated in version 0.12
- // This method is only for bug fix. And should be call after image and
edit log are replayed.
- public void fixLoadJobMetaBugs(GlobalTransactionMgr txnMgr) {
- for (LoadJob job : idToLoadJob.values()) {
- /*
- * Bug 1:
- * in previous implementation, there is a bug that when the job's
corresponding transaction is
- * COMMITTED but not VISIBLE, the load job's state is LOADING, so
that the job may be CANCELLED
- * by timeout checker, which is not right.
- * So here we will check each LOADING load jobs' txn status, if it
is COMMITTED, change load job's
- * state to COMMITTED.
- * this method should be removed at next upgrading.
- * only mini load job will be in LOADING state when persist,
because mini load job is executed before writing
- * edit log.
- */
- if (job.getState() == JobState.LOADING) {
- // unfortunately, transaction id in load job is also not
persisted, so we have to traverse
- // all transactions to find it.
- TransactionState txn =
txnMgr.getTransactionStateByCallbackIdAndStatus(job.getDbId(),
job.getCallbackId(),
- Sets.newHashSet(TransactionStatus.COMMITTED));
- if (txn != null) {
- job.updateState(JobState.COMMITTED);
- LOG.info("transfer load job {} state from LOADING to
COMMITTED, because txn {} is COMMITTED."
- + " label: {}, db: {}", job.getId(),
txn.getTransactionId(), job.getLabel(), job.getDbId());
- continue;
- }
- }
-
- /*
- * Bug 2:
- * There is bug in Doris version 0.10.15. When a load job in
PENDING or LOADING
- * state was replayed from image (not through the edit log), we
forgot to add
- * the corresponding callback id in the CallbackFactory. As a
result, the
- * subsequent finish txn edit logs cannot properly finish the job
during the
- * replay process. This results in that when the FE restarts,
these load jobs
- * that should have been completed are re-entered into the pending
state,
- * resulting in repeated submission load tasks.
- *
- * Those wrong images are unrecoverable, so that we have to cancel
all load jobs
- * in PENDING or LOADING state when restarting FE, to avoid submit
jobs
- * repeatedly.
- *
- * This code can be remove when upgrading from 0.11.x to future
version.
- */
- if (job.getState() == JobState.LOADING || job.getState() ==
JobState.PENDING) {
- JobState prevState = job.getState();
- TransactionState txn =
txnMgr.getTransactionStateByCallbackId(job.getDbId(), job.getCallbackId());
- if (txn != null) {
- // the txn has already been committed or visible, change
job's state to committed or finished
- if (txn.getTransactionStatus() ==
TransactionStatus.COMMITTED) {
- job.updateState(JobState.COMMITTED);
- LOG.info("transfer load job {} state from {} to
COMMITTED, because txn {} is COMMITTED",
- job.getId(), prevState,
txn.getTransactionId());
- } else if (txn.getTransactionStatus() ==
TransactionStatus.VISIBLE) {
- job.updateState(JobState.FINISHED);
- LOG.info("transfer load job {} state from {} to
FINISHED, because txn {} is VISIBLE",
- job.getId(), prevState,
txn.getTransactionId());
- } else if (txn.getTransactionStatus() ==
TransactionStatus.ABORTED) {
- job.cancelJobWithoutCheck(new
FailMsg(CancelType.LOAD_RUN_FAIL, "fe restart"), false, false);
- LOG.info("transfer load job {} state from {} to
CANCELLED, because txn {} is ABORTED",
- job.getId(), prevState,
txn.getTransactionId());
- } else {
- // pending txn, do nothing
- }
- continue;
- }
-
- if (job.getJobType() == EtlJobType.MINI) {
- // for mini load job, just set it as CANCELLED, because
mini load is a synchronous load.
- // it would be failed if FE restart.
- job.cancelJobWithoutCheck(new
FailMsg(CancelType.LOAD_RUN_FAIL, "fe restart"), false, false);
- LOG.info("transfer mini load job {} state from {} to
CANCELLED, because transaction status is unknown"
- + ". label: {}, db: {}",
- job.getId(), prevState, job.getLabel(),
job.getDbId());
- } else {
- // txn is not found. here are 2 cases:
- // 1. txn is not start yet, so we can just set job to
CANCELLED, and user need to submit the job again.
- // 2. because of the bug, txn is ABORTED of VISIBLE, and
job is not finished. and txn is expired and
- // be removed from transaction manager. So we don't
know this job is finished or cancelled.
- // in this case, the job should has been submitted long
ago (otherwise the txn could not have been
- // removed by expiration).
- // Without affecting the first case of job, we set the job
finish time to be the same as the create time.
- // In this way, the second type of job will be
automatically cleared after running removeOldLoadJob();
-
- // use CancelType.UNKNOWN, so that we can set finish time
to be the same as the create time
- job.cancelJobWithoutCheck(new
FailMsg(CancelType.TXN_UNKNOWN, "transaction status is unknown"), false, false);
- LOG.info("finish load job {} from {} to CANCELLED, because
transaction status is unknown. label: {}, db: {}, create: {}",
- job.getId(), prevState, job.getLabel(),
job.getDbId(), TimeUtils.longToTimeString(job.getCreateTimestamp()));
- }
- }
- }
-
- removeOldLoadJob();
- }
-
@Override
public void write(DataOutput out) throws IOException {
long currentTimeMs = System.currentTimeMillis();
@@ -798,6 +673,27 @@ public class LoadManager implements Writable {
if (loadJob.isExpired(currentTimeMs)) {
continue;
}
+
+ if (loadJob.getJobType() == EtlJobType.MINI) {
+ // This is a bug fix. the mini load job should not with state
LOADING.
+ if (loadJob.getState() == JobState.LOADING) {
+ LOG.warn("skip mini load job {} in db {} with LOADING
state", loadJob.getId(), loadJob.getDbId());
+ continue;
+ }
+
+ if (loadJob.getState() == JobState.PENDING) {
+ // bad case. When a mini load job is created and then FE
restart.
+ // the job will be in PENDING state forever.
+ // This is a temp solution to remove these jobs. And the
mini load job should be deprecated in Doris v1.1
+ TransactionState state =
Catalog.getCurrentCatalog().getGlobalTransactionMgr().getTransactionState(
+ loadJob.getDbId(), loadJob.getTransactionId());
+ if (state == null) {
+ LOG.warn("skip mini load job {} in db {} with PENDING
state and with txn: {}",
+ loadJob.getId(), loadJob.getDbId(),
loadJob.getTransactionId());
+ continue;
+ }
+ }
+ }
idToLoadJob.put(loadJob.getId(), loadJob);
Map<String, List<LoadJob>> map =
dbIdToLabelToLoadJobs.get(loadJob.getDbId());
if (map == null) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]