This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 74791dca64 [Bug][Core] Local mode cancel running JobStatusRunner (#7943)
74791dca64 is described below
commit 74791dca64a2a8546bcad0e7cc56c92b67c5dfa0
Author: Jast <[email protected]>
AuthorDate: Wed Oct 30 18:40:35 2024 +0800
[Bug][Core] Local mode cancel running JobStatusRunner (#7943)
---
.../starter/seatunnel/command/ClientExecuteCommand.java | 17 +++++++++++------
1 file changed, 11 insertions(+), 6 deletions(-)
diff --git
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
index b67288fb13..f6ef4d76d0 100644
---
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
+++
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
@@ -85,7 +85,9 @@ public class ClientExecuteCommand implements
Command<ClientCommandArgs> {
try {
String clusterName = clientCommandArgs.getClusterName();
ClientConfig clientConfig =
ConfigProvider.locateAndGetClientConfig();
- if (clientCommandArgs.getMasterType().equals(MasterType.LOCAL)) {
+ // get running mode
+ boolean isLocalMode =
clientCommandArgs.getMasterType().equals(MasterType.LOCAL);
+ if (isLocalMode) {
clusterName =
creatRandomClusterName(
StringUtils.isNotEmpty(clusterName)
@@ -160,7 +162,7 @@ public class ClientExecuteCommand implements
Command<ClientCommandArgs> {
// create job proxy
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
if (clientCommandArgs.isAsync()) {
- if
(clientCommandArgs.getMasterType().equals(MasterType.LOCAL)) {
+ if (isLocalMode) {
log.warn("The job is running in local mode, can not
use async mode.");
} else {
return;
@@ -200,10 +202,13 @@ public class ClientExecuteCommand implements
Command<ClientCommandArgs> {
seaTunnelConfig.getEngineConfig().getPrintJobMetricsInfoInterval(),
TimeUnit.SECONDS);
- executorService.schedule(
- new JobStatusRunner(engineClient.getJobClient(),
jobId),
- 0,
- TimeUnit.SECONDS);
+ if (!isLocalMode) {
+ // LOCAL mode does not require running the job status runner
+ executorService.schedule(
+ new JobStatusRunner(engineClient.getJobClient(),
jobId),
+ 0,
+ TimeUnit.SECONDS);
+ }
// wait for job complete
JobResult jobResult = clientJobProxy.waitForJobCompleteV2();