This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
     new c96f66877 [Improve] submit flink job waiting for response improvement
c96f66877 is described below

commit c96f668773cd368421f71830c3fa43a8b0403b58
Author: benjobs <[email protected]>
AuthorDate: Thu Dec 21 22:46:45 2023 +0800

    [Improve] submit flink job waiting for response improvement
---
 .../core/service/impl/ApplicationServiceImpl.java  | 314 ++++++++++-----------
 .../flink/client/impl/YarnApplicationClient.scala  |   5 +-
 .../flink/client/impl/YarnPerJobClient.scala       |   1 +
 .../flink/client/impl/YarnSessionClient.scala      |   1 +
 .../flink/client/trait/YarnClientTrait.scala       |  11 +
 5 files changed, 169 insertions(+), 163 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 87066681e..349d16c0d 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -26,7 +26,6 @@ import org.apache.streampark.common.enums.ResolveOrder;
 import org.apache.streampark.common.enums.StorageType;
 import org.apache.streampark.common.fs.HdfsOperator;
 import org.apache.streampark.common.fs.LfsOperator;
-import org.apache.streampark.common.util.CompletableFutureUtils;
 import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.common.util.HadoopUtils;
 import org.apache.streampark.common.util.PropertiesUtils;
@@ -1134,7 +1133,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
       cancelFuture.cancel(true);
     }
     if (startFuture == null && cancelFuture == null) {
-      this.updateToStopped(app);
+      this.doStopped(app);
     }
   }
 
@@ -1309,60 +1308,55 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
 
     cancelFutureMap.put(application.getId(), cancelFuture);
 
-    CompletableFutureUtils.runTimeout(
-            cancelFuture,
-            10L,
-            TimeUnit.MINUTES,
-            cancelResponse -> {
-              applicationLog.setSuccess(true);
-              if (cancelResponse != null && cancelResponse.savePointDir() != 
null) {
-                String savePointDir = cancelResponse.savePointDir();
-                log.info("savePoint path: {}", savePointDir);
-                SavePoint savePoint = new SavePoint();
-                savePoint.setPath(savePointDir);
-                savePoint.setAppId(application.getId());
-                savePoint.setLatest(true);
-                savePoint.setType(CheckPointType.SAVEPOINT.get());
-                savePoint.setCreateTime(new Date());
-                savePoint.setTriggerTime(triggerTime);
-                savePointService.save(savePoint);
+    cancelFuture.whenComplete(
+        (cancelResponse, throwable) -> {
+          cancelFutureMap.remove(application.getId());
+          if (throwable != null) {
+            if (throwable instanceof CancellationException) {
+              doStopped(application);
+            } else {
+              log.error("stop flink job fail.", throwable);
+              application.setOptionState(OptionState.NONE.getValue());
+              application.setState(FlinkAppState.FAILED.getValue());
+              updateById(application);
+
+              if (appParam.getSavePointed()) {
+                savePointService.expire(application.getId());
               }
+              // re-tracking flink job on kubernetes and logging exception
               if (isKubernetesApp(application)) {
-                k8SFlinkTrackMonitor.unWatching(toTrackId(application));
-              }
-            },
-            e -> {
-              if (e.getCause() instanceof CancellationException) {
-                updateToStopped(application);
+                TrackId id = toTrackId(application);
+                k8SFlinkTrackMonitor.unWatching(id);
+                k8SFlinkTrackMonitor.doWatching(id);
               } else {
-                log.error("stop flink job fail.", e);
-                application.setOptionState(OptionState.NONE.getValue());
-                application.setState(FlinkAppState.FAILED.getValue());
-                updateById(application);
-
-                if (appParam.getSavePointed()) {
-                  savePointService.expire(application.getId());
-                }
-
-                // re-tracking flink job on kubernetes and logging exception
-                if (isKubernetesApp(application)) {
-                  TrackId id = toTrackId(application);
-                  k8SFlinkTrackMonitor.unWatching(id);
-                  k8SFlinkTrackMonitor.doWatching(id);
-                } else {
-                  FlinkRESTAPIWatcher.unWatching(application.getId());
-                }
-
-                String exception = Utils.stringifyException(e);
-                applicationLog.setException(exception);
-                applicationLog.setSuccess(false);
+                FlinkRESTAPIWatcher.unWatching(application.getId());
               }
-            })
-        .whenComplete(
-            (t, e) -> {
-              cancelFutureMap.remove(application.getId());
-              applicationLogService.save(applicationLog);
-            });
+
+              String exception = Utils.stringifyException(throwable);
+              applicationLog.setException(exception);
+              applicationLog.setSuccess(false);
+            }
+          } else {
+            applicationLog.setSuccess(true);
+            if (cancelResponse != null && cancelResponse.savePointDir() != 
null) {
+              String savePointDir = cancelResponse.savePointDir();
+              log.info("savePoint path: {}", savePointDir);
+              SavePoint savePoint = new SavePoint();
+              savePoint.setPath(savePointDir);
+              savePoint.setAppId(application.getId());
+              savePoint.setLatest(true);
+              savePoint.setType(CheckPointType.SAVEPOINT.get());
+              savePoint.setCreateTime(new Date());
+              savePoint.setTriggerTime(triggerTime);
+              savePointService.save(savePoint);
+            }
+            if (isKubernetesApp(application)) {
+              k8SFlinkTrackMonitor.unWatching(toTrackId(application));
+            }
+          }
+          // save log...
+          applicationLogService.save(applicationLog);
+        });
   }
 
   @Override
@@ -1387,7 +1381,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
             "This state.savepoints.dir value "
                 + savepointPath
                 + " path part to store the checkpoint data in is null. Please 
specify a directory path for the checkpoint data.";
-      } else if (pathPart.length() == 0 || "/".equals(pathPart)) {
+      } else if (pathPart.isEmpty() || "/".equals(pathPart)) {
         error =
             "This state.savepoints.dir value "
                 + savepointPath
@@ -1433,8 +1427,8 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     // check job on yarn is already running
     if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
       ApiAlertException.throwIfTrue(
-          checkAppRepeatInYarn(application.getJobName()),
-          "[StreamPark] The same job name is already running in the yarn 
queue");
+          !getApplicationReports(application.getJobName()).isEmpty(),
+          "The same job name is already running in the yarn queue");
     }
 
     // if manually started, clear the restart flag
@@ -1581,96 +1575,90 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
 
     startFutureMap.put(application.getId(), future);
 
-    CompletableFutureUtils.runTimeout(
-            future,
-            2L,
-            TimeUnit.MINUTES,
-            submitResponse -> {
-              if (submitResponse.flinkConfig() != null) {
-                String jmMemory =
-                    
submitResponse.flinkConfig().get(ConfigConst.KEY_FLINK_JM_PROCESS_MEMORY());
-                if (jmMemory != null) {
-                  
application.setJmMemory(MemorySize.parse(jmMemory).getMebiBytes());
-                }
-                String tmMemory =
-                    
submitResponse.flinkConfig().get(ConfigConst.KEY_FLINK_TM_PROCESS_MEMORY());
-                if (tmMemory != null) {
-                  
application.setTmMemory(MemorySize.parse(tmMemory).getMebiBytes());
-                }
-              }
-              application.setAppId(submitResponse.clusterId());
-              if (StringUtils.isNoneEmpty(submitResponse.jobId())) {
-                application.setJobId(submitResponse.jobId());
-              }
-
-              if (StringUtils.isNoneEmpty(submitResponse.jobManagerUrl())) {
-                application.setJobManagerUrl(submitResponse.jobManagerUrl());
-                
applicationLog.setJobManagerUrl(submitResponse.jobManagerUrl());
-              }
-              applicationLog.setYarnAppId(submitResponse.clusterId());
-              application.setStartTime(new Date());
-              application.setEndTime(null);
-              if (isKubernetesApp(application)) {
-                application.setRelease(ReleaseState.DONE.get());
-              }
-              updateById(application);
+    future.whenComplete(
+        (response, throwable) -> {
+          // 1) remove Future
+          startFutureMap.remove(application.getId());
 
-              // if start completed, will be added task to tracking queue
-              if (isKubernetesApp(application)) {
-                k8SFlinkTrackMonitor.doWatching(toTrackId(application));
+          // 2) exception
+          if (throwable != null) {
+            if (throwable instanceof CancellationException) {
+              doStopped(application);
+              return;
+            } else {
+              Application app = getById(appParam.getId());
+              app.setState(FlinkAppState.FAILED.getValue());
+              app.setOptionState(OptionState.NONE.getValue());
+              updateById(app);
+              if (isKubernetesApp(app)) {
+                k8SFlinkTrackMonitor.unWatching(toTrackId(app));
               } else {
-                FlinkRESTAPIWatcher.setOptionState(appParam.getId(), 
OptionState.STARTING);
-                FlinkRESTAPIWatcher.doWatching(application);
+                FlinkRESTAPIWatcher.unWatching(appParam.getId());
               }
+            }
+            String exception = Utils.stringifyException(throwable);
+            applicationLog.setException(exception);
+            applicationLog.setSuccess(false);
+            applicationLogService.save(applicationLog);
+            // set savepoint to expire
+            savePointService.expire(application.getId());
+            return;
+          }
 
-              applicationLog.setSuccess(true);
-              // set savepoint to expire
-              savePointService.expire(application.getId());
-            },
-            e -> {
-              if (e.getCause() instanceof CancellationException) {
-                updateToStopped(application);
-              } else {
-                String exception = Utils.stringifyException(e);
-                applicationLog.setException(exception);
-                applicationLog.setSuccess(false);
-                Application app = getById(appParam.getId());
-                app.setState(FlinkAppState.FAILED.getValue());
-                app.setOptionState(OptionState.NONE.getValue());
-                updateById(app);
-                if (isKubernetesApp(app)) {
-                  k8SFlinkTrackMonitor.unWatching(toTrackId(app));
-                } else {
-                  FlinkRESTAPIWatcher.unWatching(appParam.getId());
-                }
-              }
-            })
-        .whenComplete(
-            (t, e) -> {
-              if 
(ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
-                String domainName = settingService.getIngressModeDefault();
-                if (StringUtils.isNotBlank(domainName)) {
-                  try {
-                    IngressController.configureIngress(
-                        domainName, application.getClusterId(), 
application.getK8sNamespace());
-                  } catch (KubernetesClientException 
kubernetesClientException) {
-                    log.info(
-                        "Failed to create ingress, stack info:{}",
-                        kubernetesClientException.getMessage());
-                    applicationLog.setException(e.getMessage());
-                    applicationLog.setSuccess(false);
-                    applicationLogService.save(applicationLog);
-                    application.setState(FlinkAppState.FAILED.getValue());
-                    application.setOptionState(OptionState.NONE.getValue());
-                    updateById(application);
-                    return;
-                  }
+          // 3) success
+          applicationLog.setSuccess(true);
+          if (response.flinkConfig() != null) {
+            String jmMemory = 
response.flinkConfig().get(ConfigConst.KEY_FLINK_JM_PROCESS_MEMORY());
+            if (jmMemory != null) {
+              
application.setJmMemory(MemorySize.parse(jmMemory).getMebiBytes());
+            }
+            String tmMemory = 
response.flinkConfig().get(ConfigConst.KEY_FLINK_TM_PROCESS_MEMORY());
+            if (tmMemory != null) {
+              
application.setTmMemory(MemorySize.parse(tmMemory).getMebiBytes());
+            }
+          }
+          application.setAppId(response.clusterId());
+          if (StringUtils.isNoneEmpty(response.jobId())) {
+            application.setJobId(response.jobId());
+          }
+
+          if (StringUtils.isNoneEmpty(response.jobManagerUrl())) {
+            application.setJobManagerUrl(response.jobManagerUrl());
+            applicationLog.setJobManagerUrl(response.jobManagerUrl());
+          }
+          applicationLog.setYarnAppId(response.clusterId());
+          application.setStartTime(new Date());
+          application.setEndTime(null);
+
+          // if start completed, will be added task to tracking queue
+          if (isKubernetesApp(application)) {
+            application.setRelease(ReleaseState.DONE.get());
+            k8SFlinkTrackMonitor.doWatching(toTrackId(application));
+            if 
(ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
+              String domainName = settingService.getIngressModeDefault();
+              if (StringUtils.isNotBlank(domainName)) {
+                try {
+                  IngressController.configureIngress(
+                      domainName, application.getClusterId(), 
application.getK8sNamespace());
+                } catch (KubernetesClientException e) {
+                  log.info("Failed to create ingress, stack info:{}", 
e.getMessage());
+                  applicationLog.setException(e.getMessage());
+                  applicationLog.setSuccess(false);
+                  applicationLogService.save(applicationLog);
+                  application.setState(FlinkAppState.FAILED.getValue());
+                  application.setOptionState(OptionState.NONE.getValue());
                 }
               }
-
-              applicationLogService.save(applicationLog);
-              startFutureMap.remove(application.getId());
-            });
+            }
+          } else {
+            FlinkRESTAPIWatcher.setOptionState(appParam.getId(), 
OptionState.STARTING);
+            FlinkRESTAPIWatcher.doWatching(application);
+          }
+          // update app
+          updateById(application);
+          // save log
+          applicationLogService.save(applicationLog);
+        });
   }
 
   private Map<String, Object> getProperties(Application application) {
@@ -1734,7 +1722,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     return properties;
   }
 
-  private void updateToStopped(Application app) {
+  private void doStopped(Application app) {
     Application application = getById(app);
     application.setOptionState(OptionState.NONE.getValue());
     application.setState(FlinkAppState.CANCELED.getValue());
@@ -1749,6 +1737,17 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     } else {
       FlinkRESTAPIWatcher.unWatching(application.getId());
     }
+    // kill application
+    if (ExecutionMode.isYarnMode(app.getExecutionModeEnum())) {
+      try {
+        List<ApplicationReport> applications = 
getApplicationReports(application.getJobName());
+        if (!applications.isEmpty()) {
+          YarnClient yarnClient = HadoopUtils.yarnClient();
+          yarnClient.killApplication(applications.get(0).getApplicationId());
+        }
+      } catch (Exception ignored) {
+      }
+    }
   }
 
   private Boolean checkJobName(String jobName) {
@@ -1782,7 +1781,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
   @VisibleForTesting
   public boolean validateQueueIfNeeded(Application appParam) {
     yarnQueueService.checkQueueLabel(appParam.getExecutionModeEnum(), 
appParam.getYarnQueue());
-    if (!isYarnNotDefaultQueue(appParam)) {
+    if (isYarnNotDefaultQueue(appParam)) {
       return true;
     }
     return yarnQueueService.existByTeamIdQueueLabel(appParam.getTeamId(), 
appParam.getYarnQueue());
@@ -1798,7 +1797,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
   @VisibleForTesting
   public boolean validateQueueIfNeeded(Application oldApp, Application newApp) 
{
     yarnQueueService.checkQueueLabel(newApp.getExecutionModeEnum(), 
newApp.getYarnQueue());
-    if (!isYarnNotDefaultQueue(newApp)) {
+    if (isYarnNotDefaultQueue(newApp)) {
       return true;
     }
 
@@ -1819,31 +1818,28 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
    *     (empty or default), return true, false else.
    */
   private boolean isYarnNotDefaultQueue(Application application) {
-    return 
ExecutionMode.isYarnPerJobOrAppMode(application.getExecutionModeEnum())
-        && !yarnQueueService.isDefaultQueue(application.getYarnQueue());
+    return 
!ExecutionMode.isYarnPerJobOrAppMode(application.getExecutionModeEnum())
+        || yarnQueueService.isDefaultQueue(application.getYarnQueue());
   }
 
-  /**
-   * Check whether a job with the same name is running in the yarn queue
-   *
-   * @param jobName
-   * @return
-   */
-  private boolean checkAppRepeatInYarn(String jobName) {
+  private List<ApplicationReport> getApplicationReports(String jobName) {
     try {
       YarnClient yarnClient = HadoopUtils.yarnClient();
       Set<String> types =
           Sets.newHashSet(
               ApplicationType.STREAMPARK_FLINK.getName(), 
ApplicationType.APACHE_FLINK.getName());
       EnumSet<YarnApplicationState> states =
-          EnumSet.of(YarnApplicationState.RUNNING, 
YarnApplicationState.ACCEPTED);
-      List<ApplicationReport> applications = yarnClient.getApplications(types, 
states);
-      for (ApplicationReport report : applications) {
-        if (report.getName().equals(jobName)) {
-          return true;
-        }
-      }
-      return false;
+          EnumSet.of(
+              YarnApplicationState.NEW,
+              YarnApplicationState.NEW_SAVING,
+              YarnApplicationState.SUBMITTED,
+              YarnApplicationState.ACCEPTED,
+              YarnApplicationState.RUNNING);
+      Set<String> yarnTag = 
Sets.newHashSet(ApplicationType.STREAMPARK_FLINK.getName());
+      List<ApplicationReport> applications = yarnClient.getApplications(types, 
states, yarnTag);
+      return applications.stream()
+          .filter(report -> report.getName().equals(jobName))
+          .collect(Collectors.toList());
     } catch (Exception e) {
       throw new RuntimeException("The yarn api is abnormal. Ensure that yarn 
is running properly.");
     }
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index 26af8f8ab..1f4fb0ed3 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -46,6 +46,7 @@ object YarnApplicationClient extends YarnClientTrait {
   private[this] lazy val workspace = Workspace.remote
 
   override def setConfig(submitRequest: SubmitRequest, flinkConfig: 
Configuration): Unit = {
+    super.setConfig(submitRequest, flinkConfig)
     val flinkDefaultConfiguration = getFlinkDefaultConfiguration(
       submitRequest.flinkVersion.flinkHome)
     val currentUser = UserGroupInformation.getCurrentUser
@@ -85,10 +86,6 @@ object YarnApplicationClient extends YarnClientTrait {
         PipelineOptions.JARS,
         Collections.singletonList(
           
submitRequest.buildResult.asInstanceOf[ShadedBuildResponse].shadedJarPath))
-      // yarn application name
-      .safeSet(YarnConfigOptions.APPLICATION_NAME, 
submitRequest.effectiveAppName)
-      // yarn application Type
-      .safeSet(YarnConfigOptions.APPLICATION_TYPE, 
submitRequest.applicationType.getName)
 
     logInfo(s"""
                
|------------------------------------------------------------------
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
index 79813adf2..244737d08 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
@@ -40,6 +40,7 @@ import scala.collection.JavaConversions._
 object YarnPerJobClient extends YarnClientTrait {
 
   override def setConfig(submitRequest: SubmitRequest, flinkConfig: 
Configuration): Unit = {
+    super.setConfig(submitRequest, flinkConfig)
     // execution.target
     flinkConfig
       .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.PER_JOB.getName)
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index 0ef54d5c1..d2a98b740 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -47,6 +47,7 @@ object YarnSessionClient extends YarnClientTrait {
    * @param flinkConfig
    */
   override def setConfig(submitRequest: SubmitRequest, flinkConfig: 
Configuration): Unit = {
+    super.setConfig(submitRequest, flinkConfig)
     flinkConfig
       .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
     logInfo(s"""
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
index fa88df7dd..1a0a24268 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.flink.client.`trait`
 
+import org.apache.streampark.common.enums.ApplicationType
 import org.apache.streampark.common.util.Utils
 import org.apache.streampark.flink.client.bean._
 
@@ -38,6 +39,16 @@ import scala.util.Try
 /** yarn application mode submit */
 trait YarnClientTrait extends FlinkClientTrait {
 
+  override def setConfig(submitRequest: SubmitRequest, flinkConfig: 
Configuration): Unit = {
+    // yarn application name
+    flinkConfig
+      .safeSet(YarnConfigOptions.APPLICATION_NAME, 
submitRequest.effectiveAppName)
+      // yarn application Type
+      .safeSet(YarnConfigOptions.APPLICATION_TYPE, 
submitRequest.applicationType.getName)
+      // yarn application Tag
+      .safeSet(YarnConfigOptions.APPLICATION_TAGS, 
ApplicationType.STREAMPARK_FLINK.getName)
+  }
+
   private[this] def executeClientAction[R <: SavepointRequestTrait, O](
       request: R,
       flinkConf: Configuration,

Reply via email to the mailing list.