This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch k8s-shutdown
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/k8s-shutdown by this push:
new 7724a758b [Improve] k8s session cluster shutdown bug fixed.
7724a758b is described below
commit 7724a758b7c114fa83013b7127ef13c4de3cd510
Author: benjobs <[email protected]>
AuthorDate: Tue Jan 9 23:31:28 2024 +0800
[Improve] k8s session cluster shutdown bug fixed.
---
.../core/service/impl/ApplicationServiceImpl.java | 12 +-----
.../impl/KubernetesNativeApplicationClient.scala | 5 ++-
.../impl/KubernetesNativeSessionClient.scala | 46 +++++++++++++---------
3 files changed, 32 insertions(+), 31 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index c58fcd5f2..e69189a14 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1147,7 +1147,6 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
if (isKubernetesApp(application)) {
KubernetesDeploymentHelper.watchPodTerminatedLog(
application.getK8sNamespace(), application.getJobName(),
application.getJobId());
- KubernetesDeploymentHelper.delete(application.getK8sNamespace(),
application.getJobName());
}
if (startFuture != null) {
startFuture.cancel(true);
@@ -1355,7 +1354,6 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
}
// re-tracking flink job on kubernetes and logging exception
if (isKubernetesApp(application)) {
- KubernetesDeploymentHelper.delete(trackId.namespace(),
trackId.clusterId());
k8SFlinkTrackMonitor.unWatching(trackId);
} else {
FlinkRESTAPIWatcher.unWatching(application.getId());
@@ -1382,7 +1380,6 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
}
if (isKubernetesApp(application)) {
- KubernetesDeploymentHelper.delete(trackId.namespace(),
trackId.clusterId());
k8SFlinkTrackMonitor.unWatching(trackId);
}
});
@@ -1560,13 +1557,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
extraParameter.put(ConfigConst.KEY_FLINK_SQL(null), flinkSql.getSql());
}
- TrackId trackId;
- if (isKubernetesApp(application)) {
- trackId = toTrackId(application);
- KubernetesDeploymentHelper.delete(trackId.namespace(),
trackId.clusterId());
- } else {
- trackId = null;
- }
+ TrackId trackId = isKubernetesApp(application) ? toTrackId(application) :
null;
KubernetesSubmitParam kubernetesSubmitParam =
new KubernetesSubmitParam(
@@ -1764,7 +1755,6 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
// re-tracking flink job on kubernetes and logging exception
if (isKubernetesApp(application)) {
TrackId id = toTrackId(application);
- KubernetesDeploymentHelper.delete(id.namespace(), id.clusterId());
k8SFlinkTrackMonitor.doWatching(id);
} else {
FlinkRESTAPIWatcher.unWatching(application.getId());
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
index be57adb41..0d8f27a42 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
@@ -21,6 +21,7 @@ import org.apache.streampark.common.enums.ExecutionMode
import org.apache.streampark.common.util.Utils
import org.apache.streampark.flink.client.`trait`.KubernetesNativeClientTrait
import org.apache.streampark.flink.client.bean._
+import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
import org.apache.streampark.flink.packer.pipeline.DockerImageBuildResponse
import com.google.common.collect.Lists
@@ -94,7 +95,9 @@ object KubernetesNativeApplicationClient extends
KubernetesNativeClientTrait {
flinkConfig.safeSet(
DeploymentOptions.TARGET,
ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)
- super.doCancel(cancelRequest, flinkConfig)
+ val resp = super.doCancel(cancelRequest, flinkConfig)
+ KubernetesDeploymentHelper.delete(cancelRequest.kubernetesNamespace,
cancelRequest.clusterId)
+ resp
}
override def doTriggerSavepoint(
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index 3f2e99bde..0aee25bbe 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -135,11 +135,12 @@ object KubernetesNativeSessionClient extends
KubernetesNativeClientTrait with Lo
super.doCancel(cancelRequest, flinkConfig)
}
+ @throws[Exception]
def deploy(deployReq: DeployRequest): DeployResponse = {
val deployRequest = deployReq.asInstanceOf[KubernetesDeployRequest]
logInfo(
s"""
- |--------------------------------------- kubernetes session start
---------------------------------------
+ |--------------------------------------- kubernetes session cluster
start ---------------------------------------
| userFlinkHome : ${deployRequest.flinkVersion.flinkHome}
| flinkVersion : ${deployRequest.flinkVersion.version}
| execMode : ${deployRequest.executionMode.name()}
@@ -196,24 +197,38 @@ object KubernetesNativeSessionClient extends
KubernetesNativeClientTrait with Lo
} catch {
case e: Exception =>
logError(s"start flink session fail in ${deployRequest.executionMode}
mode")
- e.printStackTrace()
throw e
} finally {
Utils.close(client, clusterDescriptor, kubeClient)
}
}
+ @throws[Exception]
def shutdown(deployRequest: DeployRequest): ShutDownResponse = {
val shutDownRequest = deployRequest.asInstanceOf[KubernetesDeployRequest]
+ logInfo(
+ s"""
+ |--------------------------------------- kubernetes session cluster
shutdown ---------------------------------------
+ | userFlinkHome : ${shutDownRequest.flinkVersion.version}
+ | namespace : ${shutDownRequest.kubernetesNamespace}
+ | clusterId : ${shutDownRequest.clusterId}
+ | execMode : ${shutDownRequest.executionMode.getName}
+ | flinkImage : ${shutDownRequest.flinkImage}
+ | exposedType :
${shutDownRequest.flinkRestExposedType.getName}
+ | kubeConf : ${shutDownRequest.kubeConf}
+ | serviceAccount : ${shutDownRequest.serviceAccount}
+ | properties : ${shutDownRequest.properties.mkString(" ")}
+
|-------------------------------------------------------------------------------------------
+ |""".stripMargin)
var kubeClient: FlinkKubeClient = null
try {
val flinkConfig =
getFlinkDefaultConfiguration(shutDownRequest.flinkVersion.flinkHome)
shutDownRequest.properties.foreach(
- m =>
- m._2 match {
- case v if v != null => flinkConfig.setString(m._1, m._2.toString)
- case _ =>
- })
+ p => {
+ if (p._2 != null) {
+ flinkConfig.setString(p._1, s"${p._2}")
+ }
+ })
flinkConfig
.safeSet(DeploymentOptions.TARGET,
KubernetesDeploymentTarget.SESSION.getName)
.safeSet(KubernetesConfigOptions.NAMESPACE,
shutDownRequest.kubernetesNamespace)
@@ -227,21 +242,14 @@ object KubernetesNativeSessionClient extends
KubernetesNativeClientTrait with Lo
KubernetesConfigOptions.KUBE_CONFIG_FILE,
getDefaultKubernetesConf(shutDownRequest.kubeConf))
kubeClient =
FlinkKubeClientFactory.getInstance.fromConfiguration(flinkConfig, "client")
- val flinkKubeClient = new FlinkKubernetesClient(kubeClient)
- val k8sService = flinkKubeClient.getService(shutDownRequest.clusterId)
- if (k8sService.isPresent) {
- kubeClient.stopAndCleanupCluster(shutDownRequest.clusterId)
- KubernetesDeploymentHelper.delete(
- shutDownRequest.kubernetesNamespace,
- shutDownRequest.clusterId)
- ShutDownResponse()
- } else {
- null
- }
+ kubeClient.stopAndCleanupCluster(shutDownRequest.clusterId)
+ KubernetesDeploymentHelper.delete(
+ shutDownRequest.kubernetesNamespace,
+ shutDownRequest.clusterId)
+ ShutDownResponse()
} catch {
case e: Exception =>
logError(s"shutdown flink session fail in
${shutDownRequest.executionMode} mode")
- e.printStackTrace()
throw e
} finally {
Utils.close(kubeClient)