This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
new 31ab87e63 [Improve] k8s session cluster deploy and shutdown improvement
31ab87e63 is described below
commit 31ab87e6334eb7b8f2c77d37618814c22a30a496
Author: benjobs <[email protected]>
AuthorDate: Thu Jan 11 08:46:33 2024 +0800
[Improve] k8s session cluster deploy and shutdown improvement
---
.../impl/KubernetesNativeSessionClient.scala | 89 ++++++++++------------
1 file changed, 41 insertions(+), 48 deletions(-)
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index 0aee25bbe..0cffc30ba 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -35,7 +35,7 @@ import org.apache.flink.configuration._
import org.apache.flink.kubernetes.KubernetesClusterDescriptor
import org.apache.flink.kubernetes.configuration.{KubernetesConfigOptions,
KubernetesDeploymentTarget}
import
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.ServiceExposedType
-import org.apache.flink.kubernetes.kubeclient.{FlinkKubeClient,
FlinkKubeClientFactory}
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory
import java.io.File
@@ -152,43 +152,27 @@ object KubernetesNativeSessionClient extends
KubernetesNativeClientTrait with Lo
| properties : ${deployRequest.properties.mkString(" ")}
|-------------------------------------------------------------------------------------------
|""".stripMargin)
+
+ val flinkConfig = getFlinkK8sConfig(deployRequest)
+ val kubeClient =
FlinkKubeClientFactory.getInstance.fromConfiguration(flinkConfig, "client")
+
var clusterDescriptor: KubernetesClusterDescriptor = null
var client: ClusterClient[String] = null
- var kubeClient: FlinkKubeClient = null
- try {
- val flinkConfig =
- extractConfiguration(deployRequest.flinkVersion.flinkHome,
deployRequest.properties)
- flinkConfig
- .safeSet(DeploymentOptions.TARGET,
KubernetesDeploymentTarget.SESSION.getName)
- .safeSet(KubernetesConfigOptions.NAMESPACE,
deployRequest.kubernetesNamespace)
- .safeSet(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT,
deployRequest.serviceAccount)
- .safeSet(
- KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
-
ServiceExposedType.valueOf(deployRequest.flinkRestExposedType.getName))
- .safeSet(KubernetesConfigOptions.CLUSTER_ID, deployRequest.clusterId)
- .safeSet(KubernetesConfigOptions.CONTAINER_IMAGE,
deployRequest.flinkImage)
- .safeSet(
- KubernetesConfigOptions.KUBE_CONFIG_FILE,
- getDefaultKubernetesConf(deployRequest.kubeConf))
- .safeSet(
- DeploymentOptionsInternal.CONF_DIR,
- s"${deployRequest.flinkVersion.flinkHome}/conf")
+ try {
val kubernetesClusterDescriptor =
getK8sClusterDescriptorAndSpecification(flinkConfig)
clusterDescriptor = kubernetesClusterDescriptor._1
- kubeClient =
FlinkKubeClientFactory.getInstance.fromConfiguration(flinkConfig, "client")
+
val kubeClientWrapper = new FlinkKubernetesClient(kubeClient)
+ val kubeService = kubeClientWrapper.getService(deployRequest.clusterId)
- if (
- deployRequest.clusterId != null && kubeClientWrapper
- .getService(deployRequest.clusterId)
- .isPresent
- ) {
+ if (kubeService.isPresent) {
client =
clusterDescriptor.retrieve(deployRequest.clusterId).getClusterClient
} else {
client =
clusterDescriptor.deploySessionCluster(kubernetesClusterDescriptor._2).getClusterClient
}
+
if (client.getWebInterfaceURL != null) {
DeployResponse(client.getWebInterfaceURL, client.getClusterId)
} else {
@@ -220,29 +204,22 @@ object KubernetesNativeSessionClient extends
KubernetesNativeClientTrait with Lo
| properties : ${shutDownRequest.properties.mkString(" ")}
|-------------------------------------------------------------------------------------------
|""".stripMargin)
- var kubeClient: FlinkKubeClient = null
+
+ val flinkConfig = this.getFlinkK8sConfig(shutDownRequest)
+ val kubeClient =
FlinkKubeClientFactory.getInstance.fromConfiguration(flinkConfig, "client")
try {
- val flinkConfig =
getFlinkDefaultConfiguration(shutDownRequest.flinkVersion.flinkHome)
- shutDownRequest.properties.foreach(
- p => {
- if (p._2 != null) {
- flinkConfig.setString(p._1, s"${p._2}")
- }
- })
- flinkConfig
- .safeSet(DeploymentOptions.TARGET,
KubernetesDeploymentTarget.SESSION.getName)
- .safeSet(KubernetesConfigOptions.NAMESPACE,
shutDownRequest.kubernetesNamespace)
- .safeSet(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT,
shutDownRequest.serviceAccount)
- .safeSet(
- KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
-
ServiceExposedType.valueOf(shutDownRequest.flinkRestExposedType.getName))
- .safeSet(KubernetesConfigOptions.CLUSTER_ID, shutDownRequest.clusterId)
- .safeSet(KubernetesConfigOptions.CONTAINER_IMAGE,
shutDownRequest.flinkImage)
- .safeSet(
- KubernetesConfigOptions.KUBE_CONFIG_FILE,
- getDefaultKubernetesConf(shutDownRequest.kubeConf))
- kubeClient =
FlinkKubeClientFactory.getInstance.fromConfiguration(flinkConfig, "client")
- kubeClient.stopAndCleanupCluster(shutDownRequest.clusterId)
+ val kubeClientWrapper = new FlinkKubernetesClient(kubeClient)
+ // Look up the service by the cluster id of the request this method is handling
+ // (shutDownRequest, as used everywhere else in this method), not `deployRequest`.
+ val kubeService = kubeClientWrapper.getService(shutDownRequest.clusterId)
+ if (kubeService.isPresent) {
+ kubeClient.stopAndCleanupCluster(shutDownRequest.clusterId)
+ } else {
+ // No service found for the cluster id: retrieve a cluster client through the
+ // cluster descriptor and ask the cluster to shut down.
+ val kubernetesClusterDescriptor =
getK8sClusterDescriptorAndSpecification(flinkConfig)
+ val clusterDescriptor = kubernetesClusterDescriptor._1
+ val client =
clusterDescriptor.retrieve(shutDownRequest.clusterId).getClusterClient
+ if (client != null) {
+ client.shutDownCluster()
+ }
+ }
KubernetesDeploymentHelper.delete(
shutDownRequest.kubernetesNamespace,
shutDownRequest.clusterId)
@@ -256,6 +233,22 @@ object KubernetesNativeSessionClient extends
KubernetesNativeClientTrait with Lo
}
}
+ private[this] def getFlinkK8sConfig(deployRequest: KubernetesDeployRequest):
Configuration = { // Builds the Flink Configuration used by both the deploy and the shutdown path of this client.
+ extractConfiguration(deployRequest.flinkVersion.flinkHome,
deployRequest.properties) // base configuration derived from the Flink home and the request's properties
+ .safeSet(DeploymentOptions.TARGET,
KubernetesDeploymentTarget.SESSION.getName) // target is always the Kubernetes session deployment target
+ .safeSet(KubernetesConfigOptions.NAMESPACE,
deployRequest.kubernetesNamespace)
+ .safeSet(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT,
deployRequest.serviceAccount)
+ .safeSet(KubernetesConfigOptions.CLUSTER_ID, deployRequest.clusterId)
+ .safeSet(KubernetesConfigOptions.CONTAINER_IMAGE,
deployRequest.flinkImage)
+ .safeSet(
+ KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
+ ServiceExposedType.valueOf(deployRequest.flinkRestExposedType.getName)) // NOTE(review): flinkRestExposedType is dereferenced before safeSet runs; presumably it is never null here - confirm
+ .safeSet(
+ KubernetesConfigOptions.KUBE_CONFIG_FILE,
+ getDefaultKubernetesConf(deployRequest.kubeConf)) // kube config path is resolved through getDefaultKubernetesConf
+ .safeSet(DeploymentOptionsInternal.CONF_DIR,
s"${deployRequest.flinkVersion.flinkHome}/conf") // conf dir is <flinkHome>/conf; safeSet presumably skips null values - TODO confirm
+ }
+
override def doTriggerSavepoint(
request: TriggerSavepointRequest,
flinkConfig: Configuration): SavepointResponse = {