This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.0
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 178a943afb0c18ed1f4df6ba0dc6fccc3814660f
Author: Mingyu Chen <[email protected]>
AuthorDate: Tue Mar 8 18:53:45 2022 +0800

    [fix](broker-load) fix bug that a cancelled job's state is LOADING (#8363)
    
    1.
    Before executing the LoadLoadingTask of a broker load, we should check
    whether the job has already been cancelled.
    
    2.
    Add a new column `RunningTransactionNum` to `show proc "/transactions"`,
    so that we can view the number of running txns in each db with one command.
---
 .../java/org/apache/doris/catalog/Catalog.java     | 15 ++++++----
 .../apache/doris/common/proc/TransDbProcDir.java   |  1 +
 .../main/java/org/apache/doris/load/LoadJob.java   |  7 +++--
 .../org/apache/doris/load/loadv2/JobState.java     |  6 +++-
 .../java/org/apache/doris/load/loadv2/LoadJob.java | 34 +++++++++++++++-------
 .../apache/doris/load/loadv2/LoadLoadingTask.java  |  7 +++--
 .../doris/transaction/GlobalTransactionMgr.java    |  7 +++++
 7 files changed, 54 insertions(+), 23 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index 933ba28..45598ab 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -161,6 +161,7 @@ import org.apache.doris.journal.JournalCursor;
 import org.apache.doris.journal.JournalEntity;
 import org.apache.doris.journal.bdbje.Timestamp;
 import org.apache.doris.load.DeleteHandler;
+import org.apache.doris.load.EtlJobType;
 import org.apache.doris.load.ExportChecker;
 import org.apache.doris.load.ExportJob;
 import org.apache.doris.load.ExportMgr;
@@ -258,11 +259,15 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Queues;
 import com.google.common.collect.Sets;
+import com.sleepycat.je.rep.InsufficientLogException;
+import com.sleepycat.je.rep.NetworkRestore;
+import com.sleepycat.je.rep.NetworkRestoreConfig;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
 
 import java.io.BufferedReader;
 import java.io.DataInputStream;
@@ -292,12 +297,6 @@ import java.util.stream.Collectors;
 
 import javax.annotation.Nullable;
 
-import com.sleepycat.je.rep.InsufficientLogException;
-import com.sleepycat.je.rep.NetworkRestore;
-import com.sleepycat.je.rep.NetworkRestoreConfig;
-
-import org.codehaus.jackson.map.ObjectMapper;
-
 public class Catalog {
     private static final Logger LOG = LogManager.getLogger(Catalog.class);
     // 0 ~ 9999 used for qe
@@ -1762,6 +1761,10 @@ public class Catalog {
                 // LABEL_KEEP_MAX_MS
                 // This job must be FINISHED or CANCELLED
                 if (!job.isExpired(currentTimeMs)) {
+                    if (job.getEtlJobType() != EtlJobType.HADOOP) {
+                        LOG.warn("job {} with type is deprecated, skip it", 
job.getId(), job.getEtlJobType());
+                        continue;
+                    }
                     load.unprotectAddLoadJob(job, true /* replay */);
                 }
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TransDbProcDir.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TransDbProcDir.java
index 634490b..0b92aa4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TransDbProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TransDbProcDir.java
@@ -33,6 +33,7 @@ public class TransDbProcDir implements ProcDirInterface {
     public static final ImmutableList<String> TITLE_NAMES = new 
ImmutableList.Builder<String>()
             .add("DbId")
             .add("DbName")
+            .add("RunningTransactionNum")
             .build();
 
     public TransDbProcDir() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java
index 9ba1aeb..c561895 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java
@@ -39,14 +39,14 @@ import org.apache.doris.thrift.TEtlState;
 import org.apache.doris.thrift.TPriority;
 import org.apache.doris.thrift.TResourceInfo;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -844,6 +844,7 @@ public class LoadJob implements Writable {
             Text.writeString(out, tableName);
         }
     }
+
     public void readFields(DataInput in) throws IOException {
         long version = Catalog.getCurrentCatalogJournalVersion();
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/JobState.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/JobState.java
index f25e05b..e023b68 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/JobState.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/JobState.java
@@ -25,5 +25,9 @@ public enum JobState {
     LOADING, // job is running
     COMMITTED, // transaction is committed but not visible
     FINISHED, // transaction is visible and job is finished
-    CANCELLED // transaction is aborted and job is cancelled
+    CANCELLED; // transaction is aborted and job is cancelled
+
+    public boolean isFinalState() {
+        return this == FINISHED || this == CANCELLED;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index 0fe8ef0..d979eac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -60,9 +60,6 @@ import org.apache.doris.transaction.ErrorTabletInfo;
 import org.apache.doris.transaction.TransactionException;
 import org.apache.doris.transaction.TransactionState;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import com.google.common.base.Joiner;
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.Lists;
@@ -73,6 +70,9 @@ import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.annotations.SerializedName;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -475,31 +475,43 @@ public abstract class LoadJob extends 
AbstractTxnStateChangeCallback implements
      *
      * @param jobState
      */
-    public void updateState(JobState jobState) {
+    public boolean updateState(JobState jobState) {
         writeLock();
         try {
-            unprotectedUpdateState(jobState);
+            return unprotectedUpdateState(jobState);
         } finally {
             writeUnlock();
         }
     }
 
-    protected void unprotectedUpdateState(JobState jobState) {
+    protected boolean unprotectedUpdateState(JobState jobState) {
+        if (this.state.isFinalState()) {
+            // This is a simple self-protection mechanism to prevent jobs
+            // that have entered the final state from being placed in a 
non-terminating state again.
+            // For example, when a LoadLoadingTask starts running, it tries to 
set the job state to LOADING,
+            // but the job may have been cancelled (CANCELLED) due to a 
timeout.
+            // At this point, the job state should not be set to LOADING again.
+            // It is safe to return directly here without any processing,
+            // and other processes will ensure that the job ends properly.
+            LOG.warn("the load job {} is in final state: {}, should not update 
state to {} again",
+                    id, this.state, jobState);
+            return false;
+        }
         switch (jobState) {
             case UNKNOWN:
                 executeUnknown();
-                break;
+                return true;
             case LOADING:
                 executeLoad();
-                break;
+                return true;
             case COMMITTED:
                 executeCommitted();
-                break;
+                return true;
             case FINISHED:
                 executeFinish();
-                break;
+                return true;
             default:
-                break;
+                return false;
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index 6ff10fe..5f05a6f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -115,7 +115,10 @@ public class LoadLoadingTask extends LoadTask {
                 DebugUtil.printId(loadId), callback.getCallbackId(), 
db.getFullName(), table.getName(), retryTime);
         retryTime--;
         beginTime = System.nanoTime();
-        ((BrokerLoadJob) callback).updateState(JobState.LOADING);
+        if (!((BrokerLoadJob) callback).updateState(JobState.LOADING)) {
+            // job may already be cancelled
+            return;
+        }
         executeOnce();
     }
 
@@ -176,7 +179,7 @@ public class LoadLoadingTask extends LoadTask {
     }
 
     private long getLeftTimeMs() {
-        return jobDeadlineMs - System.currentTimeMillis();
+        return Math.max(jobDeadlineMs - System.currentTimeMillis(), 1000L);
     }
 
     private void createProfile(Coordinator coord) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 2a8975e..ad65c77 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -438,6 +438,13 @@ public class GlobalTransactionMgr implements Writable {
                 continue;
             }
             info.add(db.getFullName());
+            long runningNum = 0;
+            try {
+                DatabaseTransactionMgr dbMgr = getDatabaseTransactionMgr(dbId);
+                runningNum = dbMgr.getRunningTxnNums();
+            } catch (AnalysisException e) {
+            }
+            info.add(runningNum);
             infos.add(info);
         }
         return infos;

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to