This is an automated email from the ASF dual-hosted git repository.

monster pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new b6ca3a49f [improve] job submit parameter improvement (#2063)
b6ca3a49f is described below

commit b6ca3a49f144a412d8f6c662e0a5cb12ad732a5a
Author: benjobs <[email protected]>
AuthorDate: Sun Nov 20 21:57:26 2022 +0800

    [improve] job submit parameter improvement (#2063)
---
 .../core/controller/ApplicationController.java     |   9 +-
 .../console/core/entity/FlinkCluster.java          |  16 +++
 .../core/service/impl/AppBuildPipeServiceImpl.java |  11 +--
 .../core/service/impl/ApplicationServiceImpl.java  | 109 +++++++++++----------
 .../core/service/impl/FlinkClusterServiceImpl.java |  13 +--
 .../flink/submit/bean/CancelRequest.scala          |   5 +-
 .../flink/submit/bean/CancelResponse.scala         |   4 +-
 .../flink/submit/bean/DeployRequest.scala          |  10 +-
 .../flink/submit/bean/DeployResponse.scala         |   3 +-
 .../flink/submit/bean/ShutDownRequest.scala        |   3 +-
 .../flink/submit/bean/SubmitRequest.scala          |   6 +-
 .../flink/submit/bean/SubmitResponse.scala         |   4 +-
 .../impl/KubernetesNativeSessionSubmit.scala       |  10 +-
 .../flink/submit/impl/RemoteSubmit.scala           |   4 +-
 .../flink/submit/impl/YarnSessionSubmit.scala      |  12 +--
 .../flink/submit/trait/FlinkSubmitTrait.scala      |  38 ++-----
 16 files changed, 117 insertions(+), 140 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
index 1ec165574..4505b378d 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
@@ -142,8 +142,13 @@ public class ApplicationController {
             if (pipeStates.containsKey(e.getId())) {
                 e.setBuildStatus(pipeStates.get(e.getId()).getCode());
             }
-        }).peek(e -> e.setAppControl(new 
AppControl().setAllowBuild(e.getBuildStatus() == null || 
!PipelineStatus.running.getCode().equals(e.getBuildStatus())).setAllowStart(PipelineStatus.success.getCode().equals(e.getBuildStatus())
 && 
!e.shouldBeTrack()).setAllowStop(e.isRunning()))).collect(Collectors.toList());
-
+        }).peek(e -> {
+            AppControl appControl = new AppControl()
+                .setAllowBuild(e.getBuildStatus() == null || 
!PipelineStatus.running.getCode().equals(e.getBuildStatus()))
+                .setAllowStart(!e.shouldBeTrack() && 
PipelineStatus.success.getCode().equals(e.getBuildStatus()))
+                .setAllowStop(e.isRunning());
+            e.setAppControl(appControl);
+        }).collect(Collectors.toList());
         applicationList.setRecords(appRecords);
         return RestResponse.success(applicationList);
     }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index f1f538123..9ee264226 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -21,8 +21,10 @@ import org.apache.streampark.common.conf.ConfigConst;
 import org.apache.streampark.common.enums.ClusterState;
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.enums.FlinkK8sRestExposedType;
+import org.apache.streampark.common.enums.ResolveOrder;
 import org.apache.streampark.common.util.HttpClientUtils;
 import org.apache.streampark.console.base.util.JacksonUtils;
+import org.apache.streampark.flink.submit.FlinkSubmitter;
 
 import com.baomidou.mybatisplus.annotation.IdType;
 import com.baomidou.mybatisplus.annotation.TableId;
@@ -33,6 +35,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import lombok.Data;
 import lombok.SneakyThrows;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.http.client.config.RequestConfig;
 
 import java.io.Serializable;
@@ -168,4 +171,17 @@ public class FlinkCluster implements Serializable {
         return config;
     }
 
+    @JsonIgnore
+    public Map<String, Object> getProperties() {
+        Map<String, Object> map = new HashMap<>();
+        Map<String, String> dynamicProperties = 
FlinkSubmitter.extractDynamicPropertiesAsJava(this.getDynamicProperties());
+        map.putAll(this.getOptionMap());
+        map.putAll(dynamicProperties);
+        ResolveOrder resolveOrder = ResolveOrder.of(this.getResolveOrder());
+        if (resolveOrder != null) {
+            map.put(CoreOptions.CLASSLOADER_RESOLVE_ORDER.key(), 
resolveOrder.getName());
+        }
+        return map;
+    }
+
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 95a2a09bb..e19e3c371 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -445,16 +445,13 @@ public class AppBuildPipeServiceImpl
             return Maps.newHashMap();
         }
         LambdaQueryWrapper<AppBuildPipeline> queryWrapper = new 
LambdaQueryWrapper<AppBuildPipeline>()
-            .select(AppBuildPipeline::getAppId, 
AppBuildPipeline::getPipeStatusCode)
             .in(AppBuildPipeline::getAppId, appIds);
-        List<Map<String, Object>> rMaps = baseMapper.selectMaps(queryWrapper);
-        if (CollectionUtils.isEmpty(rMaps)) {
+
+        List<AppBuildPipeline> appBuildPipelines = 
baseMapper.selectList(queryWrapper);
+        if (CollectionUtils.isEmpty(appBuildPipelines)) {
             return Maps.newHashMap();
         }
-
-        return rMaps.stream().collect(Collectors.toMap(
-            e -> (Long) e.get("app_id"),
-            e -> PipelineStatus.of((Integer) e.get("pipeStatusCode"))));
+        return appBuildPipelines.stream().collect(Collectors.toMap(e -> 
e.getAppId(), e -> e.getPipelineStatus()));
     }
 
     @Override
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 5d58aaeac..c5bf1388f 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -38,6 +38,7 @@ import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.common.util.YarnUtils;
 import org.apache.streampark.console.base.domain.RestRequest;
+import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.exception.ApplicationException;
 import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
 import org.apache.streampark.console.base.util.CommonUtils;
@@ -107,6 +108,7 @@ import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
@@ -1061,7 +1063,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
             }
         }
 
-        Map<String, Object> optionMap = new HashMap<>();
+        Map<String, Object> properties = new HashMap<>();
 
         if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) {
             FlinkCluster cluster = 
flinkClusterService.getById(application.getFlinkClusterId());
@@ -1070,15 +1072,15 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
                         "the cluster has been deleted. Please contact the 
Admin.",
                     application.getFlinkClusterId()));
             URI activeAddress = cluster.getActiveAddress();
-            optionMap.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
-            optionMap.put(RestOptions.PORT.key(), activeAddress.getPort());
+            properties.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
+            properties.put(RestOptions.PORT.key(), activeAddress.getPort());
         } else if 
(ExecutionMode.isYarnMode(application.getExecutionModeEnum())) {
             if 
(ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
                 FlinkCluster cluster = 
flinkClusterService.getById(application.getFlinkClusterId());
                 AssertUtils.state(cluster != null,
                     String.format("The yarn session clusterId=%s cannot be 
find, maybe the clusterId is wrong or " +
                         "the cluster has been deleted. Please contact the 
Admin.", application.getFlinkClusterId()));
-                optionMap.put(ConfigConst.KEY_YARN_APP_ID(), 
cluster.getClusterId());
+                properties.put(ConfigConst.KEY_YARN_APP_ID(), 
cluster.getClusterId());
             }
         }
 
@@ -1101,7 +1103,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
             appParam.getDrain(),
             customSavepoint,
             application.getK8sNamespace(),
-            optionMap
+            properties
         );
 
         CompletableFuture<CancelResponse> cancelFuture =
@@ -1222,6 +1224,11 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
 
         AssertUtils.state(application != null);
 
+        FlinkEnv flinkEnv = 
flinkEnvService.getByIdOrDefault(application.getVersionId());
+        if (flinkEnv == null) {
+            throw new ApiAlertException("[StreamPark] can no found flink 
version");
+        }
+
         // if manually started, clear the restart flag
         if (!auto) {
             application.setRestartCount(0);
@@ -1282,7 +1289,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
             FlinkSql flinkSql = 
flinkSqlService.getEffective(application.getId(), false);
             AssertUtils.state(flinkSql != null);
             // 1) dist_userJar
-            FlinkEnv flinkEnv = 
flinkEnvService.getByIdOrDefault(application.getVersionId());
             String sqlDistJar = commonService.getSqlClientJar(flinkEnv);
             // 2) appConfig
             appConf = applicationConfig == null ? null : 
String.format("yaml://%s", applicationConfig.getContent());
@@ -1295,38 +1301,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
             throw new UnsupportedOperationException("Unsupported...");
         }
 
-        Map<String, String> properties = 
FlinkSubmitter.extractDynamicPropertiesAsJava(application.getDynamicProperties());
-
-        Map<String, Object> optionMap = application.getOptionMap();
-
-        if (appParam.getAllowNonRestored()) {
-            
optionMap.put(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(), 
true);
-        }
-
-        if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) {
-            FlinkCluster cluster = 
flinkClusterService.getById(application.getFlinkClusterId());
-            AssertUtils.state(cluster != null,
-                String.format("The clusterId=%s cannot be find, maybe the 
clusterId is wrong or " +
-                    "the cluster has been deleted. Please contact the Admin.", 
application.getFlinkClusterId()));
-            URI activeAddress = cluster.getActiveAddress();
-            optionMap.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
-            optionMap.put(RestOptions.PORT.key(), activeAddress.getPort());
-        } else if 
(ExecutionMode.isYarnMode(application.getExecutionModeEnum())) {
-            String yarnQueue = (String) 
application.getHotParamsMap().get(ConfigConst.KEY_YARN_APP_QUEUE());
-            if (yarnQueue != null) {
-                properties.put(ConfigConst.KEY_YARN_APP_QUEUE(), yarnQueue);
-            }
-            if 
(ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
-                FlinkCluster cluster = 
flinkClusterService.getById(application.getFlinkClusterId());
-                AssertUtils.state(cluster != null,
-                    String.format("The yarn session clusterId=%s cannot be 
find, maybe the clusterId is wrong or " +
-                        "the cluster has been deleted. Please contact the 
Admin.", application.getFlinkClusterId()));
-                optionMap.put(ConfigConst.KEY_YARN_APP_ID(), 
cluster.getClusterId());
-            }
-        } else if 
(ExecutionMode.isKubernetesMode(application.getExecutionModeEnum())) {
-            optionMap.put(ConfigConst.KEY_K8S_IMAGE_PULL_POLICY(), "Always");
-        }
-
         Map<String, Object> extraParameter = new HashMap<>(0);
         if (application.isFlinkSqlJob()) {
             FlinkSql flinkSql = 
flinkSqlService.getEffective(application.getId(), true);
@@ -1336,17 +1310,11 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
             extraParameter.put(ConfigConst.KEY_FLINK_SQL(null), 
flinkSql.getSql());
         }
 
-        ResolveOrder resolveOrder = 
ResolveOrder.of(application.getResolveOrder());
-
         KubernetesSubmitParam kubernetesSubmitParam = new 
KubernetesSubmitParam(
             application.getClusterId(),
             application.getK8sNamespace(),
-            application.getK8sRestExposedTypeEnum());
-
-        FlinkEnv flinkEnv = 
flinkEnvService.getByIdOrDefault(application.getVersionId());
-        if (flinkEnv == null) {
-            throw new IllegalArgumentException("[StreamPark] can no found 
flink version");
-        }
+            application.getK8sRestExposedTypeEnum()
+        );
 
         AppBuildPipeline buildPipeline = 
appBuildPipeService.getById(application.getId());
 
@@ -1358,7 +1326,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
         } else {
             if 
(ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
                 AssertUtils.state(buildResult != null);
-                properties.put(JobManagerOptions.ARCHIVE_DIR.key(), 
Workspace.ARCHIVES_FILE_PATH());
                 DockerImageBuildResponse result = 
buildResult.as(DockerImageBuildResponse.class);
                 String ingressTemplates = application.getIngressTemplate();
                 String domainName = application.getDefaultModeIngress();
@@ -1391,7 +1358,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
             flinkEnv.getFlinkConf(),
             DevelopmentMode.of(application.getJobType()),
             ExecutionMode.of(application.getExecutionMode()),
-            resolveOrder,
             application.getId(),
             jobId,
             application.getJobName(),
@@ -1399,8 +1365,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
             application.getApplicationType(),
             getSavePointed(appParam),
             appParam.getFlameGraph() ? getFlameGraph(application) : null,
-            optionMap,
-            properties,
+            getProperties(application),
             applicationArgs,
             buildResult,
             kubernetesSubmitParam,
@@ -1483,6 +1448,50 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
 
     }
 
+    private Map<String, Object> getProperties(Application application) {
+        Map<String, Object> properties = application.getOptionMap();
+        if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) {
+            FlinkCluster cluster = 
flinkClusterService.getById(application.getFlinkClusterId());
+            AssertUtils.state(cluster != null,
+                String.format("The clusterId=%s cannot be find, maybe the 
clusterId is wrong or " +
+                    "the cluster has been deleted. Please contact the Admin.", 
application.getFlinkClusterId()));
+            URI activeAddress = cluster.getActiveAddress();
+            properties.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
+            properties.put(RestOptions.PORT.key(), activeAddress.getPort());
+        } else if 
(ExecutionMode.isYarnMode(application.getExecutionModeEnum())) {
+            String yarnQueue = (String) 
application.getHotParamsMap().get(ConfigConst.KEY_YARN_APP_QUEUE());
+            if (yarnQueue != null) {
+                properties.put(ConfigConst.KEY_YARN_APP_QUEUE(), yarnQueue);
+            }
+            if 
(ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
+                FlinkCluster cluster = 
flinkClusterService.getById(application.getFlinkClusterId());
+                AssertUtils.state(cluster != null,
+                    String.format("The yarn session clusterId=%s cannot be 
find, maybe the clusterId is wrong or " +
+                        "the cluster has been deleted. Please contact the 
Admin.", application.getFlinkClusterId()));
+                properties.put(ConfigConst.KEY_YARN_APP_ID(), 
cluster.getClusterId());
+            }
+        } else if 
(ExecutionMode.isKubernetesMode(application.getExecutionModeEnum())) {
+            properties.put(ConfigConst.KEY_K8S_IMAGE_PULL_POLICY(), "Always");
+        }
+
+        if 
(ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
+            properties.put(JobManagerOptions.ARCHIVE_DIR.key(), 
Workspace.ARCHIVES_FILE_PATH());
+        }
+
+        if (application.getAllowNonRestored()) {
+            
properties.put(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(), 
true);
+        }
+
+        Map<String, String> dynamicProperties = 
FlinkSubmitter.extractDynamicPropertiesAsJava(application.getDynamicProperties());
+        properties.putAll(dynamicProperties);
+        ResolveOrder resolveOrder = 
ResolveOrder.of(application.getResolveOrder());
+        if (resolveOrder != null) {
+            properties.put(CoreOptions.CLASSLOADER_RESOLVE_ORDER.key(), 
resolveOrder.getName());
+        }
+
+        return properties;
+    }
+
     private void updateToStopped(Application app) {
         Application application = getById(app);
         application.setOptionState(OptionState.NONE.getValue());
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 68ed6be94..da14fa90a 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -20,7 +20,6 @@ package org.apache.streampark.console.core.service.impl;
 import org.apache.streampark.common.enums.ApplicationType;
 import org.apache.streampark.common.enums.ClusterState;
 import org.apache.streampark.common.enums.ExecutionMode;
-import org.apache.streampark.common.enums.ResolveOrder;
 import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.console.core.bean.ResponseResult;
@@ -164,18 +163,13 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
                     return result;
             }
             FlinkEnv flinkEnv = 
flinkEnvService.getById(flinkCluster.getVersionId());
-            Map<String, Object> extraParameter = flinkCluster.getOptionMap();
-            ResolveOrder resolveOrder = 
ResolveOrder.of(flinkCluster.getResolveOrder());
-            Map<String, String> properties = 
FlinkSubmitter.extractDynamicPropertiesAsJava(flinkCluster.getDynamicProperties());
             DeployRequest deployRequest = new DeployRequest(
                 flinkEnv.getFlinkVersion(),
                 flinkCluster.getClusterId(),
                 executionModeEnum,
-                resolveOrder,
                 flinkCluster.getFlameGraph() ? getFlameGraph(flinkCluster) : 
null,
-                properties,
-                kubernetesDeployParam,
-                extraParameter
+                flinkCluster.getProperties(),
+                kubernetesDeployParam
             );
             log.info("deploy cluster request " + deployRequest);
             Future<DeployResponse> future = executorService.submit(() -> 
FlinkSubmitter.deploy(deployRequest));
@@ -251,13 +245,12 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
             return result;
         }
         FlinkEnv flinkEnv = 
flinkEnvService.getById(flinkCluster.getVersionId());
-        Map<String, Object> extraParameter = flinkCluster.getOptionMap();
         ShutDownRequest stopRequest = new ShutDownRequest(
             flinkEnv.getFlinkVersion(),
             executionModeEnum,
             clusterId,
             kubernetesDeployParam,
-            extraParameter
+            flinkCluster.getProperties()
         );
         LambdaUpdateWrapper<FlinkCluster> updateWrapper = 
Wrappers.lambdaUpdate();
         updateWrapper.eq(FlinkCluster::getId, flinkCluster.getId());
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/CancelRequest.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/CancelRequest.scala
index 902d3d137..8fd5c6db2 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/CancelRequest.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/CancelRequest.scala
@@ -32,7 +32,4 @@ case class CancelRequest(flinkVersion: FlinkVersion,
                          withDrain: Boolean,
                          customSavePointPath: String,
                          kubernetesNamespace: String = 
K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE,
-                         @Nullable option: JavaMap[String, Any]
-                      ) {
-
-}
+                         @Nullable properties: JavaMap[String, Any])
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/CancelResponse.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/CancelResponse.scala
index 382523918..bdeb35d99 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/CancelResponse.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/CancelResponse.scala
@@ -17,6 +17,4 @@
 
 package org.apache.streampark.flink.submit.bean
 
-case class CancelResponse(savePointDir: String) {
-
-}
+case class CancelResponse(savePointDir: String)
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployRequest.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployRequest.scala
index 9de5748fc..278d46758 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployRequest.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployRequest.scala
@@ -23,7 +23,7 @@ import java.util.{Map => JavaMap}
 import org.apache.streampark.common.conf.Workspace
 import javax.annotation.Nullable
 import org.apache.streampark.common.domain.FlinkVersion
-import org.apache.streampark.common.enums.{ExecutionMode, 
FlinkK8sRestExposedType, ResolveOrder}
+import org.apache.streampark.common.enums.{ExecutionMode, 
FlinkK8sRestExposedType}
 import org.apache.streampark.common.util.FlinkUtils
 import org.apache.commons.io.FileUtils
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
@@ -31,12 +31,10 @@ import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
 case class DeployRequest(flinkVersion: FlinkVersion,
                          clusterId: String,
                          executionMode: ExecutionMode,
-                         resolveOrder: ResolveOrder,
                          flameGraph: JavaMap[String, java.io.Serializable],
-                         properties: JavaMap[String, String],
-                         @Nullable k8sDeployParam: KubernetesDeployParam,
-                         @Nullable extraParameter: JavaMap[String, Any]
-                         ) {
+                         properties: JavaMap[String, Any],
+                         @Nullable k8sDeployParam: KubernetesDeployParam) {
+
   private[submit] lazy val hdfsWorkspace = {
     /**
       * You must keep the flink version and configuration in the native flink 
and hdfs exactly the same.
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployResponse.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployResponse.scala
index 7e2cb266f..f352732da 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployResponse.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployResponse.scala
@@ -19,5 +19,4 @@ package org.apache.streampark.flink.submit.bean
 
 case class DeployResponse(address: String,
                           clusterId: String,
-                          message: String = null
-                         )
+                          message: String = null)
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/ShutDownRequest.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/ShutDownRequest.scala
index 4c1464b8b..0112541f1 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/ShutDownRequest.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/ShutDownRequest.scala
@@ -26,5 +26,4 @@ case class ShutDownRequest(flinkVersion: FlinkVersion,
                             executionMode: ExecutionMode,
                             clusterId: String,
                             @Nullable kubernetesDeployParam: 
KubernetesDeployParam,
-                            @Nullable extraParameter: JavaMap[String, Any]
-                          )
+                            @Nullable properties: JavaMap[String, Any])
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
index f46cc6782..a39840247 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
@@ -46,7 +46,6 @@ case class SubmitRequest(flinkVersion: FlinkVersion,
                          flinkYaml: String,
                          developmentMode: DevelopmentMode,
                          executionMode: ExecutionMode,
-                         resolveOrder: ResolveOrder,
                          id: Long,
                          jobId: String,
                          appName: String,
@@ -54,8 +53,7 @@ case class SubmitRequest(flinkVersion: FlinkVersion,
                          applicationType: ApplicationType,
                          savePoint: String,
                          flameGraph: JavaMap[String, java.io.Serializable],
-                         option: JavaMap[String, Any],
-                         properties: JavaMap[String, String],
+                         properties: JavaMap[String, Any],
                          args: String,
                          @Nullable buildResult: BuildResult,
                          @Nullable k8sSubmitParam: KubernetesSubmitParam,
@@ -74,7 +72,7 @@ case class SubmitRequest(flinkVersion: FlinkVersion,
 
   lazy val flinkSQL: String = extraParameter.get(KEY_FLINK_SQL()).toString
 
-  lazy val allowNonRestoredState = 
Try(option.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key).toString.toBoolean).getOrElse(false)
+  lazy val allowNonRestoredState = 
Try(properties.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key).toString.toBoolean).getOrElse(false)
 
   lazy val savepointRestoreSettings: SavepointRestoreSettings = {
     savePoint match {
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitResponse.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitResponse.scala
index 5ce981a63..07e36e080 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitResponse.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitResponse.scala
@@ -23,6 +23,4 @@ import javax.annotation.Nullable
 case class SubmitResponse(clusterId: String,
                           flinkConfig: JavaMap[String, String],
                           @Nullable jobId: String = "",
-                          @Nullable jobManagerUrl: String = "") {
-
-}
+                          @Nullable jobManagerUrl: String = "")
diff --git 
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeSessionSubmit.scala
 
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeSessionSubmit.scala
index 30fa4562b..9e91914ff 100644
--- 
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeSessionSubmit.scala
+++ 
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeSessionSubmit.scala
@@ -130,7 +130,6 @@ object KubernetesNativeSessionSubmit extends 
KubernetesNativeSubmitTrait with Lo
          |    exposedType      : 
${deployRequest.k8sDeployParam.flinkRestExposedType}
          |    serviceAccount   : ${deployRequest.k8sDeployParam.serviceAccount}
          |    flinkImage       : ${deployRequest.k8sDeployParam.flinkImage}
-         |    resolveOrder     : ${deployRequest.resolveOrder.getName}
          |    flameGraph       : ${deployRequest.flameGraph != null}
          |    properties       : ${deployRequest.properties.mkString(" ")}
          
|-------------------------------------------------------------------------------------------
@@ -139,12 +138,7 @@ object KubernetesNativeSessionSubmit extends 
KubernetesNativeSubmitTrait with Lo
     var client: ClusterClient[String] = null
     var kubeClient: FlinkKubeClient = null
     try {
-      val flinkConfig = extractConfiguration(
-        deployRequest.flinkVersion.flinkHome,
-        deployRequest.properties,
-        deployRequest.extraParameter,
-        deployRequest.resolveOrder)
-
+      val flinkConfig = 
extractConfiguration(deployRequest.flinkVersion.flinkHome, 
deployRequest.properties)
       flinkConfig
         .safeSet(DeploymentOptions.TARGET, 
KubernetesDeploymentTarget.SESSION.getName)
         .safeSet(KubernetesConfigOptions.NAMESPACE, 
deployRequest.k8sDeployParam.kubernetesNamespace)
@@ -181,7 +175,7 @@ object KubernetesNativeSessionSubmit extends 
KubernetesNativeSubmitTrait with Lo
     var kubeClient: FlinkKubeClient = null
     try {
       val flinkConfig = 
getFlinkDefaultConfiguration(shutDownRequest.flinkVersion.flinkHome)
-      shutDownRequest.extraParameter.foreach(m => m._2 match {
+      shutDownRequest.properties.foreach(m => m._2 match {
         case v if v != null => flinkConfig.setString(m._1, m._2.toString)
         case _ =>
       })
diff --git 
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/RemoteSubmit.scala
 
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/RemoteSubmit.scala
index 5d4aab09c..7c87a4ae7 100644
--- 
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/RemoteSubmit.scala
+++ 
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/RemoteSubmit.scala
@@ -52,8 +52,8 @@ object RemoteSubmit extends FlinkSubmitTrait {
   override def doCancel(cancelRequest: CancelRequest, flinkConfig: 
Configuration): CancelResponse = {
     flinkConfig
       .safeSet(DeploymentOptions.TARGET, cancelRequest.executionMode.getName)
-      .safeSet(RestOptions.ADDRESS, 
cancelRequest.option.get(RestOptions.ADDRESS.key()).toString)
-      .safeSet[JavaInt](RestOptions.PORT, 
cancelRequest.option.get(RestOptions.PORT.key()).toString.toInt)
+      .safeSet(RestOptions.ADDRESS, 
cancelRequest.properties.get(RestOptions.ADDRESS.key()).toString)
+      .safeSet[JavaInt](RestOptions.PORT, 
cancelRequest.properties.get(RestOptions.PORT.key()).toString.toInt)
     logInfo(
       s"""
          |------------------------------------------------------------------
diff --git 
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala
 
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala
index 610990dc0..21b216e77 100644
--- 
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala
+++ 
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala
@@ -137,7 +137,7 @@ object YarnSessionSubmit extends YarnSubmitTrait {
   }
 
   override def doCancel(cancelRequest: CancelRequest, flinkConfig: 
Configuration): CancelResponse = {
-    flinkConfig.safeSet(YarnConfigOptions.APPLICATION_ID, 
cancelRequest.option.get(KEY_YARN_APP_ID).toString)
+    flinkConfig.safeSet(YarnConfigOptions.APPLICATION_ID, 
cancelRequest.properties.get(KEY_YARN_APP_ID).toString)
     flinkConfig.safeSet(DeploymentOptions.TARGET, 
YarnDeploymentTarget.SESSION.getName)
     logInfo(
       s"""
@@ -172,7 +172,6 @@ object YarnSessionSubmit extends YarnSubmitTrait {
          |    flinkVersion     : ${deployRequest.flinkVersion.version}
          |    execMode         : ${deployRequest.executionMode.name()}
          |    clusterId        : ${deployRequest.clusterId}
-         |    resolveOrder     : ${deployRequest.resolveOrder.getName}
          |    flameGraph       : ${deployRequest.flameGraph != null}
          |    properties       : ${deployRequest.properties.mkString(" ")}
          
|-------------------------------------------------------------------------------------------
@@ -180,10 +179,9 @@ object YarnSessionSubmit extends YarnSubmitTrait {
     var clusterDescriptor: YarnClusterDescriptor = null
     var client: ClusterClient[ApplicationId] = null
     try {
-      val flinkConfig = 
extractConfiguration(deployRequest.flinkVersion.flinkHome,
-        deployRequest.properties,
-        deployRequest.extraParameter,
-        deployRequest.resolveOrder)
+      val flinkConfig = extractConfiguration(
+        deployRequest.flinkVersion.flinkHome,
+        deployRequest.properties)
       setConfig(deployRequest, flinkConfig)
       val yarnClusterDescriptor = 
getSessionClusterDeployDescriptor(flinkConfig)
       clusterDescriptor = yarnClusterDescriptor._2
@@ -222,7 +220,7 @@ object YarnSessionSubmit extends YarnSubmitTrait {
     var client: ClusterClient[ApplicationId] = null
     try {
       val flinkConfig = 
getFlinkDefaultConfiguration(shutDownRequest.flinkVersion.flinkHome)
-      shutDownRequest.extraParameter.foreach(m => m._2 match {
+      shutDownRequest.properties.foreach(m => m._2 match {
         case v if v != null => flinkConfig.setString(m._1, m._2.toString)
         case _ =>
       })
diff --git 
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
 
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
index 414959668..c3f7735f5 100644
--- 
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
+++ 
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
@@ -68,11 +68,9 @@ trait FlinkSubmitTrait extends Logger {
          |    k8sNamespace     : 
${submitRequest.k8sSubmitParam.kubernetesNamespace}
          |    flinkExposedType : 
${submitRequest.k8sSubmitParam.flinkRestExposedType}
          |    clusterId        : ${submitRequest.k8sSubmitParam.clusterId}
-         |    resolveOrder     : ${submitRequest.resolveOrder.getName}
          |    applicationType  : ${submitRequest.applicationType.getName}
          |    flameGraph       : ${submitRequest.flameGraph != null}
          |    savePoint        : ${submitRequest.savePoint}
-         |    option           : ${submitRequest.option}
          |    properties       : ${submitRequest.properties.mkString(" ")}
          |    args             : ${submitRequest.args}
          |    appConf          : ${submitRequest.appConf}
@@ -93,7 +91,6 @@ trait FlinkSubmitTrait extends Logger {
       .safeSet(PipelineOptions.NAME, submitRequest.effectiveAppName)
       .safeSet(DeploymentOptions.TARGET, submitRequest.executionMode.getName)
       .safeSet(SavepointConfigOptions.SAVEPOINT_PATH, submitRequest.savePoint)
-      .safeSet(CoreOptions.CLASSLOADER_RESOLVE_ORDER, 
submitRequest.resolveOrder.getName)
       .safeSet(ApplicationConfiguration.APPLICATION_MAIN_CLASS, 
submitRequest.appMain)
       .safeSet(ApplicationConfiguration.APPLICATION_ARGS, 
extractProgramArgs(submitRequest))
       .safeSet(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, 
submitRequest.jobId)
@@ -230,8 +227,8 @@ trait FlinkSubmitTrait extends Logger {
   }
 
   private[submit] def getParallelism(submitRequest: SubmitRequest): Integer = {
-    if (submitRequest.option.containsKey(KEY_FLINK_PARALLELISM())) {
-      
Integer.valueOf(submitRequest.option.get(KEY_FLINK_PARALLELISM()).toString)
+    if (submitRequest.properties.containsKey(KEY_FLINK_PARALLELISM())) {
+      
Integer.valueOf(submitRequest.properties.get(KEY_FLINK_PARALLELISM()).toString)
     } else {
       
getFlinkDefaultConfiguration(submitRequest.flinkVersion.flinkHome).getInteger(
         CoreOptions.DEFAULT_PARALLELISM,
@@ -287,20 +284,10 @@ trait FlinkSubmitTrait extends Logger {
         array += 
s"-D${CoreOptions.FLINK_TM_JVM_OPTIONS.key()}=-javaagent:$$PWD/plugins/$jvmProfilerJar=$param"
       }
 
-      // The priority of the parameters defined on the page is greater than 
the app conf file, property parameters etc.
-      if (MapUtils.isNotEmpty(submitRequest.option)) {
-        submitRequest.option.foreach(x => array += 
s"-D${x._1.trim}=${x._2.toString.trim}")
+      // app properties
+      if (MapUtils.isNotEmpty(submitRequest.properties)) {
+        submitRequest.properties.foreach(x => array += s"-D${x._1}=${x._2}")
       }
-
-      //-D other dynamic parameter
-      if (submitRequest.properties != null && 
submitRequest.properties.nonEmpty) {
-        submitRequest.properties
-          .filter(_._1 != "classloader.resolve-order")
-          .foreach(x => array += s"-D${x._1}=${x._2}")
-      }
-
-      array += 
s"-Dclassloader.resolve-order=${submitRequest.resolveOrder.getName}"
-
       array.toArray
     }
 
@@ -328,25 +315,16 @@ trait FlinkSubmitTrait extends Logger {
     FlinkRunOption.mergeOptions(CliFrontendParser.getRunCommandOptions, 
customCommandLineOptions)
   }
 
-  private[submit] def extractConfiguration(flinkHome: String,
-                                           properties: JavaMap[String, String],
-                                           extraParameter: JavaMap[String, 
Any],
-                                           resolveOrder: ResolveOrder): 
Configuration = {
+  private[submit] def extractConfiguration(flinkHome: String, properties: 
JavaMap[String, Any]): Configuration = {
     val commandLine = {
       val commandLineOptions = getCommandLineOptions(flinkHome)
       //read and verify user config...
       val cliArgs = {
         val array = new ArrayBuffer[String]()
         // The priority of the parameters defined on the page is greater than 
the app conf file, property parameters etc.
-        if (MapUtils.isNotEmpty(extraParameter)) {
-          extraParameter.foreach(x => array += 
s"-D${x._1.trim}=${x._2.toString.trim}")
-        }
-        if (properties != null && properties.nonEmpty) {
-          properties
-            .filter(_._1 != "classloader.resolve-order")
-            .foreach(x => array += s"-D${x._1}=${x._2}")
+        if (MapUtils.isNotEmpty(properties)) {
+          properties.foreach(x => array += s"-D${x._1}=${x._2.toString.trim}")
         }
-        array += s"-Dclassloader.resolve-order=${resolveOrder.getName}"
         array.toArray
       }
       FlinkRunOption.parse(commandLineOptions, cliArgs, true)


Reply via email to