This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch submit in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit 0ce7e260b7500df66196fa14f58849b3506efe50 Author: benjobs <[email protected]> AuthorDate: Fri Dec 22 13:13:11 2023 +0800 [Improve] submit flink job on yarn job name check improvement --- .../impl/ApplicationActionServiceImpl.java | 337 ++++++++++----------- .../flink/client/impl/YarnApplicationClient.scala | 1 + .../flink/client/impl/YarnPerJobClient.scala | 1 + .../flink/client/impl/YarnSessionClient.scala | 1 + .../flink/client/trait/FlinkClientTrait.scala | 10 +- .../flink/client/trait/YarnClientTrait.scala | 7 + 6 files changed, 182 insertions(+), 175 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java index 401eb67d4..a7447cb0c 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java @@ -19,7 +19,6 @@ package org.apache.streampark.console.core.service.application.impl; import org.apache.streampark.common.Constant; import org.apache.streampark.common.conf.ConfigKeys; -import org.apache.streampark.common.conf.K8sFlinkConfig; import org.apache.streampark.common.conf.Workspace; import org.apache.streampark.common.enums.ApplicationType; import org.apache.streampark.common.enums.FlinkDevelopmentMode; @@ -27,7 +26,6 @@ import org.apache.streampark.common.enums.FlinkExecutionMode; import org.apache.streampark.common.enums.FlinkRestoreMode; import org.apache.streampark.common.enums.ResolveOrder; import org.apache.streampark.common.fs.FsOperator; -import org.apache.streampark.common.util.CompletableFutureUtils; import 
org.apache.streampark.common.util.DeflaterUtils; import org.apache.streampark.common.util.ExceptionUtils; import org.apache.streampark.common.util.HadoopUtils; @@ -120,6 +118,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static org.apache.streampark.console.core.watcher.FlinkK8sWatcherWrapper.Bridge.toTrackId; import static org.apache.streampark.console.core.watcher.FlinkK8sWatcherWrapper.isKubernetesApp; @@ -229,7 +228,7 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper, cancelFuture.cancel(true); } if (startFuture == null && cancelFuture == null) { - this.updateToStopped(appParam); + this.doStopped(appParam); } } @@ -325,60 +324,55 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper, cancelFutureMap.put(application.getId(), cancelFuture); - CompletableFutureUtils.runTimeout( - cancelFuture, - 10L, - TimeUnit.MINUTES, - cancelResponse -> { - applicationLog.setSuccess(true); - if (cancelResponse != null && cancelResponse.savePointDir() != null) { - String savePointDir = cancelResponse.savePointDir(); - log.info("savePoint path: {}", savePointDir); - SavePoint savePoint = new SavePoint(); - savePoint.setPath(savePointDir); - savePoint.setAppId(application.getId()); - savePoint.setLatest(true); - savePoint.setType(CheckPointTypeEnum.SAVEPOINT.get()); - savePoint.setCreateTime(new Date()); - savePoint.setTriggerTime(triggerTime); - savePointService.save(savePoint); + cancelFuture.whenComplete( + (cancelResponse, throwable) -> { + cancelFutureMap.remove(application.getId()); + if (throwable != null) { + if (throwable instanceof CancellationException) { + doStopped(application); + } else { + log.error("stop flink job fail.", throwable); + application.setOptionState(OptionStateEnum.NONE.getValue()); + 
application.setState(FlinkAppStateEnum.FAILED.getValue()); + updateById(application); + + if (appParam.getSavePointed()) { + savePointService.expire(application.getId()); } + // re-tracking flink job on kubernetes and logging exception if (isKubernetesApp(application)) { - k8SFlinkTrackMonitor.unWatching(toTrackId(application)); - } - }, - e -> { - if (e.getCause() instanceof CancellationException) { - updateToStopped(application); + TrackId id = toTrackId(application); + k8SFlinkTrackMonitor.unWatching(id); + k8SFlinkTrackMonitor.doWatching(id); } else { - log.error("stop flink job fail.", e); - application.setOptionState(OptionStateEnum.NONE.getValue()); - application.setState(FlinkAppStateEnum.FAILED.getValue()); - updateById(application); - - if (appParam.getSavePointed()) { - savePointService.expire(application.getId()); - } - - // re-tracking flink job on kubernetes and logging exception - if (isKubernetesApp(application)) { - TrackId id = toTrackId(application); - k8SFlinkTrackMonitor.unWatching(id); - k8SFlinkTrackMonitor.doWatching(id); - } else { - FlinkAppHttpWatcher.unWatching(application.getId()); - } - - String exception = ExceptionUtils.stringifyException(e); - applicationLog.setException(exception); - applicationLog.setSuccess(false); + FlinkAppHttpWatcher.unWatching(application.getId()); } - }) - .whenComplete( - (t, e) -> { - cancelFutureMap.remove(application.getId()); - applicationLogService.save(applicationLog); - }); + + String exception = ExceptionUtils.stringifyException(throwable); + applicationLog.setException(exception); + applicationLog.setSuccess(false); + } + } else { + applicationLog.setSuccess(true); + if (cancelResponse != null && cancelResponse.savePointDir() != null) { + String savePointDir = cancelResponse.savePointDir(); + log.info("savePoint path: {}", savePointDir); + SavePoint savePoint = new SavePoint(); + savePoint.setPath(savePointDir); + savePoint.setAppId(application.getId()); + savePoint.setLatest(true); + 
savePoint.setType(CheckPointTypeEnum.SAVEPOINT.get()); + savePoint.setCreateTime(new Date()); + savePoint.setTriggerTime(triggerTime); + savePointService.save(savePoint); + } + if (isKubernetesApp(application)) { + k8SFlinkTrackMonitor.unWatching(toTrackId(application)); + } + } + // save log... + applicationLogService.save(applicationLog); + }); } @Override @@ -390,10 +384,9 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper, !application.isCanBeStart(), "[StreamPark] The application cannot be started repeatedly."); if (FlinkExecutionMode.isYarnMode(application.getFlinkExecutionMode())) { - ApiAlertException.throwIfTrue( - checkAppRepeatInYarn(application.getJobName()), - "[StreamPark] The same task name is already running in the yarn queue"); + ApiAlertException.throwIfTrue( + !getApplicationReports(application.getJobName()).isEmpty(), + "The same job name is already running in the yarn queue"); } AppBuildPipeline buildPipeline = appBuildPipeService.getById(application.getId()); @@ -486,124 +479,90 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper, startFutureMap.put(application.getId(), future); - CompletableFutureUtils.runTimeout( - future, - 2L, - TimeUnit.MINUTES, - submitResponse -> { - if (submitResponse.flinkConfig() != null) { - String jmMemory = - submitResponse.flinkConfig().get(ConfigKeys.KEY_FLINK_JM_PROCESS_MEMORY()); - if (jmMemory != null) { - application.setJmMemory(MemorySize.parse(jmMemory).getMebiBytes()); - } - String tmMemory = - submitResponse.flinkConfig().get(ConfigKeys.KEY_FLINK_TM_PROCESS_MEMORY()); - if (tmMemory != null) { - application.setTmMemory(MemorySize.parse(tmMemory).getMebiBytes()); - } - } - application.setAppId(submitResponse.clusterId()); - if (StringUtils.isNoneEmpty(submitResponse.jobId())) { - application.setJobId(submitResponse.jobId()); - } - - if (StringUtils.isNoneEmpty(submitResponse.jobManagerUrl())) { - application.setJobManagerUrl(submitResponse.jobManagerUrl()); - 
applicationLog.setJobManagerUrl(submitResponse.jobManagerUrl()); - } - applicationLog.setYarnAppId(submitResponse.clusterId()); - application.setStartTime(new Date()); - application.setEndTime(null); - if (isKubernetesApp(application)) { - application.setRelease(ReleaseStateEnum.DONE.get()); - } - updateById(application); - - // if start completed, will be added task to tracking queue - if (isKubernetesApp(application)) { - k8SFlinkTrackMonitor.doWatching(toTrackId(application)); + future.whenComplete( + (response, throwable) -> { + // 1) remove Future + startFutureMap.remove(application.getId()); + + // 2) exception + if (throwable != null) { + if (throwable instanceof CancellationException) { + doStopped(application); + return; + } else { + Application app = getById(appParam.getId()); + app.setState(FlinkAppStateEnum.FAILED.getValue()); + app.setOptionState(OptionStateEnum.NONE.getValue()); + updateById(app); + if (isKubernetesApp(app)) { + k8SFlinkTrackMonitor.unWatching(toTrackId(app)); } else { - FlinkAppHttpWatcher.setOptionState(appParam.getId(), OptionStateEnum.STARTING); - FlinkAppHttpWatcher.doWatching(application); + FlinkAppHttpWatcher.unWatching(appParam.getId()); } + } + String exception = ExceptionUtils.stringifyException(throwable); + applicationLog.setException(exception); + applicationLog.setSuccess(false); + applicationLogService.save(applicationLog); + // set savepoint to expire + savePointService.expire(application.getId()); + return; + } - applicationLog.setSuccess(true); - // set savepoint to expire - savePointService.expire(application.getId()); - }, - e -> { - if (e.getCause() instanceof CancellationException) { - updateToStopped(application); - } else { - String exception = ExceptionUtils.stringifyException(e); - applicationLog.setException(exception); - applicationLog.setSuccess(false); - Application app = getById(appParam.getId()); - app.setState(FlinkAppStateEnum.FAILED.getValue()); - app.setOptionState(OptionStateEnum.NONE.getValue()); 
- updateById(app); - if (isKubernetesApp(app)) { - k8SFlinkTrackMonitor.unWatching(toTrackId(app)); - } else { - FlinkAppHttpWatcher.unWatching(appParam.getId()); - } - } - }) - .whenComplete( - (t, e) -> { - if (!K8sFlinkConfig.isV2Enabled() - && FlinkExecutionMode.isKubernetesApplicationMode( - application.getExecutionMode())) { - String domainName = settingService.getIngressModeDefault(); - if (StringUtils.isNotBlank(domainName)) { - try { - IngressController.configureIngress( - domainName, application.getClusterId(), application.getK8sNamespace()); - } catch (KubernetesClientException kubernetesClientException) { - log.info( - "Failed to create ingress, stack info:{}", - kubernetesClientException.getMessage()); - applicationLog.setException(e.getMessage()); - applicationLog.setSuccess(false); - applicationLogService.save(applicationLog); - application.setState(FlinkAppStateEnum.FAILED.getValue()); - application.setOptionState(OptionStateEnum.NONE.getValue()); - updateById(application); - return; - } + // 3) success + applicationLog.setSuccess(true); + if (response.flinkConfig() != null) { + String jmMemory = response.flinkConfig().get(ConfigKeys.KEY_FLINK_JM_PROCESS_MEMORY()); + if (jmMemory != null) { + application.setJmMemory(MemorySize.parse(jmMemory).getMebiBytes()); + } + String tmMemory = response.flinkConfig().get(ConfigKeys.KEY_FLINK_TM_PROCESS_MEMORY()); + if (tmMemory != null) { + application.setTmMemory(MemorySize.parse(tmMemory).getMebiBytes()); + } + } + application.setAppId(response.clusterId()); + if (StringUtils.isNoneEmpty(response.jobId())) { + application.setJobId(response.jobId()); + } + + if (StringUtils.isNoneEmpty(response.jobManagerUrl())) { + application.setJobManagerUrl(response.jobManagerUrl()); + applicationLog.setJobManagerUrl(response.jobManagerUrl()); + } + applicationLog.setYarnAppId(response.clusterId()); + application.setStartTime(new Date()); + application.setEndTime(null); + + // if start completed, will be added task to 
tracking queue + if (isKubernetesApp(application)) { + application.setRelease(ReleaseStateEnum.DONE.get()); + k8SFlinkTrackMonitor.doWatching(toTrackId(application)); + if (FlinkExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) { + String domainName = settingService.getIngressModeDefault(); + if (StringUtils.isNotBlank(domainName)) { + try { + IngressController.configureIngress( + domainName, application.getClusterId(), application.getK8sNamespace()); + } catch (KubernetesClientException e) { + log.info("Failed to create ingress, stack info:{}", e.getMessage()); + applicationLog.setException(e.getMessage()); + applicationLog.setSuccess(false); + applicationLogService.save(applicationLog); + application.setState(FlinkAppStateEnum.FAILED.getValue()); + application.setOptionState(OptionStateEnum.NONE.getValue()); } } - - applicationLogService.save(applicationLog); - startFutureMap.remove(application.getId()); - }); - } - - /** - * Check whether a job with the same name is running in the yarn queue - * - * @param jobName - * @return - */ - private boolean checkAppRepeatInYarn(String jobName) { - try { - YarnClient yarnClient = HadoopUtils.yarnClient(); - Set<String> types = - Sets.newHashSet( - ApplicationType.STREAMPARK_FLINK.getName(), ApplicationType.APACHE_FLINK.getName()); - EnumSet<YarnApplicationState> states = - EnumSet.of(YarnApplicationState.RUNNING, YarnApplicationState.ACCEPTED); - List<ApplicationReport> applications = yarnClient.getApplications(types, states); - for (ApplicationReport report : applications) { - if (report.getName().equals(jobName)) { - return true; - } - } - return false; - } catch (Exception e) { - throw new RuntimeException("The yarn api is abnormal. 
Ensure that yarn is running properly."); - } + } + } else { + FlinkAppHttpWatcher.setOptionState(appParam.getId(), OptionStateEnum.STARTING); + FlinkAppHttpWatcher.doWatching(application); + } + // update app + updateById(application); + // save log + applicationLogService.save(applicationLog); + }); } private void starting(Application application) { @@ -781,7 +740,7 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper, return properties; } - private void updateToStopped(Application app) { + private void doStopped(Application app) { Application application = getById(app); application.setOptionState(OptionStateEnum.NONE.getValue()); application.setState(FlinkAppStateEnum.CANCELED.getValue()); @@ -796,6 +755,17 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper, } else { FlinkAppHttpWatcher.unWatching(application.getId()); } + // kill application + if (FlinkExecutionMode.isYarnMode(app.getFlinkExecutionMode())) { + try { + List<ApplicationReport> applications = getApplicationReports(application.getJobName()); + if (!applications.isEmpty()) { + YarnClient yarnClient = HadoopUtils.yarnClient(); + yarnClient.killApplication(applications.get(0).getApplicationId()); + } + } catch (Exception ignored) { + } + } } private String getSavePointed(Application appParam) { @@ -811,4 +781,31 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper, } return null; } + + private List<ApplicationReport> getApplicationReports(String jobName) { + try { + YarnClient yarnClient = HadoopUtils.yarnClient(); + Set<String> types = + Sets.newHashSet( + ApplicationType.STREAMPARK_FLINK.getName(), ApplicationType.APACHE_FLINK.getName()); + EnumSet<YarnApplicationState> states = + EnumSet.of( + YarnApplicationState.NEW, + YarnApplicationState.NEW_SAVING, + YarnApplicationState.SUBMITTED, + YarnApplicationState.ACCEPTED, + YarnApplicationState.RUNNING); + Set<String> yarnTag = Sets.newHashSet("streampark"); + 
List<ApplicationReport> applications = yarnClient.getApplications(types, states, yarnTag); + // Compatible with historical versions. + if (applications.isEmpty()) { + applications = yarnClient.getApplications(types, states); + } + return applications.stream() + .filter(report -> report.getName().equals(jobName)) + .collect(Collectors.toList()); + } catch (Exception e) { + throw new RuntimeException("The yarn api is abnormal. Ensure that yarn is running properly."); + } + } } diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala index f4fe45761..4062896b1 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala @@ -49,6 +49,7 @@ object YarnApplicationClient extends YarnClientTrait { private[this] lazy val workspace = Workspace.remote override def setConfig(submitRequest: SubmitRequest, flinkConfig: Configuration): Unit = { + super.setConfig(submitRequest, flinkConfig) val flinkDefaultConfiguration = getFlinkDefaultConfiguration( submitRequest.flinkVersion.flinkHome) val currentUser = UserGroupInformation.getCurrentUser diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala index 370064f4a..b52263d7c 100644 --- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala @@ -40,6 +40,7 @@ import scala.collection.convert.ImplicitConversions._ object YarnPerJobClient extends YarnClientTrait { override def setConfig(submitRequest: SubmitRequest, flinkConfig: Configuration): Unit = { + super.setConfig(submitRequest, flinkConfig) // execution.target flinkConfig .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.PER_JOB.getName) diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala index c91fc45f0..847786577 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala @@ -47,6 +47,7 @@ object YarnSessionClient extends YarnClientTrait { * @param flinkConfig */ override def setConfig(submitRequest: SubmitRequest, flinkConfig: Configuration): Unit = { + super.setConfig(submitRequest, flinkConfig) flinkConfig .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName) logInfo(s""" diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala index a31250d4c..17a48c1e6 100644 
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala @@ -115,7 +115,6 @@ trait FlinkClientTrait extends Logger { flinkConfig .safeSet(PipelineOptions.NAME, submitRequest.effectiveAppName) .safeSet(DeploymentOptions.TARGET, submitRequest.executionMode.getName) - .safeSet(SavepointConfigOptions.SAVEPOINT_PATH, submitRequest.savePoint) .safeSet(ApplicationConfiguration.APPLICATION_MAIN_CLASS, submitRequest.appMain) .safeSet(ApplicationConfiguration.APPLICATION_ARGS, extractProgramArgs(submitRequest)) .safeSet(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, submitRequest.jobId) @@ -132,10 +131,11 @@ trait FlinkClientTrait extends Logger { // set savepoint parameter if (submitRequest.savePoint != null) { - flinkConfig.safeSet(SavepointConfigOptions.SAVEPOINT_PATH, submitRequest.savePoint) - flinkConfig.setBoolean( - SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, - submitRequest.allowNonRestoredState) + flinkConfig + .safeSet(SavepointConfigOptions.SAVEPOINT_PATH, submitRequest.savePoint) + .setBoolean( + SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, + submitRequest.allowNonRestoredState) if ( submitRequest.flinkVersion.checkVersion( FlinkRestoreMode.SINCE_FLINK_VERSION) && submitRequest.restoreMode != null diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala index 4e58a50f9..20b245420 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala 
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala @@ -39,6 +39,13 @@ import scala.util.Try /** yarn application mode submit */ trait YarnClientTrait extends FlinkClientTrait { + override def setConfig(submitRequest: SubmitRequest, flinkConfig: Configuration): Unit = { + flinkConfig + .safeSet(YarnConfigOptions.APPLICATION_NAME, submitRequest.effectiveAppName) + .safeSet(YarnConfigOptions.APPLICATION_TYPE, submitRequest.applicationType.getName) + .safeSet(YarnConfigOptions.APPLICATION_TAGS, "streampark") + } + private[this] def executeClientAction[R <: SavepointRequestTrait, O]( savepointRequestTrait: R, flinkConf: Configuration,
