This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
new c96f66877 [Improve] submit flink job waiting for response improvement
c96f66877 is described below
commit c96f668773cd368421f71830c3fa43a8b0403b58
Author: benjobs <[email protected]>
AuthorDate: Thu Dec 21 22:46:45 2023 +0800
[Improve] submit flink job waiting for response improvement
---
.../core/service/impl/ApplicationServiceImpl.java | 314 ++++++++++-----------
.../flink/client/impl/YarnApplicationClient.scala | 5 +-
.../flink/client/impl/YarnPerJobClient.scala | 1 +
.../flink/client/impl/YarnSessionClient.scala | 1 +
.../flink/client/trait/YarnClientTrait.scala | 11 +
5 files changed, 169 insertions(+), 163 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 87066681e..349d16c0d 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -26,7 +26,6 @@ import org.apache.streampark.common.enums.ResolveOrder;
import org.apache.streampark.common.enums.StorageType;
import org.apache.streampark.common.fs.HdfsOperator;
import org.apache.streampark.common.fs.LfsOperator;
-import org.apache.streampark.common.util.CompletableFutureUtils;
import org.apache.streampark.common.util.DeflaterUtils;
import org.apache.streampark.common.util.HadoopUtils;
import org.apache.streampark.common.util.PropertiesUtils;
@@ -1134,7 +1133,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
cancelFuture.cancel(true);
}
if (startFuture == null && cancelFuture == null) {
- this.updateToStopped(app);
+ this.doStopped(app);
}
}
@@ -1309,60 +1308,55 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
cancelFutureMap.put(application.getId(), cancelFuture);
- CompletableFutureUtils.runTimeout(
- cancelFuture,
- 10L,
- TimeUnit.MINUTES,
- cancelResponse -> {
- applicationLog.setSuccess(true);
- if (cancelResponse != null && cancelResponse.savePointDir() !=
null) {
- String savePointDir = cancelResponse.savePointDir();
- log.info("savePoint path: {}", savePointDir);
- SavePoint savePoint = new SavePoint();
- savePoint.setPath(savePointDir);
- savePoint.setAppId(application.getId());
- savePoint.setLatest(true);
- savePoint.setType(CheckPointType.SAVEPOINT.get());
- savePoint.setCreateTime(new Date());
- savePoint.setTriggerTime(triggerTime);
- savePointService.save(savePoint);
+ cancelFuture.whenComplete(
+ (cancelResponse, throwable) -> {
+ cancelFutureMap.remove(application.getId());
+ if (throwable != null) {
+ if (throwable instanceof CancellationException) {
+ doStopped(application);
+ } else {
+ log.error("stop flink job fail.", throwable);
+ application.setOptionState(OptionState.NONE.getValue());
+ application.setState(FlinkAppState.FAILED.getValue());
+ updateById(application);
+
+ if (appParam.getSavePointed()) {
+ savePointService.expire(application.getId());
}
+ // re-tracking flink job on kubernetes and logging exception
if (isKubernetesApp(application)) {
- k8SFlinkTrackMonitor.unWatching(toTrackId(application));
- }
- },
- e -> {
- if (e.getCause() instanceof CancellationException) {
- updateToStopped(application);
+ TrackId id = toTrackId(application);
+ k8SFlinkTrackMonitor.unWatching(id);
+ k8SFlinkTrackMonitor.doWatching(id);
} else {
- log.error("stop flink job fail.", e);
- application.setOptionState(OptionState.NONE.getValue());
- application.setState(FlinkAppState.FAILED.getValue());
- updateById(application);
-
- if (appParam.getSavePointed()) {
- savePointService.expire(application.getId());
- }
-
- // re-tracking flink job on kubernetes and logging exception
- if (isKubernetesApp(application)) {
- TrackId id = toTrackId(application);
- k8SFlinkTrackMonitor.unWatching(id);
- k8SFlinkTrackMonitor.doWatching(id);
- } else {
- FlinkRESTAPIWatcher.unWatching(application.getId());
- }
-
- String exception = Utils.stringifyException(e);
- applicationLog.setException(exception);
- applicationLog.setSuccess(false);
+ FlinkRESTAPIWatcher.unWatching(application.getId());
}
- })
- .whenComplete(
- (t, e) -> {
- cancelFutureMap.remove(application.getId());
- applicationLogService.save(applicationLog);
- });
+
+ String exception = Utils.stringifyException(throwable);
+ applicationLog.setException(exception);
+ applicationLog.setSuccess(false);
+ }
+ } else {
+ applicationLog.setSuccess(true);
+ if (cancelResponse != null && cancelResponse.savePointDir() !=
null) {
+ String savePointDir = cancelResponse.savePointDir();
+ log.info("savePoint path: {}", savePointDir);
+ SavePoint savePoint = new SavePoint();
+ savePoint.setPath(savePointDir);
+ savePoint.setAppId(application.getId());
+ savePoint.setLatest(true);
+ savePoint.setType(CheckPointType.SAVEPOINT.get());
+ savePoint.setCreateTime(new Date());
+ savePoint.setTriggerTime(triggerTime);
+ savePointService.save(savePoint);
+ }
+ if (isKubernetesApp(application)) {
+ k8SFlinkTrackMonitor.unWatching(toTrackId(application));
+ }
+ }
+ // save log...
+ applicationLogService.save(applicationLog);
+ });
}
@Override
@@ -1387,7 +1381,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
"This state.savepoints.dir value "
+ savepointPath
+ " path part to store the checkpoint data in is null. Please
specify a directory path for the checkpoint data.";
- } else if (pathPart.length() == 0 || "/".equals(pathPart)) {
+ } else if (pathPart.isEmpty() || "/".equals(pathPart)) {
error =
"This state.savepoints.dir value "
+ savepointPath
@@ -1433,8 +1427,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
// check job on yarn is already running
if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
ApiAlertException.throwIfTrue(
- checkAppRepeatInYarn(application.getJobName()),
- "[StreamPark] The same job name is already running in the yarn
queue");
+ !getApplicationReports(application.getJobName()).isEmpty(),
+ "The same job name is already running in the yarn queue");
}
// if manually started, clear the restart flag
@@ -1581,96 +1575,90 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
startFutureMap.put(application.getId(), future);
- CompletableFutureUtils.runTimeout(
- future,
- 2L,
- TimeUnit.MINUTES,
- submitResponse -> {
- if (submitResponse.flinkConfig() != null) {
- String jmMemory =
-
submitResponse.flinkConfig().get(ConfigConst.KEY_FLINK_JM_PROCESS_MEMORY());
- if (jmMemory != null) {
-
application.setJmMemory(MemorySize.parse(jmMemory).getMebiBytes());
- }
- String tmMemory =
-
submitResponse.flinkConfig().get(ConfigConst.KEY_FLINK_TM_PROCESS_MEMORY());
- if (tmMemory != null) {
-
application.setTmMemory(MemorySize.parse(tmMemory).getMebiBytes());
- }
- }
- application.setAppId(submitResponse.clusterId());
- if (StringUtils.isNoneEmpty(submitResponse.jobId())) {
- application.setJobId(submitResponse.jobId());
- }
-
- if (StringUtils.isNoneEmpty(submitResponse.jobManagerUrl())) {
- application.setJobManagerUrl(submitResponse.jobManagerUrl());
-
applicationLog.setJobManagerUrl(submitResponse.jobManagerUrl());
- }
- applicationLog.setYarnAppId(submitResponse.clusterId());
- application.setStartTime(new Date());
- application.setEndTime(null);
- if (isKubernetesApp(application)) {
- application.setRelease(ReleaseState.DONE.get());
- }
- updateById(application);
+ future.whenComplete(
+ (response, throwable) -> {
+ // 1) remove Future
+ startFutureMap.remove(application.getId());
- // if start completed, will be added task to tracking queue
- if (isKubernetesApp(application)) {
- k8SFlinkTrackMonitor.doWatching(toTrackId(application));
+ // 2) exception
+ if (throwable != null) {
+ if (throwable instanceof CancellationException) {
+ doStopped(application);
+ return;
+ } else {
+ Application app = getById(appParam.getId());
+ app.setState(FlinkAppState.FAILED.getValue());
+ app.setOptionState(OptionState.NONE.getValue());
+ updateById(app);
+ if (isKubernetesApp(app)) {
+ k8SFlinkTrackMonitor.unWatching(toTrackId(app));
} else {
- FlinkRESTAPIWatcher.setOptionState(appParam.getId(),
OptionState.STARTING);
- FlinkRESTAPIWatcher.doWatching(application);
+ FlinkRESTAPIWatcher.unWatching(appParam.getId());
}
+ }
+ String exception = Utils.stringifyException(throwable);
+ applicationLog.setException(exception);
+ applicationLog.setSuccess(false);
+ applicationLogService.save(applicationLog);
+ // set savepoint to expire
+ savePointService.expire(application.getId());
+ return;
+ }
- applicationLog.setSuccess(true);
- // set savepoint to expire
- savePointService.expire(application.getId());
- },
- e -> {
- if (e.getCause() instanceof CancellationException) {
- updateToStopped(application);
- } else {
- String exception = Utils.stringifyException(e);
- applicationLog.setException(exception);
- applicationLog.setSuccess(false);
- Application app = getById(appParam.getId());
- app.setState(FlinkAppState.FAILED.getValue());
- app.setOptionState(OptionState.NONE.getValue());
- updateById(app);
- if (isKubernetesApp(app)) {
- k8SFlinkTrackMonitor.unWatching(toTrackId(app));
- } else {
- FlinkRESTAPIWatcher.unWatching(appParam.getId());
- }
- }
- })
- .whenComplete(
- (t, e) -> {
- if
(ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
- String domainName = settingService.getIngressModeDefault();
- if (StringUtils.isNotBlank(domainName)) {
- try {
- IngressController.configureIngress(
- domainName, application.getClusterId(),
application.getK8sNamespace());
- } catch (KubernetesClientException
kubernetesClientException) {
- log.info(
- "Failed to create ingress, stack info:{}",
- kubernetesClientException.getMessage());
- applicationLog.setException(e.getMessage());
- applicationLog.setSuccess(false);
- applicationLogService.save(applicationLog);
- application.setState(FlinkAppState.FAILED.getValue());
- application.setOptionState(OptionState.NONE.getValue());
- updateById(application);
- return;
- }
+ // 3) success
+ applicationLog.setSuccess(true);
+ if (response.flinkConfig() != null) {
+ String jmMemory =
response.flinkConfig().get(ConfigConst.KEY_FLINK_JM_PROCESS_MEMORY());
+ if (jmMemory != null) {
+
application.setJmMemory(MemorySize.parse(jmMemory).getMebiBytes());
+ }
+ String tmMemory =
response.flinkConfig().get(ConfigConst.KEY_FLINK_TM_PROCESS_MEMORY());
+ if (tmMemory != null) {
+
application.setTmMemory(MemorySize.parse(tmMemory).getMebiBytes());
+ }
+ }
+ application.setAppId(response.clusterId());
+ if (StringUtils.isNoneEmpty(response.jobId())) {
+ application.setJobId(response.jobId());
+ }
+
+ if (StringUtils.isNoneEmpty(response.jobManagerUrl())) {
+ application.setJobManagerUrl(response.jobManagerUrl());
+ applicationLog.setJobManagerUrl(response.jobManagerUrl());
+ }
+ applicationLog.setYarnAppId(response.clusterId());
+ application.setStartTime(new Date());
+ application.setEndTime(null);
+
+ // if start completed, will be added task to tracking queue
+ if (isKubernetesApp(application)) {
+ application.setRelease(ReleaseState.DONE.get());
+ k8SFlinkTrackMonitor.doWatching(toTrackId(application));
+ if
(ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
+ String domainName = settingService.getIngressModeDefault();
+ if (StringUtils.isNotBlank(domainName)) {
+ try {
+ IngressController.configureIngress(
+ domainName, application.getClusterId(),
application.getK8sNamespace());
+ } catch (KubernetesClientException e) {
+ log.info("Failed to create ingress, stack info:{}",
e.getMessage());
+ applicationLog.setException(e.getMessage());
+ applicationLog.setSuccess(false);
+ applicationLogService.save(applicationLog);
+ application.setState(FlinkAppState.FAILED.getValue());
+ application.setOptionState(OptionState.NONE.getValue());
}
}
-
- applicationLogService.save(applicationLog);
- startFutureMap.remove(application.getId());
- });
+ }
+ } else {
+ FlinkRESTAPIWatcher.setOptionState(appParam.getId(),
OptionState.STARTING);
+ FlinkRESTAPIWatcher.doWatching(application);
+ }
+ // update app
+ updateById(application);
+ // save log
+ applicationLogService.save(applicationLog);
+ });
}
private Map<String, Object> getProperties(Application application) {
@@ -1734,7 +1722,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
return properties;
}
- private void updateToStopped(Application app) {
+ private void doStopped(Application app) {
Application application = getById(app);
application.setOptionState(OptionState.NONE.getValue());
application.setState(FlinkAppState.CANCELED.getValue());
@@ -1749,6 +1737,17 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
} else {
FlinkRESTAPIWatcher.unWatching(application.getId());
}
+ // kill application
+ if (ExecutionMode.isYarnMode(app.getExecutionModeEnum())) {
+ try {
+ List<ApplicationReport> applications =
getApplicationReports(application.getJobName());
+ if (!applications.isEmpty()) {
+ YarnClient yarnClient = HadoopUtils.yarnClient();
+ yarnClient.killApplication(applications.get(0).getApplicationId());
+ }
+ } catch (Exception ignored) {
+ }
+ }
}
private Boolean checkJobName(String jobName) {
@@ -1782,7 +1781,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
@VisibleForTesting
public boolean validateQueueIfNeeded(Application appParam) {
yarnQueueService.checkQueueLabel(appParam.getExecutionModeEnum(),
appParam.getYarnQueue());
- if (!isYarnNotDefaultQueue(appParam)) {
+ if (isYarnNotDefaultQueue(appParam)) {
return true;
}
return yarnQueueService.existByTeamIdQueueLabel(appParam.getTeamId(),
appParam.getYarnQueue());
@@ -1798,7 +1797,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
@VisibleForTesting
public boolean validateQueueIfNeeded(Application oldApp, Application newApp)
{
yarnQueueService.checkQueueLabel(newApp.getExecutionModeEnum(),
newApp.getYarnQueue());
- if (!isYarnNotDefaultQueue(newApp)) {
+ if (isYarnNotDefaultQueue(newApp)) {
return true;
}
@@ -1819,31 +1818,28 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
* (empty or default), return true, false else.
*/
private boolean isYarnNotDefaultQueue(Application application) {
- return
ExecutionMode.isYarnPerJobOrAppMode(application.getExecutionModeEnum())
- && !yarnQueueService.isDefaultQueue(application.getYarnQueue());
+ return
!ExecutionMode.isYarnPerJobOrAppMode(application.getExecutionModeEnum())
+ || yarnQueueService.isDefaultQueue(application.getYarnQueue());
}
- /**
- * Check whether a job with the same name is running in the yarn queue
- *
- * @param jobName
- * @return
- */
- private boolean checkAppRepeatInYarn(String jobName) {
+ private List<ApplicationReport> getApplicationReports(String jobName) {
try {
YarnClient yarnClient = HadoopUtils.yarnClient();
Set<String> types =
Sets.newHashSet(
ApplicationType.STREAMPARK_FLINK.getName(),
ApplicationType.APACHE_FLINK.getName());
EnumSet<YarnApplicationState> states =
- EnumSet.of(YarnApplicationState.RUNNING,
YarnApplicationState.ACCEPTED);
- List<ApplicationReport> applications = yarnClient.getApplications(types,
states);
- for (ApplicationReport report : applications) {
- if (report.getName().equals(jobName)) {
- return true;
- }
- }
- return false;
+ EnumSet.of(
+ YarnApplicationState.NEW,
+ YarnApplicationState.NEW_SAVING,
+ YarnApplicationState.SUBMITTED,
+ YarnApplicationState.ACCEPTED,
+ YarnApplicationState.RUNNING);
+ Set<String> yarnTag =
Sets.newHashSet(ApplicationType.STREAMPARK_FLINK.getName());
+ List<ApplicationReport> applications = yarnClient.getApplications(types,
states, yarnTag);
+ return applications.stream()
+ .filter(report -> report.getName().equals(jobName))
+ .collect(Collectors.toList());
} catch (Exception e) {
throw new RuntimeException("The yarn api is abnormal. Ensure that yarn
is running properly.");
}
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index 26af8f8ab..1f4fb0ed3 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -46,6 +46,7 @@ object YarnApplicationClient extends YarnClientTrait {
private[this] lazy val workspace = Workspace.remote
override def setConfig(submitRequest: SubmitRequest, flinkConfig:
Configuration): Unit = {
+ super.setConfig(submitRequest, flinkConfig)
val flinkDefaultConfiguration = getFlinkDefaultConfiguration(
submitRequest.flinkVersion.flinkHome)
val currentUser = UserGroupInformation.getCurrentUser
@@ -85,10 +86,6 @@ object YarnApplicationClient extends YarnClientTrait {
PipelineOptions.JARS,
Collections.singletonList(
submitRequest.buildResult.asInstanceOf[ShadedBuildResponse].shadedJarPath))
- // yarn application name
- .safeSet(YarnConfigOptions.APPLICATION_NAME,
submitRequest.effectiveAppName)
- // yarn application Type
- .safeSet(YarnConfigOptions.APPLICATION_TYPE,
submitRequest.applicationType.getName)
logInfo(s"""
|------------------------------------------------------------------
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
index 79813adf2..244737d08 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
@@ -40,6 +40,7 @@ import scala.collection.JavaConversions._
object YarnPerJobClient extends YarnClientTrait {
override def setConfig(submitRequest: SubmitRequest, flinkConfig:
Configuration): Unit = {
+ super.setConfig(submitRequest, flinkConfig)
// execution.target
flinkConfig
.safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.PER_JOB.getName)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index 0ef54d5c1..d2a98b740 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -47,6 +47,7 @@ object YarnSessionClient extends YarnClientTrait {
* @param flinkConfig
*/
override def setConfig(submitRequest: SubmitRequest, flinkConfig:
Configuration): Unit = {
+ super.setConfig(submitRequest, flinkConfig)
flinkConfig
.safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
logInfo(s"""
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
index fa88df7dd..1a0a24268 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
@@ -17,6 +17,7 @@
package org.apache.streampark.flink.client.`trait`
+import org.apache.streampark.common.enums.ApplicationType
import org.apache.streampark.common.util.Utils
import org.apache.streampark.flink.client.bean._
@@ -38,6 +39,16 @@ import scala.util.Try
/** yarn application mode submit */
trait YarnClientTrait extends FlinkClientTrait {
+ override def setConfig(submitRequest: SubmitRequest, flinkConfig:
Configuration): Unit = {
+ // yarn application name
+ flinkConfig
+ .safeSet(YarnConfigOptions.APPLICATION_NAME,
submitRequest.effectiveAppName)
+ // yarn application Type
+ .safeSet(YarnConfigOptions.APPLICATION_TYPE,
submitRequest.applicationType.getName)
+ // yarn application Tag
+ .safeSet(YarnConfigOptions.APPLICATION_TAGS,
ApplicationType.STREAMPARK_FLINK.getName)
+ }
+
private[this] def executeClientAction[R <: SavepointRequestTrait, O](
request: R,
flinkConf: Configuration,