This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 22bafef [fix](broker-load) fix bug that a cancelled job's state is
LOADING (#8363)
22bafef is described below
commit 22bafef875edacaad6aec7a1b13f0380972c65ed
Author: Mingyu Chen <[email protected]>
AuthorDate: Tue Mar 8 18:53:45 2022 +0800
[fix](broker-load) fix bug that a cancelled job's state is LOADING (#8363)
1. Before executing the LoadLoadingTask of a broker load, we should check
whether the job has already been cancelled.
2. Add a new column `RunningTransactionNum` to `show proc "/transactions"`,
so that we can view the number of running txns in each db with one command.
---
.../java/org/apache/doris/catalog/Catalog.java | 15 ++++++----
.../apache/doris/common/proc/TransDbProcDir.java | 1 +
.../main/java/org/apache/doris/load/LoadJob.java | 7 +++--
.../org/apache/doris/load/loadv2/JobState.java | 6 +++-
.../java/org/apache/doris/load/loadv2/LoadJob.java | 34 +++++++++++++++-------
.../apache/doris/load/loadv2/LoadLoadingTask.java | 7 +++--
.../doris/transaction/GlobalTransactionMgr.java | 7 +++++
7 files changed, 54 insertions(+), 23 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index af17ec5..19552ad 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -161,6 +161,7 @@ import org.apache.doris.journal.JournalCursor;
import org.apache.doris.journal.JournalEntity;
import org.apache.doris.journal.bdbje.Timestamp;
import org.apache.doris.load.DeleteHandler;
+import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.ExportChecker;
import org.apache.doris.load.ExportJob;
import org.apache.doris.load.ExportMgr;
@@ -258,11 +259,15 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
+import com.sleepycat.je.rep.InsufficientLogException;
+import com.sleepycat.je.rep.NetworkRestore;
+import com.sleepycat.je.rep.NetworkRestoreConfig;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
import java.io.BufferedReader;
import java.io.DataInputStream;
@@ -292,12 +297,6 @@ import java.util.stream.Collectors;
import javax.annotation.Nullable;
-import com.sleepycat.je.rep.InsufficientLogException;
-import com.sleepycat.je.rep.NetworkRestore;
-import com.sleepycat.je.rep.NetworkRestoreConfig;
-
-import org.codehaus.jackson.map.ObjectMapper;
-
public class Catalog {
private static final Logger LOG = LogManager.getLogger(Catalog.class);
// 0 ~ 9999 used for qe
@@ -1748,6 +1747,10 @@ public class Catalog {
// LABEL_KEEP_MAX_MS
// This job must be FINISHED or CANCELLED
if (!job.isExpired(currentTimeMs)) {
+ if (job.getEtlJobType() != EtlJobType.HADOOP) {
+ LOG.warn("job {} with type is deprecated, skip it",
job.getId(), job.getEtlJobType());
+ continue;
+ }
load.unprotectAddLoadJob(job, true /* replay */);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TransDbProcDir.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TransDbProcDir.java
index 634490b..0b92aa4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TransDbProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TransDbProcDir.java
@@ -33,6 +33,7 @@ public class TransDbProcDir implements ProcDirInterface {
public static final ImmutableList<String> TITLE_NAMES = new
ImmutableList.Builder<String>()
.add("DbId")
.add("DbName")
+ .add("RunningTransactionNum")
.build();
public TransDbProcDir() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java
index faae8e6..680ff97 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java
@@ -38,14 +38,14 @@ import org.apache.doris.thrift.TEtlState;
import org.apache.doris.thrift.TPriority;
import org.apache.doris.thrift.TResourceInfo;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -843,6 +843,7 @@ public class LoadJob implements Writable {
Text.writeString(out, tableName);
}
}
+
public void readFields(DataInput in) throws IOException {
long version = Catalog.getCurrentCatalogJournalVersion();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/JobState.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/JobState.java
index f25e05b..e023b68 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/JobState.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/JobState.java
@@ -25,5 +25,9 @@ public enum JobState {
LOADING, // job is running
COMMITTED, // transaction is committed but not visible
FINISHED, // transaction is visible and job is finished
- CANCELLED // transaction is aborted and job is cancelled
+ CANCELLED; // transaction is aborted and job is cancelled
+
+ public boolean isFinalState() {
+ return this == FINISHED || this == CANCELLED;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index 3f0f781..28f64f1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -59,9 +59,6 @@ import org.apache.doris.transaction.ErrorTabletInfo;
import org.apache.doris.transaction.TransactionException;
import org.apache.doris.transaction.TransactionState;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
import com.google.common.base.Joiner;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Lists;
@@ -72,6 +69,9 @@ import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.SerializedName;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -475,31 +475,43 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
*
* @param jobState
*/
- public void updateState(JobState jobState) {
+ public boolean updateState(JobState jobState) {
writeLock();
try {
- unprotectedUpdateState(jobState);
+ return unprotectedUpdateState(jobState);
} finally {
writeUnlock();
}
}
- protected void unprotectedUpdateState(JobState jobState) {
+ protected boolean unprotectedUpdateState(JobState jobState) {
+ if (this.state.isFinalState()) {
+ // This is a simple self-protection mechanism to prevent jobs
+ // that have entered the final state from being placed in a non-terminating state again.
+ // For example, when a LoadLoadingTask starts running, it tries to set the job state to LOADING,
+ // but the job may have been cancelled (CANCELLED) due to a timeout.
+ // At this point, the job state should not be set to LOADING again.
+ // It is safe to return directly here without any processing,
+ // and other processes will ensure that the job ends properly.
+ LOG.warn("the load job {} is in final state: {}, should not update
state to {} again",
+ id, this.state, jobState);
+ return false;
+ }
switch (jobState) {
case UNKNOWN:
executeUnknown();
- break;
+ return true;
case LOADING:
executeLoad();
- break;
+ return true;
case COMMITTED:
executeCommitted();
- break;
+ return true;
case FINISHED:
executeFinish();
- break;
+ return true;
default:
- break;
+ return false;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index 5733133..303f080 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -117,7 +117,10 @@ public class LoadLoadingTask extends LoadTask {
DebugUtil.printId(loadId), callback.getCallbackId(),
db.getFullName(), table.getName(), retryTime);
retryTime--;
beginTime = System.nanoTime();
- ((BrokerLoadJob) callback).updateState(JobState.LOADING);
+ if (!((BrokerLoadJob) callback).updateState(JobState.LOADING)) {
+ // job may already be cancelled
+ return;
+ }
executeOnce();
}
@@ -178,7 +181,7 @@ public class LoadLoadingTask extends LoadTask {
}
private long getLeftTimeMs() {
- return jobDeadlineMs - System.currentTimeMillis();
+ return Math.max(jobDeadlineMs - System.currentTimeMillis(), 1000L);
}
private void createProfile(Coordinator coord) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 24d20d7..a4fac83 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -438,6 +438,13 @@ public class GlobalTransactionMgr implements Writable {
continue;
}
info.add(db.getFullName());
+ long runningNum = 0;
+ try {
+ DatabaseTransactionMgr dbMgr = getDatabaseTransactionMgr(dbId);
+ runningNum = dbMgr.getRunningTxnNums();
+ } catch (AnalysisException e) {
+ }
+ info.add(runningNum);
infos.add(info);
}
return infos;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]