This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8f10317 [Spark load][Bug] fix that cancelling a spark load in the
`PENDING` phase will not succeed (#4536)
8f10317 is described below
commit 8f10317e0d111aa6c1b5cb63b888a0965c1d49f5
Author: xy720 <[email protected]>
AuthorDate: Sun Sep 6 20:32:47 2020 +0800
[Spark load][Bug] fix that cancelling a spark load in the `PENDING` phase
will not succeed (#4536)
---
.../apache/doris/load/loadv2/SparkEtlJobHandler.java | 17 +++++++++++++----
.../apache/doris/load/loadv2/SparkLauncherMonitor.java | 5 +++++
.../apache/doris/load/loadv2/SparkLoadAppHandle.java | 4 ++++
.../java/org/apache/doris/load/loadv2/SparkLoadJob.java | 4 ++++
.../apache/doris/load/loadv2/SparkLoadPendingTask.java | 11 ++++++-----
.../doris/load/loadv2/SparkEtlJobHandlerTest.java | 4 ++--
6 files changed, 34 insertions(+), 11 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
index f3e2f91..21f6431 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
@@ -77,7 +77,7 @@ public class SparkEtlJobHandler {
private static final String YARN_KILL_CMD = "%s --config %s application
-kill %s";
public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig
etlJobConfig, SparkResource resource,
- BrokerDesc brokerDesc, SparkPendingTaskAttachment
attachment) throws LoadException {
+ BrokerDesc brokerDesc, SparkLoadAppHandle handle,
SparkPendingTaskAttachment attachment) throws LoadException {
// delete outputPath
deleteEtlOutputPath(etlJobConfig.outputPath, brokerDesc);
@@ -142,13 +142,12 @@ public class SparkEtlJobHandler {
}
// start app
- SparkLoadAppHandle handle = null;
State state = null;
String appId = null;
String errMsg = "start spark app failed. error: ";
try {
Process process = launcher.launch();
- handle = new SparkLoadAppHandle(process);
+ handle.setProcess(process);
if (!FeConstants.runningUnitTest) {
SparkLauncherMonitor.LogMonitor logMonitor =
SparkLauncherMonitor.createLogMonitor(handle);
logMonitor.setSubmitTimeoutMs(GET_APPID_TIMEOUT_MS);
@@ -264,8 +263,18 @@ public class SparkEtlJobHandler {
}
public void killEtlJob(SparkLoadAppHandle handle, String appId, long
loadJobId, SparkResource resource) throws LoadException {
- Preconditions.checkNotNull(appId);
if (resource.isYarnMaster()) {
+ // The appId may be empty when the load job is in the PENDING phase, because the appId is
+ // parsed from the spark launcher process's output (the launcher process submits the job and
+ // then returns the appId). In that case the spark job has not been submitted yet, so we only
+ // need to kill the spark launcher process.
+ if (Strings.isNullOrEmpty(appId)) {
+ appId = handle.getAppId();
+ if (Strings.isNullOrEmpty(appId)) {
+ handle.kill();
+ return;
+ }
+ }
// prepare yarn config
String configDir = resource.prepareYarnConfig();
// yarn client path
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java
index 9a66448..9af39c4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java
@@ -106,6 +106,11 @@ public class SparkLauncherMonitor {
// UNKNOWN/SUBMITTED for a long time.
@Override
public void run() {
+ if (handle.getState() == SparkLoadAppHandle.State.KILLED) {
+ // If handle has been killed, kill the process
+ process.destroyForcibly();
+ return;
+ }
BufferedReader outReader = null;
String line = null;
long startTime = System.currentTimeMillis();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java
index 2289655..c732027 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java
@@ -134,6 +134,10 @@ public class SparkLoadAppHandle implements Writable {
public String getLogPath() { return this.logPath; }
+ public void setProcess(Process process) {
+ this.process = process;
+ }
+
public void setState(State state) {
this.state = state;
this.fireEvent(false);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index f2460c0..9297295 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -715,6 +715,10 @@ public class SparkLoadJob extends BulkLoadJob {
return etlStartTimestamp;
}
+ public SparkLoadAppHandle getHandle() {
+ return sparkLoadAppHandle;
+ }
+
public void clearSparkLauncherLog() {
String logPath = sparkLoadAppHandle.getLogPath();
if (!Strings.isNullOrEmpty(logPath)) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
index 185e84a..dbf5247 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
@@ -57,16 +57,15 @@ import
org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlTable;
import org.apache.doris.load.loadv2.etl.EtlJobConfig.FilePatternVersion;
import org.apache.doris.load.loadv2.etl.EtlJobConfig.SourceType;
import org.apache.doris.transaction.TransactionState;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -84,6 +83,7 @@ public class SparkLoadPendingTask extends LoadTask {
private final long loadJobId;
private final long transactionId;
private EtlJobConfig etlJobConfig;
+ private SparkLoadAppHandle sparkLoadAppHandle;
public SparkLoadPendingTask(SparkLoadJob loadTaskCallback,
Map<FileGroupAggKey, List<BrokerFileGroup>>
aggKeyToBrokerFileGroups,
@@ -98,6 +98,7 @@ public class SparkLoadPendingTask extends LoadTask {
this.loadJobId = loadTaskCallback.getId();
this.loadLabel = loadTaskCallback.getLabel();
this.transactionId = loadTaskCallback.getTransactionId();
+ this.sparkLoadAppHandle = loadTaskCallback.getHandle();
this.failMsg = new FailMsg(FailMsg.CancelType.ETL_SUBMIT_FAIL);
}
@@ -115,7 +116,7 @@ public class SparkLoadPendingTask extends LoadTask {
// handler submit etl job
SparkEtlJobHandler handler = new SparkEtlJobHandler();
- handler.submitEtlJob(loadJobId, loadLabel, etlJobConfig, resource,
brokerDesc, sparkAttachment);
+ handler.submitEtlJob(loadJobId, loadLabel, etlJobConfig, resource,
brokerDesc, sparkLoadAppHandle, sparkAttachment);
LOG.info("submit spark etl job success. load job id: {}, attachment:
{}", loadJobId, sparkAttachment);
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java
index 904a8b1..73df628 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java
@@ -167,7 +167,7 @@ public class SparkEtlJobHandlerTest {
BrokerDesc brokerDesc = new BrokerDesc(broker, Maps.newHashMap());
SparkPendingTaskAttachment attachment = new
SparkPendingTaskAttachment(pendingTaskId);
SparkEtlJobHandler handler = new SparkEtlJobHandler();
- handler.submitEtlJob(loadJobId, label, etlJobConfig, resource,
brokerDesc, attachment);
+ handler.submitEtlJob(loadJobId, label, etlJobConfig, resource,
brokerDesc, handle, attachment);
// check submit etl job success
Assert.assertEquals(appId, attachment.getAppId());
@@ -203,7 +203,7 @@ public class SparkEtlJobHandlerTest {
BrokerDesc brokerDesc = new BrokerDesc(broker, Maps.newHashMap());
SparkPendingTaskAttachment attachment = new
SparkPendingTaskAttachment(pendingTaskId);
SparkEtlJobHandler handler = new SparkEtlJobHandler();
- handler.submitEtlJob(loadJobId, label, etlJobConfig, resource,
brokerDesc, attachment);
+ handler.submitEtlJob(loadJobId, label, etlJobConfig, resource,
brokerDesc, handle, attachment);
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]