This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch dev-1.0.0 in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 178a943afb0c18ed1f4df6ba0dc6fccc3814660f Author: Mingyu Chen <[email protected]> AuthorDate: Tue Mar 8 18:53:45 2022 +0800 [fix](broker-load) fix a bug where a cancelled job's state could be LOADING (#8363) 1. Before executing the LoadLoadingTask of a broker load, check whether the job has already been cancelled. 2. Add a new column `RunningTransactionNum` to `show proc "/transactions"`, so that all running txns in each db can be viewed with one command. --- .../java/org/apache/doris/catalog/Catalog.java | 15 ++++++---- .../apache/doris/common/proc/TransDbProcDir.java | 1 + .../main/java/org/apache/doris/load/LoadJob.java | 7 +++-- .../org/apache/doris/load/loadv2/JobState.java | 6 +++- .../java/org/apache/doris/load/loadv2/LoadJob.java | 34 +++++++++++++++------- .../apache/doris/load/loadv2/LoadLoadingTask.java | 7 +++-- .../doris/transaction/GlobalTransactionMgr.java | 7 +++++ 7 files changed, 54 insertions(+), 23 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index 933ba28..45598ab 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -161,6 +161,7 @@ import org.apache.doris.journal.JournalCursor; import org.apache.doris.journal.JournalEntity; import org.apache.doris.journal.bdbje.Timestamp; import org.apache.doris.load.DeleteHandler; +import org.apache.doris.load.EtlJobType; import org.apache.doris.load.ExportChecker; import org.apache.doris.load.ExportJob; import org.apache.doris.load.ExportMgr; @@ -258,11 +259,15 @@ import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.collect.Queues; import com.google.common.collect.Sets; +import com.sleepycat.je.rep.InsufficientLogException; +import com.sleepycat.je.rep.NetworkRestore; +import com.sleepycat.je.rep.NetworkRestoreConfig; import 
org.apache.commons.collections.CollectionUtils; 
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.codehaus.jackson.map.ObjectMapper; import java.io.BufferedReader; import java.io.DataInputStream; @@ -292,12 +297,6 @@ import java.util.stream.Collectors; import javax.annotation.Nullable; -import com.sleepycat.je.rep.InsufficientLogException; -import com.sleepycat.je.rep.NetworkRestore; -import com.sleepycat.je.rep.NetworkRestoreConfig; - -import org.codehaus.jackson.map.ObjectMapper; - public class Catalog { private static final Logger LOG = LogManager.getLogger(Catalog.class); // 0 ~ 9999 used for qe @@ -1762,6 +1761,10 @@ public class Catalog { // LABEL_KEEP_MAX_MS // This job must be FINISHED or CANCELLED if (!job.isExpired(currentTimeMs)) { + if (job.getEtlJobType() != EtlJobType.HADOOP) { + LOG.warn("job {} with type is deprecated, skip it", job.getId(), job.getEtlJobType()); + continue; + } load.unprotectAddLoadJob(job, true /* replay */); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TransDbProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TransDbProcDir.java index 634490b..0b92aa4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TransDbProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TransDbProcDir.java @@ -33,6 +33,7 @@ public class TransDbProcDir implements ProcDirInterface { public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>() .add("DbId") .add("DbName") + .add("RunningTransactionNum") .build(); public TransDbProcDir() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java index 9ba1aeb..c561895 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java @@ -39,14 +39,14 @@ import org.apache.doris.thrift.TEtlState; 
import org.apache.doris.thrift.TPriority; import org.apache.doris.thrift.TResourceInfo; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -844,6 +844,7 @@ public class LoadJob implements Writable { Text.writeString(out, tableName); } } + public void readFields(DataInput in) throws IOException { long version = Catalog.getCurrentCatalogJournalVersion(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/JobState.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/JobState.java index f25e05b..e023b68 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/JobState.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/JobState.java @@ -25,5 +25,9 @@ public enum JobState { LOADING, // job is running COMMITTED, // transaction is committed but not visible FINISHED, // transaction is visible and job is finished - CANCELLED // transaction is aborted and job is cancelled + CANCELLED; // transaction is aborted and job is cancelled + + public boolean isFinalState() { + return this == FINISHED || this == CANCELLED; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index 0fe8ef0..d979eac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -60,9 +60,6 @@ import org.apache.doris.transaction.ErrorTabletInfo; import org.apache.doris.transaction.TransactionException; import org.apache.doris.transaction.TransactionState; -import org.apache.logging.log4j.LogManager; -import 
org.apache.logging.log4j.Logger; - import com.google.common.base.Joiner; import com.google.common.collect.HashBasedTable; import com.google.common.collect.Lists; @@ -73,6 +70,9 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.annotations.SerializedName; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -475,31 +475,43 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements * * @param jobState */ - public void updateState(JobState jobState) { + public boolean updateState(JobState jobState) { writeLock(); try { - unprotectedUpdateState(jobState); + return unprotectedUpdateState(jobState); } finally { writeUnlock(); } } - protected void unprotectedUpdateState(JobState jobState) { + protected boolean unprotectedUpdateState(JobState jobState) { + if (this.state.isFinalState()) { + // This is a simple self-protection mechanism to prevent jobs + // that have entered the final state from being placed in a non-terminating state again. + // For example, when a LoadLoadingTask starts running, it tries to set the job state to LOADING, + // but the job may have been cancelled (CANCELLED) due to a timeout. + // At this point, the job state should not be set to LOADING again. + // It is safe to return directly here without any processing, + // and other processes will ensure that the job ends properly. 
+ LOG.warn("the load job {} is in final state: {}, should not update state to {} again", + id, this.state, jobState); + return false; + } switch (jobState) { case UNKNOWN: executeUnknown(); - break; + return true; case LOADING: executeLoad(); - break; + return true; case COMMITTED: executeCommitted(); - break; + return true; case FINISHED: executeFinish(); - break; + return true; default: - break; + return false; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java index 6ff10fe..5f05a6f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java @@ -115,7 +115,10 @@ public class LoadLoadingTask extends LoadTask { DebugUtil.printId(loadId), callback.getCallbackId(), db.getFullName(), table.getName(), retryTime); retryTime--; beginTime = System.nanoTime(); - ((BrokerLoadJob) callback).updateState(JobState.LOADING); + if (!((BrokerLoadJob) callback).updateState(JobState.LOADING)) { + // job may already be cancelled + return; + } executeOnce(); } @@ -176,7 +179,7 @@ public class LoadLoadingTask extends LoadTask { } private long getLeftTimeMs() { - return jobDeadlineMs - System.currentTimeMillis(); + return Math.max(jobDeadlineMs - System.currentTimeMillis(), 1000L); } private void createProfile(Coordinator coord) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 2a8975e..ad65c77 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -438,6 +438,13 @@ public class GlobalTransactionMgr implements Writable { continue; } info.add(db.getFullName()); + long runningNum = 0; + try { + 
DatabaseTransactionMgr dbMgr = getDatabaseTransactionMgr(dbId); + runningNum = dbMgr.getRunningTxnNums(); + } catch (AnalysisException e) { + } + info.add(runningNum); infos.add(info); } return infos; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
