This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f10317  [Spark load][Bug] fix that cancelling a spark load in the 
`PENDING` phase will not succeed (#4536)
8f10317 is described below

commit 8f10317e0d111aa6c1b5cb63b888a0965c1d49f5
Author: xy720 <[email protected]>
AuthorDate: Sun Sep 6 20:32:47 2020 +0800

    [Spark load][Bug] fix that cancelling a spark load in the `PENDING` phase 
will not succeed (#4536)
---
 .../apache/doris/load/loadv2/SparkEtlJobHandler.java    | 17 +++++++++++++----
 .../apache/doris/load/loadv2/SparkLauncherMonitor.java  |  5 +++++
 .../apache/doris/load/loadv2/SparkLoadAppHandle.java    |  4 ++++
 .../java/org/apache/doris/load/loadv2/SparkLoadJob.java |  4 ++++
 .../apache/doris/load/loadv2/SparkLoadPendingTask.java  | 11 ++++++-----
 .../doris/load/loadv2/SparkEtlJobHandlerTest.java       |  4 ++--
 6 files changed, 34 insertions(+), 11 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
index f3e2f91..21f6431 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
@@ -77,7 +77,7 @@ public class SparkEtlJobHandler {
     private static final String YARN_KILL_CMD = "%s --config %s application 
-kill %s";
 
     public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig 
etlJobConfig, SparkResource resource,
-                             BrokerDesc brokerDesc, SparkPendingTaskAttachment 
attachment) throws LoadException {
+                             BrokerDesc brokerDesc, SparkLoadAppHandle handle, 
SparkPendingTaskAttachment attachment) throws LoadException {
         // delete outputPath
         deleteEtlOutputPath(etlJobConfig.outputPath, brokerDesc);
 
@@ -142,13 +142,12 @@ public class SparkEtlJobHandler {
         }
 
         // start app
-        SparkLoadAppHandle handle = null;
         State state = null;
         String appId = null;
         String errMsg = "start spark app failed. error: ";
         try {
             Process process = launcher.launch();
-            handle = new SparkLoadAppHandle(process);
+            handle.setProcess(process);
             if (!FeConstants.runningUnitTest) {
                 SparkLauncherMonitor.LogMonitor logMonitor = 
SparkLauncherMonitor.createLogMonitor(handle);
                 logMonitor.setSubmitTimeoutMs(GET_APPID_TIMEOUT_MS);
@@ -264,8 +263,18 @@ public class SparkEtlJobHandler {
     }
 
     public void killEtlJob(SparkLoadAppHandle handle, String appId, long 
loadJobId, SparkResource resource) throws LoadException {
-        Preconditions.checkNotNull(appId);
         if (resource.isYarnMaster()) {
+            // The appId may be empty when the load job is in the PENDING phase. 
This is because the appId is
+            // parsed from the spark launcher process's output (the spark launcher 
process submits the job and then
+            // returns the appId). In this case, the spark job has not been 
submitted yet, so we only need to kill
+            // the spark launcher process.
+            if (Strings.isNullOrEmpty(appId)) {
+                appId = handle.getAppId();
+                if (Strings.isNullOrEmpty(appId)) {
+                    handle.kill();
+                    return;
+                }
+            }
             // prepare yarn config
             String configDir = resource.prepareYarnConfig();
             // yarn client path
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java
index 9a66448..9af39c4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java
@@ -106,6 +106,11 @@ public class SparkLauncherMonitor {
         // UNKNOWN/SUBMITTED for a long time.
         @Override
         public void run() {
+            if (handle.getState() == SparkLoadAppHandle.State.KILLED) {
+                // If handle has been killed, kill the process
+                process.destroyForcibly();
+                return;
+            }
             BufferedReader outReader = null;
             String line = null;
             long startTime = System.currentTimeMillis();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java
index 2289655..c732027 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java
@@ -134,6 +134,10 @@ public class SparkLoadAppHandle implements Writable {
 
     public String getLogPath() { return this.logPath; }
 
+    public void setProcess(Process process) {
+        this.process = process;
+    }
+
     public void setState(State state) {
         this.state = state;
         this.fireEvent(false);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index f2460c0..9297295 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -715,6 +715,10 @@ public class SparkLoadJob extends BulkLoadJob {
         return etlStartTimestamp;
     }
 
+    public SparkLoadAppHandle getHandle() {
+        return sparkLoadAppHandle;
+    }
+
     public void clearSparkLauncherLog() {
         String logPath = sparkLoadAppHandle.getLogPath();
         if (!Strings.isNullOrEmpty(logPath)) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
index 185e84a..dbf5247 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
@@ -57,16 +57,15 @@ import 
org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlTable;
 import org.apache.doris.load.loadv2.etl.EtlJobConfig.FilePatternVersion;
 import org.apache.doris.load.loadv2.etl.EtlJobConfig.SourceType;
 import org.apache.doris.transaction.TransactionState;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -84,6 +83,7 @@ public class SparkLoadPendingTask extends LoadTask {
     private final long loadJobId;
     private final long transactionId;
     private EtlJobConfig etlJobConfig;
+    private SparkLoadAppHandle sparkLoadAppHandle;
 
     public SparkLoadPendingTask(SparkLoadJob loadTaskCallback,
                                 Map<FileGroupAggKey, List<BrokerFileGroup>> 
aggKeyToBrokerFileGroups,
@@ -98,6 +98,7 @@ public class SparkLoadPendingTask extends LoadTask {
         this.loadJobId = loadTaskCallback.getId();
         this.loadLabel = loadTaskCallback.getLabel();
         this.transactionId = loadTaskCallback.getTransactionId();
+        this.sparkLoadAppHandle = loadTaskCallback.getHandle();
         this.failMsg = new FailMsg(FailMsg.CancelType.ETL_SUBMIT_FAIL);
     }
 
@@ -115,7 +116,7 @@ public class SparkLoadPendingTask extends LoadTask {
 
         // handler submit etl job
         SparkEtlJobHandler handler = new SparkEtlJobHandler();
-        handler.submitEtlJob(loadJobId, loadLabel, etlJobConfig, resource, 
brokerDesc, sparkAttachment);
+        handler.submitEtlJob(loadJobId, loadLabel, etlJobConfig, resource, 
brokerDesc, sparkLoadAppHandle, sparkAttachment);
         LOG.info("submit spark etl job success. load job id: {}, attachment: 
{}", loadJobId, sparkAttachment);
     }
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java
index 904a8b1..73df628 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java
@@ -167,7 +167,7 @@ public class SparkEtlJobHandlerTest {
         BrokerDesc brokerDesc = new BrokerDesc(broker, Maps.newHashMap());
         SparkPendingTaskAttachment attachment = new 
SparkPendingTaskAttachment(pendingTaskId);
         SparkEtlJobHandler handler = new SparkEtlJobHandler();
-        handler.submitEtlJob(loadJobId, label, etlJobConfig, resource, 
brokerDesc, attachment);
+        handler.submitEtlJob(loadJobId, label, etlJobConfig, resource, 
brokerDesc, handle, attachment);
 
         // check submit etl job success
         Assert.assertEquals(appId, attachment.getAppId());
@@ -203,7 +203,7 @@ public class SparkEtlJobHandlerTest {
         BrokerDesc brokerDesc = new BrokerDesc(broker, Maps.newHashMap());
         SparkPendingTaskAttachment attachment = new 
SparkPendingTaskAttachment(pendingTaskId);
         SparkEtlJobHandler handler = new SparkEtlJobHandler();
-        handler.submitEtlJob(loadJobId, label, etlJobConfig, resource, 
brokerDesc, attachment);
+        handler.submitEtlJob(loadJobId, label, etlJobConfig, resource, 
brokerDesc, handle, attachment);
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to