This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
new 2006d9e6b [Improve] flink k8s session cluster shutdown bug fixed
(#3476)
2006d9e6b is described below
commit 2006d9e6b98a1b14367af60999a5efe01e11956a
Author: benjobs <[email protected]>
AuthorDate: Tue Jan 9 23:37:45 2024 +0800
[Improve] flink k8s session cluster shutdown bug fixed (#3476)
* [Improve] k8s cluster shutdown bug fixed.
* [hotfix][ci] Exclude twitter website from dead link check
---------
Co-authored-by: benjobs <[email protected]>
---
.dlc.json | 3 +
.../core/controller/FlinkClusterController.java | 12 +--
.../console/core/service/FlinkClusterService.java | 6 +-
.../core/service/impl/ApplicationServiceImpl.java | 12 +--
.../core/service/impl/FlinkClusterServiceImpl.java | 100 ++++++++-------------
.../streampark/flink/client/FlinkClient.scala | 6 +-
.../flink/client/bean/DeployRequest.scala | 47 +++++++---
.../flink/client/bean/ShutDownRequest.scala | 32 -------
.../flink/client/FlinkClientHandler.scala | 2 +-
.../impl/KubernetesNativeApplicationClient.scala | 5 +-
.../impl/KubernetesNativeSessionClient.scala | 93 ++++++++++---------
.../flink/client/impl/YarnSessionClient.scala | 2 +-
12 files changed, 141 insertions(+), 179 deletions(-)
diff --git a/.dlc.json b/.dlc.json
index e80f2c3f0..99c96f25f 100644
--- a/.dlc.json
+++ b/.dlc.json
@@ -17,6 +17,9 @@
},
{
"pattern": "^https://opencollective.com"
+ },
+ {
+ "pattern": "^https://twitter.com*"
}
],
"timeout": "30s",
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
index 148cd554b..2b01e2a54 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
@@ -91,22 +91,22 @@ public class FlinkClusterController {
@Operation(summary = "Start flink cluster")
@PostMapping("start")
- public RestResponse start(FlinkCluster cluster) {
- flinkClusterService.start(cluster);
+ public RestResponse start(Long id) {
+ flinkClusterService.start(id);
return RestResponse.success();
}
@Operation(summary = "Shutdown flink cluster")
@PostMapping("shutdown")
- public RestResponse shutdown(FlinkCluster cluster) {
- flinkClusterService.shutdown(cluster);
+ public RestResponse shutdown(Long id) {
+ flinkClusterService.shutdown(id);
return RestResponse.success();
}
@Operation(summary = "Delete flink cluster")
@PostMapping("delete")
- public RestResponse delete(FlinkCluster cluster) {
- flinkClusterService.delete(cluster);
+ public RestResponse delete(Long id) {
+ flinkClusterService.delete(id);
return RestResponse.success();
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
index d149152af..e9d0397ef 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
@@ -32,13 +32,13 @@ public interface FlinkClusterService extends
IService<FlinkCluster> {
Boolean create(FlinkCluster flinkCluster);
- void delete(FlinkCluster flinkCluster);
+ void delete(Long id);
void update(FlinkCluster flinkCluster);
- void start(FlinkCluster flinkCluster);
+ void start(Long id);
- void shutdown(FlinkCluster flinkCluster);
+ void shutdown(Long id);
Boolean existsByClusterId(String clusterId, Long id);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 605c7fe0e..ed55c27cc 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1147,7 +1147,6 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
if (isKubernetesApp(application)) {
KubernetesDeploymentHelper.watchPodTerminatedLog(
application.getK8sNamespace(), application.getJobName(),
application.getJobId());
- KubernetesDeploymentHelper.delete(application.getK8sNamespace(),
application.getJobName());
}
if (startFuture != null) {
startFuture.cancel(true);
@@ -1355,7 +1354,6 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
}
// re-tracking flink job on kubernetes and logging exception
if (isKubernetesApp(application)) {
- KubernetesDeploymentHelper.delete(trackId.namespace(),
trackId.clusterId());
k8SFlinkTrackMonitor.unWatching(trackId);
} else {
FlinkRESTAPIWatcher.unWatching(application.getId());
@@ -1382,7 +1380,6 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
}
if (isKubernetesApp(application)) {
- KubernetesDeploymentHelper.delete(trackId.namespace(),
trackId.clusterId());
k8SFlinkTrackMonitor.unWatching(trackId);
}
});
@@ -1560,13 +1557,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
extraParameter.put(ConfigConst.KEY_FLINK_SQL(null), flinkSql.getSql());
}
- TrackId trackId;
- if (isKubernetesApp(application)) {
- trackId = toTrackId(application);
- KubernetesDeploymentHelper.delete(trackId.namespace(),
trackId.clusterId());
- } else {
- trackId = null;
- }
+ TrackId trackId = isKubernetesApp(application) ? toTrackId(application) :
null;
KubernetesSubmitParam kubernetesSubmitParam =
new KubernetesSubmitParam(
@@ -1764,7 +1755,6 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
// re-tracking flink job on kubernetes and logging exception
if (isKubernetesApp(application)) {
TrackId id = toTrackId(application);
- KubernetesDeploymentHelper.delete(id.namespace(), id.clusterId());
k8SFlinkTrackMonitor.doWatching(id);
} else {
FlinkRESTAPIWatcher.unWatching(application.getId());
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index fde4c00e6..ef1f4a283 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -35,8 +35,7 @@ import
org.apache.streampark.console.core.service.YarnQueueService;
import org.apache.streampark.flink.client.FlinkClient;
import org.apache.streampark.flink.client.bean.DeployRequest;
import org.apache.streampark.flink.client.bean.DeployResponse;
-import org.apache.streampark.flink.client.bean.KubernetesDeployParam;
-import org.apache.streampark.flink.client.bean.ShutDownRequest;
+import org.apache.streampark.flink.client.bean.KubernetesDeployRequest;
import org.apache.streampark.flink.client.bean.ShutDownResponse;
import org.apache.commons.lang3.StringUtils;
@@ -149,37 +148,12 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
@Override
@Transactional(rollbackFor = {Exception.class})
- public void start(FlinkCluster cluster) {
- FlinkCluster flinkCluster = getById(cluster.getId());
+ public void start(Long id) {
+ FlinkCluster flinkCluster = getById(id);
try {
ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum();
- KubernetesDeployParam kubernetesDeployParam = null;
- switch (executionModeEnum) {
- case YARN_SESSION:
- break;
- case KUBERNETES_NATIVE_SESSION:
- kubernetesDeployParam =
- new KubernetesDeployParam(
- flinkCluster.getClusterId(),
- flinkCluster.getK8sNamespace(),
- flinkCluster.getK8sConf(),
- flinkCluster.getServiceAccount(),
- flinkCluster.getFlinkImage(),
- flinkCluster.getK8sRestExposedTypeEnum());
- break;
- default:
- throw new ApiAlertException(
- "the ExecutionModeEnum " + executionModeEnum.getName() + "can't
start!");
- }
- FlinkEnv flinkEnv = flinkEnvService.getById(flinkCluster.getVersionId());
- DeployRequest deployRequest =
- new DeployRequest(
- flinkEnv.getFlinkVersion(),
- executionModeEnum,
- flinkCluster.getProperties(),
- flinkCluster.getClusterId(),
- kubernetesDeployParam);
- log.info("deploy cluster request " + deployRequest);
+ DeployRequest deployRequest = getDeployRequest(flinkCluster);
+ log.info("deploy cluster request: " + deployRequest);
Future<DeployResponse> future =
executorService.submit(() -> FlinkClient.deploy(deployRequest));
DeployResponse deployResponse = future.get(60, TimeUnit.SECONDS);
@@ -208,6 +182,33 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
}
}
+ private DeployRequest getDeployRequest(FlinkCluster flinkCluster) {
+ ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum();
+ FlinkEnv flinkEnv = flinkEnvService.getById(flinkCluster.getVersionId());
+ switch (executionModeEnum) {
+ case YARN_SESSION:
+ return DeployRequest.apply(
+ flinkEnv.getFlinkVersion(),
+ executionModeEnum,
+ flinkCluster.getProperties(),
+ flinkCluster.getClusterId());
+ case KUBERNETES_NATIVE_SESSION:
+ return KubernetesDeployRequest.apply(
+ flinkEnv.getFlinkVersion(),
+ executionModeEnum,
+ flinkCluster.getProperties(),
+ flinkCluster.getClusterId(),
+ flinkCluster.getK8sNamespace(),
+ flinkCluster.getK8sConf(),
+ flinkCluster.getServiceAccount(),
+ flinkCluster.getFlinkImage(),
+ flinkCluster.getK8sRestExposedTypeEnum());
+ default:
+ throw new ApiAlertException(
+ "the ExecutionModeEnum " + executionModeEnum.getName() + "can't
start!");
+ }
+ }
+
@Override
public void update(FlinkCluster cluster) {
FlinkCluster flinkCluster = getById(cluster.getId());
@@ -242,29 +243,11 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
}
@Override
- public void shutdown(FlinkCluster cluster) {
- FlinkCluster flinkCluster = this.getById(cluster.getId());
+ public void shutdown(Long id) {
+ FlinkCluster flinkCluster = this.getById(id);
// 1) check mode
ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum();
String clusterId = flinkCluster.getClusterId();
- KubernetesDeployParam kubernetesDeployParam = null;
- switch (executionModeEnum) {
- case YARN_SESSION:
- break;
- case KUBERNETES_NATIVE_SESSION:
- kubernetesDeployParam =
- new KubernetesDeployParam(
- flinkCluster.getClusterId(),
- flinkCluster.getK8sNamespace(),
- flinkCluster.getK8sConf(),
- flinkCluster.getServiceAccount(),
- flinkCluster.getFlinkImage(),
- flinkCluster.getK8sRestExposedTypeEnum());
- break;
- default:
- throw new ApiAlertException(
- "the ExecutionModeEnum " + executionModeEnum.getName() + "can't
shutdown!");
- }
if (StringUtils.isBlank(clusterId)) {
throw new ApiAlertException("the clusterId can not be empty!");
}
@@ -291,18 +274,10 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
}
// 4) shutdown
- FlinkEnv flinkEnv = flinkEnvService.getById(flinkCluster.getVersionId());
- ShutDownRequest stopRequest =
- new ShutDownRequest(
- flinkEnv.getFlinkVersion(),
- executionModeEnum,
- flinkCluster.getProperties(),
- clusterId,
- kubernetesDeployParam);
-
+ DeployRequest deployRequest = getDeployRequest(flinkCluster);
try {
Future<ShutDownResponse> future =
- executorService.submit(() -> FlinkClient.shutdown(stopRequest));
+ executorService.submit(() -> FlinkClient.shutdown(deployRequest));
ShutDownResponse shutDownResponse = future.get(60, TimeUnit.SECONDS);
if (shutDownResponse != null) {
flinkCluster.setAddress(null);
@@ -350,8 +325,7 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
}
@Override
- public void delete(FlinkCluster cluster) {
- Long id = cluster.getId();
+ public void delete(Long id) {
FlinkCluster flinkCluster = getById(id);
if (flinkCluster == null) {
throw new ApiAlertException("flink cluster not exist, please check.");
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
index c2ea9ba48..3c7c65dcb 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
@@ -40,7 +40,7 @@ object FlinkClient extends Logger {
"org.apache.streampark.flink.client.bean.CancelRequest" -> "cancel"
private[this] val SHUTDOWN_REQUEST =
- "org.apache.streampark.flink.client.bean.ShutDownRequest" -> "shutdown"
+ "org.apache.streampark.flink.client.bean.DeployRequest" -> "shutdown"
private[this] val SAVEPOINT_REQUEST =
"org.apache.streampark.flink.client.bean.TriggerSavepointRequest" ->
"triggerSavepoint"
@@ -57,8 +57,8 @@ object FlinkClient extends Logger {
proxy[DeployResponse](deployRequest, deployRequest.flinkVersion,
DEPLOY_REQUEST)
}
- def shutdown(shutDownRequest: ShutDownRequest): ShutDownResponse = {
- proxy[ShutDownResponse](shutDownRequest, shutDownRequest.flinkVersion,
SHUTDOWN_REQUEST)
+ def shutdown(deployRequest: DeployRequest): ShutDownResponse = {
+ proxy[ShutDownResponse](deployRequest, deployRequest.flinkVersion,
SHUTDOWN_REQUEST)
}
def triggerSavepoint(savepointRequest: TriggerSavepointRequest):
SavepointResponse = {
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala
index db7bbcd3d..81b36a17b 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala
@@ -24,8 +24,6 @@ import org.apache.streampark.flink.util.FlinkUtils
import org.apache.commons.io.FileUtils
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
-import javax.annotation.Nullable
-
import java.io.File
import java.util.{Map => JavaMap}
@@ -33,8 +31,7 @@ case class DeployRequest(
flinkVersion: FlinkVersion,
executionMode: ExecutionMode,
properties: JavaMap[String, Any],
- clusterId: String,
- @Nullable k8sDeployParam: KubernetesDeployParam) {
+ clusterId: String) {
private[client] lazy val hdfsWorkspace = {
@@ -63,10 +60,38 @@ case class DeployRequest(
}
}
-case class KubernetesDeployParam(
- clusterId: String,
- kubernetesNamespace: String =
KubernetesConfigOptions.NAMESPACE.defaultValue(),
- kubeConf: String = "~/.kube/config",
- serviceAccount: String =
KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT.defaultValue(),
- flinkImage: String =
KubernetesConfigOptions.CONTAINER_IMAGE.defaultValue(),
- @Nullable flinkRestExposedType: FlinkK8sRestExposedType =
FlinkK8sRestExposedType.CLUSTER_IP)
+class KubernetesDeployRequest(
+ override val flinkVersion: FlinkVersion,
+ override val executionMode: ExecutionMode,
+ override val properties: JavaMap[String, Any],
+ override val clusterId: String,
+ val kubernetesNamespace: String =
KubernetesConfigOptions.NAMESPACE.defaultValue(),
+ val kubeConf: String = "~/.kube/config",
+ val serviceAccount: String =
KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT.defaultValue(),
+ val flinkImage: String =
KubernetesConfigOptions.CONTAINER_IMAGE.defaultValue(),
+ val flinkRestExposedType: FlinkK8sRestExposedType =
FlinkK8sRestExposedType.CLUSTER_IP)
+ extends DeployRequest(flinkVersion, executionMode, properties, clusterId)
+
+object KubernetesDeployRequest {
+ def apply(
+ flinkVersion: FlinkVersion,
+ executionMode: ExecutionMode,
+ properties: JavaMap[String, Any],
+ clusterId: String,
+ kubernetesNamespace: String,
+ kubeConf: String,
+ serviceAccount: String,
+ flinkImage: String,
+ flinkRestExposedType: FlinkK8sRestExposedType): KubernetesDeployRequest
= {
+ new KubernetesDeployRequest(
+ flinkVersion,
+ executionMode,
+ properties,
+ clusterId,
+ kubernetesNamespace,
+ kubeConf,
+ serviceAccount,
+ flinkImage,
+ flinkRestExposedType)
+ }
+}
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/ShutDownRequest.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/ShutDownRequest.scala
deleted file mode 100644
index a6b75bbdf..000000000
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/ShutDownRequest.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.flink.client.bean
-
-import org.apache.streampark.common.conf.FlinkVersion
-import org.apache.streampark.common.enums.ExecutionMode
-
-import javax.annotation.Nullable
-
-import java.util.{Map => JavaMap}
-
-case class ShutDownRequest(
- flinkVersion: FlinkVersion,
- executionMode: ExecutionMode,
- @Nullable properties: JavaMap[String, Any],
- clusterId: String,
- @Nullable kubernetesDeployParam: KubernetesDeployParam)
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
index e85ceb6ee..c6fa402e9 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
@@ -70,7 +70,7 @@ object FlinkClientHandler {
}
}
- def shutdown(request: ShutDownRequest): ShutDownResponse = {
+ def shutdown(request: DeployRequest): ShutDownResponse = {
request.executionMode match {
case YARN_SESSION => YarnSessionClient.shutdown(request)
case KUBERNETES_NATIVE_SESSION =>
KubernetesNativeSessionClient.shutdown(request)
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
index be57adb41..0d8f27a42 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
@@ -21,6 +21,7 @@ import org.apache.streampark.common.enums.ExecutionMode
import org.apache.streampark.common.util.Utils
import org.apache.streampark.flink.client.`trait`.KubernetesNativeClientTrait
import org.apache.streampark.flink.client.bean._
+import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
import org.apache.streampark.flink.packer.pipeline.DockerImageBuildResponse
import com.google.common.collect.Lists
@@ -94,7 +95,9 @@ object KubernetesNativeApplicationClient extends
KubernetesNativeClientTrait {
flinkConfig.safeSet(
DeploymentOptions.TARGET,
ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)
- super.doCancel(cancelRequest, flinkConfig)
+ val resp = super.doCancel(cancelRequest, flinkConfig)
+ KubernetesDeploymentHelper.delete(cancelRequest.kubernetesNamespace,
cancelRequest.clusterId)
+ resp
}
override def doTriggerSavepoint(
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index 621b7ac8a..0aee25bbe 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -25,6 +25,7 @@ import
org.apache.streampark.flink.client.tool.FlinkSessionSubmitHelper
import org.apache.streampark.flink.core.FlinkKubernetesClient
import org.apache.streampark.flink.kubernetes.KubernetesRetriever
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
+import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
import org.apache.streampark.flink.kubernetes.model.ClusterKey
import io.fabric8.kubernetes.api.model.{Config => _}
@@ -134,18 +135,20 @@ object KubernetesNativeSessionClient extends
KubernetesNativeClientTrait with Lo
super.doCancel(cancelRequest, flinkConfig)
}
- def deploy(deployRequest: DeployRequest): DeployResponse = {
+ @throws[Exception]
+ def deploy(deployReq: DeployRequest): DeployResponse = {
+ val deployRequest = deployReq.asInstanceOf[KubernetesDeployRequest]
logInfo(
s"""
- |--------------------------------------- kubernetes session start
---------------------------------------
+ |--------------------------------------- kubernetes session cluster
start ---------------------------------------
| userFlinkHome : ${deployRequest.flinkVersion.flinkHome}
| flinkVersion : ${deployRequest.flinkVersion.version}
| execMode : ${deployRequest.executionMode.name()}
| clusterId : ${deployRequest.clusterId}
- | namespace :
${deployRequest.k8sDeployParam.kubernetesNamespace}
- | exposedType :
${deployRequest.k8sDeployParam.flinkRestExposedType}
- | serviceAccount : ${deployRequest.k8sDeployParam.serviceAccount}
- | flinkImage : ${deployRequest.k8sDeployParam.flinkImage}
+ | namespace : ${deployRequest.kubernetesNamespace}
+ | exposedType : ${deployRequest.flinkRestExposedType}
+ | serviceAccount : ${deployRequest.serviceAccount}
+ | flinkImage : ${deployRequest.flinkImage}
| properties : ${deployRequest.properties.mkString(" ")}
|-------------------------------------------------------------------------------------------
|""".stripMargin)
@@ -157,20 +160,16 @@ object KubernetesNativeSessionClient extends
KubernetesNativeClientTrait with Lo
extractConfiguration(deployRequest.flinkVersion.flinkHome,
deployRequest.properties)
flinkConfig
.safeSet(DeploymentOptions.TARGET,
KubernetesDeploymentTarget.SESSION.getName)
- .safeSet(
- KubernetesConfigOptions.NAMESPACE,
- deployRequest.k8sDeployParam.kubernetesNamespace)
- .safeSet(
- KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT,
- deployRequest.k8sDeployParam.serviceAccount)
+ .safeSet(KubernetesConfigOptions.NAMESPACE,
deployRequest.kubernetesNamespace)
+ .safeSet(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT,
deployRequest.serviceAccount)
.safeSet(
KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
-
ServiceExposedType.valueOf(deployRequest.k8sDeployParam.flinkRestExposedType.getName))
+
ServiceExposedType.valueOf(deployRequest.flinkRestExposedType.getName))
.safeSet(KubernetesConfigOptions.CLUSTER_ID, deployRequest.clusterId)
- .safeSet(KubernetesConfigOptions.CONTAINER_IMAGE,
deployRequest.k8sDeployParam.flinkImage)
+ .safeSet(KubernetesConfigOptions.CONTAINER_IMAGE,
deployRequest.flinkImage)
.safeSet(
KubernetesConfigOptions.KUBE_CONFIG_FILE,
- getDefaultKubernetesConf(deployRequest.k8sDeployParam.kubeConf))
+ getDefaultKubernetesConf(deployRequest.kubeConf))
.safeSet(
DeploymentOptionsInternal.CONF_DIR,
s"${deployRequest.flinkVersion.flinkHome}/conf")
@@ -198,59 +197,59 @@ object KubernetesNativeSessionClient extends
KubernetesNativeClientTrait with Lo
} catch {
case e: Exception =>
logError(s"start flink session fail in ${deployRequest.executionMode}
mode")
- e.printStackTrace()
throw e
} finally {
Utils.close(client, clusterDescriptor, kubeClient)
}
}
- def shutdown(shutDownRequest: ShutDownRequest): ShutDownResponse = {
+ @throws[Exception]
+ def shutdown(deployRequest: DeployRequest): ShutDownResponse = {
+ val shutDownRequest = deployRequest.asInstanceOf[KubernetesDeployRequest]
+ logInfo(
+ s"""
+ |--------------------------------------- kubernetes session cluster
shutdown ---------------------------------------
+ | userFlinkHome : ${shutDownRequest.flinkVersion.version}
+ | namespace : ${shutDownRequest.kubernetesNamespace}
+ | clusterId : ${shutDownRequest.clusterId}
+ | execMode : ${shutDownRequest.executionMode.getName}
+ | flinkImage : ${shutDownRequest.flinkImage}
+ | exposedType :
${shutDownRequest.flinkRestExposedType.getName}
+ | kubeConf : ${shutDownRequest.kubeConf}
+ | serviceAccount : ${shutDownRequest.serviceAccount}
+ | properties : ${shutDownRequest.properties.mkString(" ")}
+
|-------------------------------------------------------------------------------------------
+ |""".stripMargin)
var kubeClient: FlinkKubeClient = null
try {
val flinkConfig =
getFlinkDefaultConfiguration(shutDownRequest.flinkVersion.flinkHome)
shutDownRequest.properties.foreach(
- m =>
- m._2 match {
- case v if v != null => flinkConfig.setString(m._1, m._2.toString)
- case _ =>
- })
+ p => {
+ if (p._2 != null) {
+ flinkConfig.setString(p._1, s"${p._2}")
+ }
+ })
flinkConfig
.safeSet(DeploymentOptions.TARGET,
KubernetesDeploymentTarget.SESSION.getName)
- .safeSet(
- KubernetesConfigOptions.NAMESPACE,
- shutDownRequest.kubernetesDeployParam.kubernetesNamespace)
- .safeSet(
- KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT,
- shutDownRequest.kubernetesDeployParam.serviceAccount)
+ .safeSet(KubernetesConfigOptions.NAMESPACE,
shutDownRequest.kubernetesNamespace)
+ .safeSet(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT,
shutDownRequest.serviceAccount)
.safeSet(
KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
- ServiceExposedType.valueOf(
-
shutDownRequest.kubernetesDeployParam.flinkRestExposedType.getName))
+
ServiceExposedType.valueOf(shutDownRequest.flinkRestExposedType.getName))
.safeSet(KubernetesConfigOptions.CLUSTER_ID, shutDownRequest.clusterId)
- .safeSet(
- KubernetesConfigOptions.CONTAINER_IMAGE,
- shutDownRequest.kubernetesDeployParam.flinkImage)
+ .safeSet(KubernetesConfigOptions.CONTAINER_IMAGE,
shutDownRequest.flinkImage)
.safeSet(
KubernetesConfigOptions.KUBE_CONFIG_FILE,
-
getDefaultKubernetesConf(shutDownRequest.kubernetesDeployParam.kubeConf))
+ getDefaultKubernetesConf(shutDownRequest.kubeConf))
kubeClient =
FlinkKubeClientFactory.getInstance.fromConfiguration(flinkConfig, "client")
- val kubeClientWrapper = new FlinkKubernetesClient(kubeClient)
-
- if (
- shutDownRequest.clusterId != null && kubeClientWrapper
- .getService(shutDownRequest.clusterId)
- .isPresent
- ) {
- kubeClient.stopAndCleanupCluster(shutDownRequest.clusterId)
- ShutDownResponse()
- } else {
- null
- }
+ kubeClient.stopAndCleanupCluster(shutDownRequest.clusterId)
+ KubernetesDeploymentHelper.delete(
+ shutDownRequest.kubernetesNamespace,
+ shutDownRequest.clusterId)
+ ShutDownResponse()
} catch {
case e: Exception =>
logError(s"shutdown flink session fail in
${shutDownRequest.executionMode} mode")
- e.printStackTrace()
throw e
} finally {
Utils.close(kubeClient)
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index d2a98b740..91eb1d92d 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -245,7 +245,7 @@ object YarnSessionClient extends YarnClientTrait {
}
}
- def shutdown(shutDownRequest: ShutDownRequest): ShutDownResponse = {
+ def shutdown(shutDownRequest: DeployRequest): ShutDownResponse = {
var clusterDescriptor: YarnClusterDescriptor = null
var client: ClusterClient[ApplicationId] = null
try {