This is an automated email from the ASF dual-hosted git repository.
monster pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new b6ca3a49f [improve] job submit parameter improvement (#2063)
b6ca3a49f is described below
commit b6ca3a49f144a412d8f6c662e0a5cb12ad732a5a
Author: benjobs <[email protected]>
AuthorDate: Sun Nov 20 21:57:26 2022 +0800
[improve] job submit parameter improvement (#2063)
---
.../core/controller/ApplicationController.java | 9 +-
.../console/core/entity/FlinkCluster.java | 16 +++
.../core/service/impl/AppBuildPipeServiceImpl.java | 11 +--
.../core/service/impl/ApplicationServiceImpl.java | 109 +++++++++++----------
.../core/service/impl/FlinkClusterServiceImpl.java | 13 +--
.../flink/submit/bean/CancelRequest.scala | 5 +-
.../flink/submit/bean/CancelResponse.scala | 4 +-
.../flink/submit/bean/DeployRequest.scala | 10 +-
.../flink/submit/bean/DeployResponse.scala | 3 +-
.../flink/submit/bean/ShutDownRequest.scala | 3 +-
.../flink/submit/bean/SubmitRequest.scala | 6 +-
.../flink/submit/bean/SubmitResponse.scala | 4 +-
.../impl/KubernetesNativeSessionSubmit.scala | 10 +-
.../flink/submit/impl/RemoteSubmit.scala | 4 +-
.../flink/submit/impl/YarnSessionSubmit.scala | 12 +--
.../flink/submit/trait/FlinkSubmitTrait.scala | 38 ++-----
16 files changed, 117 insertions(+), 140 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
index 1ec165574..4505b378d 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
@@ -142,8 +142,13 @@ public class ApplicationController {
if (pipeStates.containsKey(e.getId())) {
e.setBuildStatus(pipeStates.get(e.getId()).getCode());
}
- }).peek(e -> e.setAppControl(new
AppControl().setAllowBuild(e.getBuildStatus() == null ||
!PipelineStatus.running.getCode().equals(e.getBuildStatus())).setAllowStart(PipelineStatus.success.getCode().equals(e.getBuildStatus())
&&
!e.shouldBeTrack()).setAllowStop(e.isRunning()))).collect(Collectors.toList());
-
+ }).peek(e -> {
+ AppControl appControl = new AppControl()
+ .setAllowBuild(e.getBuildStatus() == null ||
!PipelineStatus.running.getCode().equals(e.getBuildStatus()))
+ .setAllowStart(!e.shouldBeTrack() &&
PipelineStatus.success.getCode().equals(e.getBuildStatus()))
+ .setAllowStop(e.isRunning());
+ e.setAppControl(appControl);
+ }).collect(Collectors.toList());
applicationList.setRecords(appRecords);
return RestResponse.success(applicationList);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index f1f538123..9ee264226 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -21,8 +21,10 @@ import org.apache.streampark.common.conf.ConfigConst;
import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.enums.FlinkK8sRestExposedType;
+import org.apache.streampark.common.enums.ResolveOrder;
import org.apache.streampark.common.util.HttpClientUtils;
import org.apache.streampark.console.base.util.JacksonUtils;
+import org.apache.streampark.flink.submit.FlinkSubmitter;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
@@ -33,6 +35,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import lombok.Data;
import lombok.SneakyThrows;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.http.client.config.RequestConfig;
import java.io.Serializable;
@@ -168,4 +171,17 @@ public class FlinkCluster implements Serializable {
return config;
}
+ @JsonIgnore
+ public Map<String, Object> getProperties() {
+ Map<String, Object> map = new HashMap<>();
+ Map<String, String> dynamicProperties =
FlinkSubmitter.extractDynamicPropertiesAsJava(this.getDynamicProperties());
+ map.putAll(this.getOptionMap());
+ map.putAll(dynamicProperties);
+ ResolveOrder resolveOrder = ResolveOrder.of(this.getResolveOrder());
+ if (resolveOrder != null) {
+ map.put(CoreOptions.CLASSLOADER_RESOLVE_ORDER.key(),
resolveOrder.getName());
+ }
+ return map;
+ }
+
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 95a2a09bb..e19e3c371 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -445,16 +445,13 @@ public class AppBuildPipeServiceImpl
return Maps.newHashMap();
}
LambdaQueryWrapper<AppBuildPipeline> queryWrapper = new
LambdaQueryWrapper<AppBuildPipeline>()
- .select(AppBuildPipeline::getAppId,
AppBuildPipeline::getPipeStatusCode)
.in(AppBuildPipeline::getAppId, appIds);
- List<Map<String, Object>> rMaps = baseMapper.selectMaps(queryWrapper);
- if (CollectionUtils.isEmpty(rMaps)) {
+
+ List<AppBuildPipeline> appBuildPipelines =
baseMapper.selectList(queryWrapper);
+ if (CollectionUtils.isEmpty(appBuildPipelines)) {
return Maps.newHashMap();
}
-
- return rMaps.stream().collect(Collectors.toMap(
- e -> (Long) e.get("app_id"),
- e -> PipelineStatus.of((Integer) e.get("pipeStatusCode"))));
+ return appBuildPipelines.stream().collect(Collectors.toMap(e ->
e.getAppId(), e -> e.getPipelineStatus()));
}
@Override
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 5d58aaeac..c5bf1388f 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -38,6 +38,7 @@ import org.apache.streampark.common.util.ThreadUtils;
import org.apache.streampark.common.util.Utils;
import org.apache.streampark.common.util.YarnUtils;
import org.apache.streampark.console.base.domain.RestRequest;
+import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.exception.ApplicationException;
import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
import org.apache.streampark.console.base.util.CommonUtils;
@@ -107,6 +108,7 @@ import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
@@ -1061,7 +1063,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
}
}
- Map<String, Object> optionMap = new HashMap<>();
+ Map<String, Object> properties = new HashMap<>();
if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) {
FlinkCluster cluster =
flinkClusterService.getById(application.getFlinkClusterId());
@@ -1070,15 +1072,15 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
"the cluster has been deleted. Please contact the
Admin.",
application.getFlinkClusterId()));
URI activeAddress = cluster.getActiveAddress();
- optionMap.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
- optionMap.put(RestOptions.PORT.key(), activeAddress.getPort());
+ properties.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
+ properties.put(RestOptions.PORT.key(), activeAddress.getPort());
} else if
(ExecutionMode.isYarnMode(application.getExecutionModeEnum())) {
if
(ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
FlinkCluster cluster =
flinkClusterService.getById(application.getFlinkClusterId());
AssertUtils.state(cluster != null,
String.format("The yarn session clusterId=%s cannot be
find, maybe the clusterId is wrong or " +
"the cluster has been deleted. Please contact the
Admin.", application.getFlinkClusterId()));
- optionMap.put(ConfigConst.KEY_YARN_APP_ID(),
cluster.getClusterId());
+ properties.put(ConfigConst.KEY_YARN_APP_ID(),
cluster.getClusterId());
}
}
@@ -1101,7 +1103,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
appParam.getDrain(),
customSavepoint,
application.getK8sNamespace(),
- optionMap
+ properties
);
CompletableFuture<CancelResponse> cancelFuture =
@@ -1222,6 +1224,11 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
AssertUtils.state(application != null);
+ FlinkEnv flinkEnv =
flinkEnvService.getByIdOrDefault(application.getVersionId());
+ if (flinkEnv == null) {
+ throw new ApiAlertException("[StreamPark] can no found flink
version");
+ }
+
// if manually started, clear the restart flag
if (!auto) {
application.setRestartCount(0);
@@ -1282,7 +1289,6 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
FlinkSql flinkSql =
flinkSqlService.getEffective(application.getId(), false);
AssertUtils.state(flinkSql != null);
// 1) dist_userJar
- FlinkEnv flinkEnv =
flinkEnvService.getByIdOrDefault(application.getVersionId());
String sqlDistJar = commonService.getSqlClientJar(flinkEnv);
// 2) appConfig
appConf = applicationConfig == null ? null :
String.format("yaml://%s", applicationConfig.getContent());
@@ -1295,38 +1301,6 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
throw new UnsupportedOperationException("Unsupported...");
}
- Map<String, String> properties =
FlinkSubmitter.extractDynamicPropertiesAsJava(application.getDynamicProperties());
-
- Map<String, Object> optionMap = application.getOptionMap();
-
- if (appParam.getAllowNonRestored()) {
-
optionMap.put(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(),
true);
- }
-
- if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) {
- FlinkCluster cluster =
flinkClusterService.getById(application.getFlinkClusterId());
- AssertUtils.state(cluster != null,
- String.format("The clusterId=%s cannot be find, maybe the
clusterId is wrong or " +
- "the cluster has been deleted. Please contact the Admin.",
application.getFlinkClusterId()));
- URI activeAddress = cluster.getActiveAddress();
- optionMap.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
- optionMap.put(RestOptions.PORT.key(), activeAddress.getPort());
- } else if
(ExecutionMode.isYarnMode(application.getExecutionModeEnum())) {
- String yarnQueue = (String)
application.getHotParamsMap().get(ConfigConst.KEY_YARN_APP_QUEUE());
- if (yarnQueue != null) {
- properties.put(ConfigConst.KEY_YARN_APP_QUEUE(), yarnQueue);
- }
- if
(ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
- FlinkCluster cluster =
flinkClusterService.getById(application.getFlinkClusterId());
- AssertUtils.state(cluster != null,
- String.format("The yarn session clusterId=%s cannot be
find, maybe the clusterId is wrong or " +
- "the cluster has been deleted. Please contact the
Admin.", application.getFlinkClusterId()));
- optionMap.put(ConfigConst.KEY_YARN_APP_ID(),
cluster.getClusterId());
- }
- } else if
(ExecutionMode.isKubernetesMode(application.getExecutionModeEnum())) {
- optionMap.put(ConfigConst.KEY_K8S_IMAGE_PULL_POLICY(), "Always");
- }
-
Map<String, Object> extraParameter = new HashMap<>(0);
if (application.isFlinkSqlJob()) {
FlinkSql flinkSql =
flinkSqlService.getEffective(application.getId(), true);
@@ -1336,17 +1310,11 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
extraParameter.put(ConfigConst.KEY_FLINK_SQL(null),
flinkSql.getSql());
}
- ResolveOrder resolveOrder =
ResolveOrder.of(application.getResolveOrder());
-
KubernetesSubmitParam kubernetesSubmitParam = new
KubernetesSubmitParam(
application.getClusterId(),
application.getK8sNamespace(),
- application.getK8sRestExposedTypeEnum());
-
- FlinkEnv flinkEnv =
flinkEnvService.getByIdOrDefault(application.getVersionId());
- if (flinkEnv == null) {
- throw new IllegalArgumentException("[StreamPark] can no found
flink version");
- }
+ application.getK8sRestExposedTypeEnum()
+ );
AppBuildPipeline buildPipeline =
appBuildPipeService.getById(application.getId());
@@ -1358,7 +1326,6 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
} else {
if
(ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
AssertUtils.state(buildResult != null);
- properties.put(JobManagerOptions.ARCHIVE_DIR.key(),
Workspace.ARCHIVES_FILE_PATH());
DockerImageBuildResponse result =
buildResult.as(DockerImageBuildResponse.class);
String ingressTemplates = application.getIngressTemplate();
String domainName = application.getDefaultModeIngress();
@@ -1391,7 +1358,6 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
flinkEnv.getFlinkConf(),
DevelopmentMode.of(application.getJobType()),
ExecutionMode.of(application.getExecutionMode()),
- resolveOrder,
application.getId(),
jobId,
application.getJobName(),
@@ -1399,8 +1365,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
application.getApplicationType(),
getSavePointed(appParam),
appParam.getFlameGraph() ? getFlameGraph(application) : null,
- optionMap,
- properties,
+ getProperties(application),
applicationArgs,
buildResult,
kubernetesSubmitParam,
@@ -1483,6 +1448,50 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
}
+ private Map<String, Object> getProperties(Application application) {
+ Map<String, Object> properties = application.getOptionMap();
+ if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) {
+ FlinkCluster cluster =
flinkClusterService.getById(application.getFlinkClusterId());
+ AssertUtils.state(cluster != null,
+ String.format("The clusterId=%s cannot be find, maybe the
clusterId is wrong or " +
+ "the cluster has been deleted. Please contact the Admin.",
application.getFlinkClusterId()));
+ URI activeAddress = cluster.getActiveAddress();
+ properties.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
+ properties.put(RestOptions.PORT.key(), activeAddress.getPort());
+ } else if
(ExecutionMode.isYarnMode(application.getExecutionModeEnum())) {
+ String yarnQueue = (String)
application.getHotParamsMap().get(ConfigConst.KEY_YARN_APP_QUEUE());
+ if (yarnQueue != null) {
+ properties.put(ConfigConst.KEY_YARN_APP_QUEUE(), yarnQueue);
+ }
+ if
(ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
+ FlinkCluster cluster =
flinkClusterService.getById(application.getFlinkClusterId());
+ AssertUtils.state(cluster != null,
+ String.format("The yarn session clusterId=%s cannot be
find, maybe the clusterId is wrong or " +
+ "the cluster has been deleted. Please contact the
Admin.", application.getFlinkClusterId()));
+ properties.put(ConfigConst.KEY_YARN_APP_ID(),
cluster.getClusterId());
+ }
+ } else if
(ExecutionMode.isKubernetesMode(application.getExecutionModeEnum())) {
+ properties.put(ConfigConst.KEY_K8S_IMAGE_PULL_POLICY(), "Always");
+ }
+
+ if
(ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
+ properties.put(JobManagerOptions.ARCHIVE_DIR.key(),
Workspace.ARCHIVES_FILE_PATH());
+ }
+
+ if (application.getAllowNonRestored()) {
+
properties.put(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(),
true);
+ }
+
+ Map<String, String> dynamicProperties =
FlinkSubmitter.extractDynamicPropertiesAsJava(application.getDynamicProperties());
+ properties.putAll(dynamicProperties);
+ ResolveOrder resolveOrder =
ResolveOrder.of(application.getResolveOrder());
+ if (resolveOrder != null) {
+ properties.put(CoreOptions.CLASSLOADER_RESOLVE_ORDER.key(),
resolveOrder.getName());
+ }
+
+ return properties;
+ }
+
private void updateToStopped(Application app) {
Application application = getById(app);
application.setOptionState(OptionState.NONE.getValue());
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 68ed6be94..da14fa90a 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -20,7 +20,6 @@ package org.apache.streampark.console.core.service.impl;
import org.apache.streampark.common.enums.ApplicationType;
import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.ExecutionMode;
-import org.apache.streampark.common.enums.ResolveOrder;
import org.apache.streampark.common.util.ThreadUtils;
import org.apache.streampark.common.util.Utils;
import org.apache.streampark.console.core.bean.ResponseResult;
@@ -164,18 +163,13 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
return result;
}
FlinkEnv flinkEnv =
flinkEnvService.getById(flinkCluster.getVersionId());
- Map<String, Object> extraParameter = flinkCluster.getOptionMap();
- ResolveOrder resolveOrder =
ResolveOrder.of(flinkCluster.getResolveOrder());
- Map<String, String> properties =
FlinkSubmitter.extractDynamicPropertiesAsJava(flinkCluster.getDynamicProperties());
DeployRequest deployRequest = new DeployRequest(
flinkEnv.getFlinkVersion(),
flinkCluster.getClusterId(),
executionModeEnum,
- resolveOrder,
flinkCluster.getFlameGraph() ? getFlameGraph(flinkCluster) :
null,
- properties,
- kubernetesDeployParam,
- extraParameter
+ flinkCluster.getProperties(),
+ kubernetesDeployParam
);
log.info("deploy cluster request " + deployRequest);
Future<DeployResponse> future = executorService.submit(() ->
FlinkSubmitter.deploy(deployRequest));
@@ -251,13 +245,12 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
return result;
}
FlinkEnv flinkEnv =
flinkEnvService.getById(flinkCluster.getVersionId());
- Map<String, Object> extraParameter = flinkCluster.getOptionMap();
ShutDownRequest stopRequest = new ShutDownRequest(
flinkEnv.getFlinkVersion(),
executionModeEnum,
clusterId,
kubernetesDeployParam,
- extraParameter
+ flinkCluster.getProperties()
);
LambdaUpdateWrapper<FlinkCluster> updateWrapper =
Wrappers.lambdaUpdate();
updateWrapper.eq(FlinkCluster::getId, flinkCluster.getId());
diff --git
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/CancelRequest.scala
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/CancelRequest.scala
index 902d3d137..8fd5c6db2 100644
---
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/CancelRequest.scala
+++
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/CancelRequest.scala
@@ -32,7 +32,4 @@ case class CancelRequest(flinkVersion: FlinkVersion,
withDrain: Boolean,
customSavePointPath: String,
kubernetesNamespace: String =
K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE,
- @Nullable option: JavaMap[String, Any]
- ) {
-
-}
+ @Nullable properties: JavaMap[String, Any])
diff --git
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/CancelResponse.scala
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/CancelResponse.scala
index 382523918..bdeb35d99 100644
---
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/CancelResponse.scala
+++
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/CancelResponse.scala
@@ -17,6 +17,4 @@
package org.apache.streampark.flink.submit.bean
-case class CancelResponse(savePointDir: String) {
-
-}
+case class CancelResponse(savePointDir: String)
diff --git
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployRequest.scala
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployRequest.scala
index 9de5748fc..278d46758 100644
---
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployRequest.scala
+++
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployRequest.scala
@@ -23,7 +23,7 @@ import java.util.{Map => JavaMap}
import org.apache.streampark.common.conf.Workspace
import javax.annotation.Nullable
import org.apache.streampark.common.domain.FlinkVersion
-import org.apache.streampark.common.enums.{ExecutionMode,
FlinkK8sRestExposedType, ResolveOrder}
+import org.apache.streampark.common.enums.{ExecutionMode,
FlinkK8sRestExposedType}
import org.apache.streampark.common.util.FlinkUtils
import org.apache.commons.io.FileUtils
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
@@ -31,12 +31,10 @@ import
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
case class DeployRequest(flinkVersion: FlinkVersion,
clusterId: String,
executionMode: ExecutionMode,
- resolveOrder: ResolveOrder,
flameGraph: JavaMap[String, java.io.Serializable],
- properties: JavaMap[String, String],
- @Nullable k8sDeployParam: KubernetesDeployParam,
- @Nullable extraParameter: JavaMap[String, Any]
- ) {
+ properties: JavaMap[String, Any],
+ @Nullable k8sDeployParam: KubernetesDeployParam) {
+
private[submit] lazy val hdfsWorkspace = {
/**
* You must keep the flink version and configuration in the native flink
and hdfs exactly the same.
diff --git
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployResponse.scala
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployResponse.scala
index 7e2cb266f..f352732da 100644
---
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployResponse.scala
+++
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/DeployResponse.scala
@@ -19,5 +19,4 @@ package org.apache.streampark.flink.submit.bean
case class DeployResponse(address: String,
clusterId: String,
- message: String = null
- )
+ message: String = null)
diff --git
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/ShutDownRequest.scala
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/ShutDownRequest.scala
index 4c1464b8b..0112541f1 100644
---
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/ShutDownRequest.scala
+++
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/ShutDownRequest.scala
@@ -26,5 +26,4 @@ case class ShutDownRequest(flinkVersion: FlinkVersion,
executionMode: ExecutionMode,
clusterId: String,
@Nullable kubernetesDeployParam:
KubernetesDeployParam,
- @Nullable extraParameter: JavaMap[String, Any]
- )
+ @Nullable properties: JavaMap[String, Any])
diff --git
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
index f46cc6782..a39840247 100644
---
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
+++
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
@@ -46,7 +46,6 @@ case class SubmitRequest(flinkVersion: FlinkVersion,
flinkYaml: String,
developmentMode: DevelopmentMode,
executionMode: ExecutionMode,
- resolveOrder: ResolveOrder,
id: Long,
jobId: String,
appName: String,
@@ -54,8 +53,7 @@ case class SubmitRequest(flinkVersion: FlinkVersion,
applicationType: ApplicationType,
savePoint: String,
flameGraph: JavaMap[String, java.io.Serializable],
- option: JavaMap[String, Any],
- properties: JavaMap[String, String],
+ properties: JavaMap[String, Any],
args: String,
@Nullable buildResult: BuildResult,
@Nullable k8sSubmitParam: KubernetesSubmitParam,
@@ -74,7 +72,7 @@ case class SubmitRequest(flinkVersion: FlinkVersion,
lazy val flinkSQL: String = extraParameter.get(KEY_FLINK_SQL()).toString
- lazy val allowNonRestoredState =
Try(option.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key).toString.toBoolean).getOrElse(false)
+ lazy val allowNonRestoredState =
Try(properties.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key).toString.toBoolean).getOrElse(false)
lazy val savepointRestoreSettings: SavepointRestoreSettings = {
savePoint match {
diff --git
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitResponse.scala
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitResponse.scala
index 5ce981a63..07e36e080 100644
---
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitResponse.scala
+++
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitResponse.scala
@@ -23,6 +23,4 @@ import javax.annotation.Nullable
case class SubmitResponse(clusterId: String,
flinkConfig: JavaMap[String, String],
@Nullable jobId: String = "",
- @Nullable jobManagerUrl: String = "") {
-
-}
+ @Nullable jobManagerUrl: String = "")
diff --git
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeSessionSubmit.scala
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeSessionSubmit.scala
index 30fa4562b..9e91914ff 100644
---
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeSessionSubmit.scala
+++
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeSessionSubmit.scala
@@ -130,7 +130,6 @@ object KubernetesNativeSessionSubmit extends
KubernetesNativeSubmitTrait with Lo
| exposedType :
${deployRequest.k8sDeployParam.flinkRestExposedType}
| serviceAccount : ${deployRequest.k8sDeployParam.serviceAccount}
| flinkImage : ${deployRequest.k8sDeployParam.flinkImage}
- | resolveOrder : ${deployRequest.resolveOrder.getName}
| flameGraph : ${deployRequest.flameGraph != null}
| properties : ${deployRequest.properties.mkString(" ")}
|-------------------------------------------------------------------------------------------
@@ -139,12 +138,7 @@ object KubernetesNativeSessionSubmit extends
KubernetesNativeSubmitTrait with Lo
var client: ClusterClient[String] = null
var kubeClient: FlinkKubeClient = null
try {
- val flinkConfig = extractConfiguration(
- deployRequest.flinkVersion.flinkHome,
- deployRequest.properties,
- deployRequest.extraParameter,
- deployRequest.resolveOrder)
-
+ val flinkConfig =
extractConfiguration(deployRequest.flinkVersion.flinkHome,
deployRequest.properties)
flinkConfig
.safeSet(DeploymentOptions.TARGET,
KubernetesDeploymentTarget.SESSION.getName)
.safeSet(KubernetesConfigOptions.NAMESPACE,
deployRequest.k8sDeployParam.kubernetesNamespace)
@@ -181,7 +175,7 @@ object KubernetesNativeSessionSubmit extends
KubernetesNativeSubmitTrait with Lo
var kubeClient: FlinkKubeClient = null
try {
val flinkConfig =
getFlinkDefaultConfiguration(shutDownRequest.flinkVersion.flinkHome)
- shutDownRequest.extraParameter.foreach(m => m._2 match {
+ shutDownRequest.properties.foreach(m => m._2 match {
case v if v != null => flinkConfig.setString(m._1, m._2.toString)
case _ =>
})
diff --git
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/RemoteSubmit.scala
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/RemoteSubmit.scala
index 5d4aab09c..7c87a4ae7 100644
---
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/RemoteSubmit.scala
+++
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/RemoteSubmit.scala
@@ -52,8 +52,8 @@ object RemoteSubmit extends FlinkSubmitTrait {
override def doCancel(cancelRequest: CancelRequest, flinkConfig:
Configuration): CancelResponse = {
flinkConfig
.safeSet(DeploymentOptions.TARGET, cancelRequest.executionMode.getName)
- .safeSet(RestOptions.ADDRESS,
cancelRequest.option.get(RestOptions.ADDRESS.key()).toString)
- .safeSet[JavaInt](RestOptions.PORT,
cancelRequest.option.get(RestOptions.PORT.key()).toString.toInt)
+ .safeSet(RestOptions.ADDRESS,
cancelRequest.properties.get(RestOptions.ADDRESS.key()).toString)
+ .safeSet[JavaInt](RestOptions.PORT,
cancelRequest.properties.get(RestOptions.PORT.key()).toString.toInt)
logInfo(
s"""
|------------------------------------------------------------------
diff --git
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala
index 610990dc0..21b216e77 100644
---
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala
+++
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnSessionSubmit.scala
@@ -137,7 +137,7 @@ object YarnSessionSubmit extends YarnSubmitTrait {
}
override def doCancel(cancelRequest: CancelRequest, flinkConfig:
Configuration): CancelResponse = {
- flinkConfig.safeSet(YarnConfigOptions.APPLICATION_ID,
cancelRequest.option.get(KEY_YARN_APP_ID).toString)
+ flinkConfig.safeSet(YarnConfigOptions.APPLICATION_ID,
cancelRequest.properties.get(KEY_YARN_APP_ID).toString)
flinkConfig.safeSet(DeploymentOptions.TARGET,
YarnDeploymentTarget.SESSION.getName)
logInfo(
s"""
@@ -172,7 +172,6 @@ object YarnSessionSubmit extends YarnSubmitTrait {
| flinkVersion : ${deployRequest.flinkVersion.version}
| execMode : ${deployRequest.executionMode.name()}
| clusterId : ${deployRequest.clusterId}
- | resolveOrder : ${deployRequest.resolveOrder.getName}
| flameGraph : ${deployRequest.flameGraph != null}
| properties : ${deployRequest.properties.mkString(" ")}
|-------------------------------------------------------------------------------------------
@@ -180,10 +179,9 @@ object YarnSessionSubmit extends YarnSubmitTrait {
var clusterDescriptor: YarnClusterDescriptor = null
var client: ClusterClient[ApplicationId] = null
try {
- val flinkConfig =
extractConfiguration(deployRequest.flinkVersion.flinkHome,
- deployRequest.properties,
- deployRequest.extraParameter,
- deployRequest.resolveOrder)
+ val flinkConfig = extractConfiguration(
+ deployRequest.flinkVersion.flinkHome,
+ deployRequest.properties)
setConfig(deployRequest, flinkConfig)
val yarnClusterDescriptor =
getSessionClusterDeployDescriptor(flinkConfig)
clusterDescriptor = yarnClusterDescriptor._2
@@ -222,7 +220,7 @@ object YarnSessionSubmit extends YarnSubmitTrait {
var client: ClusterClient[ApplicationId] = null
try {
val flinkConfig =
getFlinkDefaultConfiguration(shutDownRequest.flinkVersion.flinkHome)
- shutDownRequest.extraParameter.foreach(m => m._2 match {
+ shutDownRequest.properties.foreach(m => m._2 match {
case v if v != null => flinkConfig.setString(m._1, m._2.toString)
case _ =>
})
diff --git
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
index 414959668..c3f7735f5 100644
---
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
+++
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
@@ -68,11 +68,9 @@ trait FlinkSubmitTrait extends Logger {
| k8sNamespace :
${submitRequest.k8sSubmitParam.kubernetesNamespace}
| flinkExposedType :
${submitRequest.k8sSubmitParam.flinkRestExposedType}
| clusterId : ${submitRequest.k8sSubmitParam.clusterId}
- | resolveOrder : ${submitRequest.resolveOrder.getName}
| applicationType : ${submitRequest.applicationType.getName}
| flameGraph : ${submitRequest.flameGraph != null}
| savePoint : ${submitRequest.savePoint}
- | option : ${submitRequest.option}
| properties : ${submitRequest.properties.mkString(" ")}
| args : ${submitRequest.args}
| appConf : ${submitRequest.appConf}
@@ -93,7 +91,6 @@ trait FlinkSubmitTrait extends Logger {
.safeSet(PipelineOptions.NAME, submitRequest.effectiveAppName)
.safeSet(DeploymentOptions.TARGET, submitRequest.executionMode.getName)
.safeSet(SavepointConfigOptions.SAVEPOINT_PATH, submitRequest.savePoint)
- .safeSet(CoreOptions.CLASSLOADER_RESOLVE_ORDER,
submitRequest.resolveOrder.getName)
.safeSet(ApplicationConfiguration.APPLICATION_MAIN_CLASS,
submitRequest.appMain)
.safeSet(ApplicationConfiguration.APPLICATION_ARGS,
extractProgramArgs(submitRequest))
.safeSet(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID,
submitRequest.jobId)
@@ -230,8 +227,8 @@ trait FlinkSubmitTrait extends Logger {
}
private[submit] def getParallelism(submitRequest: SubmitRequest): Integer = {
- if (submitRequest.option.containsKey(KEY_FLINK_PARALLELISM())) {
-
Integer.valueOf(submitRequest.option.get(KEY_FLINK_PARALLELISM()).toString)
+ if (submitRequest.properties.containsKey(KEY_FLINK_PARALLELISM())) {
+
Integer.valueOf(submitRequest.properties.get(KEY_FLINK_PARALLELISM()).toString)
} else {
getFlinkDefaultConfiguration(submitRequest.flinkVersion.flinkHome).getInteger(
CoreOptions.DEFAULT_PARALLELISM,
@@ -287,20 +284,10 @@ trait FlinkSubmitTrait extends Logger {
array +=
s"-D${CoreOptions.FLINK_TM_JVM_OPTIONS.key()}=-javaagent:$$PWD/plugins/$jvmProfilerJar=$param"
}
- // The priority of the parameters defined on the page is greater than
the app conf file, property parameters etc.
- if (MapUtils.isNotEmpty(submitRequest.option)) {
- submitRequest.option.foreach(x => array +=
s"-D${x._1.trim}=${x._2.toString.trim}")
+ // app properties
+ if (MapUtils.isNotEmpty(submitRequest.properties)) {
+ submitRequest.properties.foreach(x => array += s"-D${x._1}=${x._2}")
}
-
- //-D other dynamic parameter
- if (submitRequest.properties != null &&
submitRequest.properties.nonEmpty) {
- submitRequest.properties
- .filter(_._1 != "classloader.resolve-order")
- .foreach(x => array += s"-D${x._1}=${x._2}")
- }
-
- array +=
s"-Dclassloader.resolve-order=${submitRequest.resolveOrder.getName}"
-
array.toArray
}
@@ -328,25 +315,16 @@ trait FlinkSubmitTrait extends Logger {
FlinkRunOption.mergeOptions(CliFrontendParser.getRunCommandOptions,
customCommandLineOptions)
}
- private[submit] def extractConfiguration(flinkHome: String,
- properties: JavaMap[String, String],
- extraParameter: JavaMap[String,
Any],
- resolveOrder: ResolveOrder):
Configuration = {
+ private[submit] def extractConfiguration(flinkHome: String, properties:
JavaMap[String, Any]): Configuration = {
val commandLine = {
val commandLineOptions = getCommandLineOptions(flinkHome)
//read and verify user config...
val cliArgs = {
val array = new ArrayBuffer[String]()
// The priority of the parameters defined on the page is greater than
the app conf file, property parameters etc.
- if (MapUtils.isNotEmpty(extraParameter)) {
- extraParameter.foreach(x => array +=
s"-D${x._1.trim}=${x._2.toString.trim}")
- }
- if (properties != null && properties.nonEmpty) {
- properties
- .filter(_._1 != "classloader.resolve-order")
- .foreach(x => array += s"-D${x._1}=${x._2}")
+ if (MapUtils.isNotEmpty(properties)) {
+ properties.foreach(x => array += s"-D${x._1}=${x._2.toString.trim}")
}
- array += s"-Dclassloader.resolve-order=${resolveOrder.getName}"
array.toArray
}
FlinkRunOption.parse(commandLineOptions, cliArgs, true)