This is an automated email from the ASF dual-hosted git repository.

linying pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8f51a40d1 [Improve][kubernetes] Bind Flink's Ingress resources to the lifecycle of Deployment (#2605)
8f51a40d1 is described below

commit 8f51a40d1eba4d02f3d2c1a02b4d84b0eff491b7
Author: monster <[email protected]>
AuthorDate: Wed Apr 12 22:20:59 2023 +0800

    [Improve][kubernetes] Bind Flink's Ingress resources to the lifecycle of Deployment (#2605)
    
    * [Improve][kubernetes] ingress support ownerReferences
    
    * fix
    
    * fix
---
 .../core/service/impl/ApplicationServiceImpl.java  | 53 +++++++++-------------
 .../core/task/FlinkK8sChangeEventListener.java     |  3 --
 .../client/trait/KubernetesNativeClientTrait.scala |  1 -
 .../flink/kubernetes/FlinkK8sWatchController.scala |  1 -
 .../flink/kubernetes/IngressController.scala       | 40 ++++++++++------
 .../kubernetes/watcher/FlinkJobStatusWatcher.scala |  1 -
 6 files changed, 47 insertions(+), 52 deletions(-)
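
The change relies on Kubernetes ownerReferences: once the Ingress names the JobManager Deployment as its owner, the API server's garbage collector removes the Ingress automatically when the Deployment is deleted, which is why the explicit deleteIngress() calls below can be dropped. A minimal sketch of the mechanism with the fabric8 client follows; the namespace and cluster id are placeholder values, not taken from the commit.

    import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder
    import io.fabric8.kubernetes.client.DefaultKubernetesClient

    object OwnerReferenceSketch extends App {
      val client = new DefaultKubernetesClient()
      try {
        // Placeholder identifiers: point these at a real native-Kubernetes Flink cluster.
        val namespace = "flink-jobs"
        val clusterId = "my-flink-cluster"

        // The JobManager Deployment created in native Kubernetes mode acts as the owner.
        val deployment = client.apps().deployments().inNamespace(namespace).withName(clusterId).get()
        require(deployment != null, s"Deployment $clusterId not found in namespace $namespace")

        // Any resource carrying this reference is garbage-collected when the Deployment goes away.
        val ownerRef = new OwnerReferenceBuilder()
          .withApiVersion("apps/v1")
          .withKind("Deployment")
          .withName(clusterId)
          .withUid(deployment.getMetadata.getUid)
          .withController(true)
          .withBlockOwnerDeletion(true)
          .build()

        println(s"ownerReference to attach: kind=${ownerRef.getKind}, uid=${ownerRef.getUid}")
      } finally {
        client.close()
      }
    }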

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index a31164a87..2ea6f2d75 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -91,7 +91,6 @@ import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper;
 import org.apache.streampark.flink.kubernetes.model.FlinkMetricCV;
 import org.apache.streampark.flink.kubernetes.model.TrackId;
 import org.apache.streampark.flink.packer.pipeline.BuildResult;
-import org.apache.streampark.flink.packer.pipeline.DockerImageBuildResponse;
 import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse;
 
 import org.apache.commons.io.FileUtils;
@@ -1098,7 +1097,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
           application.getK8sNamespace(), application.getJobName());
       KubernetesDeploymentHelper.deleteTaskConfigMap(
           application.getK8sNamespace(), application.getJobName());
-      IngressController.deleteIngress(application.getJobName(), application.getK8sNamespace());
     }
     if (startFuture != null) {
       startFuture.cancel(true);
@@ -1334,10 +1332,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
             })
         .whenComplete(
             (t, e) -> {
-              if (isKubernetesApp(application)) {
-                IngressController.deleteIngress(
-                    application.getJobName(), application.getK8sNamespace());
-              }
               cancelFutureMap.remove(application.getId());
               applicationLogService.save(applicationLog);
             });
@@ -1523,32 +1517,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
     BuildResult buildResult = buildPipeline.getBuildResult();
     if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) {
       buildResult = new ShadedBuildResponse(null, flinkUserJar, true);
-    } else {
-      if (ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
-        Utils.notNull(buildResult);
-        DockerImageBuildResponse result = buildResult.as(DockerImageBuildResponse.class);
-        String ingressTemplates = application.getIngressTemplate();
-        String domainName = settingService.getIngressModeDefault();
-        if (StringUtils.isNotBlank(ingressTemplates)) {
-          String ingressOutput = result.workspacePath() + "/ingress.yaml";
-          IngressController.configureIngress(ingressOutput);
-        }
-        if (StringUtils.isNotBlank(domainName)) {
-          try {
-            IngressController.configureIngress(
-                domainName, application.getClusterId(), application.getK8sNamespace());
-          } catch (KubernetesClientException e) {
-            log.info("Failed to create ingress, stack info:{}", e.getMessage());
-            applicationLog.setException(e.getMessage());
-            applicationLog.setSuccess(false);
-            applicationLogService.save(applicationLog);
-            application.setState(FlinkAppState.FAILED.getValue());
-            application.setOptionState(OptionState.NONE.getValue());
-            updateById(application);
-            return;
-          }
-        }
-      }
     }
 
     // Get the args after placeholder replacement
@@ -1644,6 +1612,27 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
             })
         .whenComplete(
             (t, e) -> {
+              if (ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
+                String domainName = settingService.getIngressModeDefault();
+                if (StringUtils.isNotBlank(domainName)) {
+                  try {
+                    IngressController.configureIngress(
+                        domainName, application.getClusterId(), application.getK8sNamespace());
+                  } catch (KubernetesClientException kubernetesClientException) {
+                    log.info(
+                        "Failed to create ingress, stack info:{}",
+                        kubernetesClientException.getMessage());
+                    applicationLog.setException(e.getMessage());
+                    applicationLog.setSuccess(false);
+                    applicationLogService.save(applicationLog);
+                    application.setState(FlinkAppState.FAILED.getValue());
+                    application.setOptionState(OptionState.NONE.getValue());
+                    updateById(application);
+                    return;
+                  }
+                }
+              }
+
               applicationLogService.save(applicationLog);
               startFutureMap.remove(application.getId());
             });
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
index cedb776f4..0d7fead67 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
@@ -25,7 +25,6 @@ import org.apache.streampark.console.core.enums.OptionState;
 import org.apache.streampark.console.core.metrics.flink.CheckPoints;
 import org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.alert.AlertService;
-import org.apache.streampark.flink.kubernetes.IngressController;
 import org.apache.streampark.flink.kubernetes.enums.FlinkJobState;
 import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode;
 import org.apache.streampark.flink.kubernetes.event.FlinkClusterMetricChangeEvent;
@@ -96,7 +95,6 @@ public class FlinkK8sChangeEventListener {
         || FlinkAppState.LOST.equals(state)
         || FlinkAppState.RESTARTING.equals(state)
         || FlinkAppState.FINISHED.equals(state)) {
-      IngressController.deleteIngress(app.getClusterId(), app.getK8sNamespace());
       executor.execute(() -> alertService.alert(app, state));
     }
   }
@@ -163,7 +161,6 @@ public class FlinkK8sChangeEventListener {
     long duration = jobStatus.duration();
 
     if (FlinkJobState.isEndState(state)) {
-      IngressController.deleteIngress(app.getJobName(), app.getK8sNamespace());
       if (endTime < startTime) {
         endTime = System.currentTimeMillis();
       }
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
index 145bbf4f6..e40600472 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
@@ -78,7 +78,6 @@ trait KubernetesNativeClientTrait extends FlinkClientTrait {
   override def doCancel(cancelRequest: CancelRequest, flinkConfig: Configuration): CancelResponse = {
     executeClientAction(cancelRequest, flinkConfig, (jobId, clusterClient) => {
       val actionResult = super.cancelJob(cancelRequest, jobId, clusterClient)
-      IngressController.deleteIngress(cancelRequest.clusterId, cancelRequest.kubernetesNamespace)
       CancelResponse(actionResult)
     })
   }
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala
index dc209d241..b053e3170 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala
@@ -71,7 +71,6 @@ class FlinkK8sWatchController extends Logger with AutoCloseable {
       canceling.invalidate(trackId)
       jobStatuses.invalidate(trackId)
       flinkMetrics.invalidate(ClusterKey.of(trackId))
-      IngressController.deleteIngress(trackId.clusterId, trackId.namespace)
     }
   }
 
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/IngressController.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/IngressController.scala
index 3e35b9240..ed2200ba3 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/IngressController.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/IngressController.scala
@@ -21,19 +21,16 @@ import java.io.File
 import java.io.IOException
 import java.nio.file.Files
 import java.nio.file.Paths
-
 import scala.collection.JavaConverters._
 import scala.language.postfixOps
 import scala.util.{Failure, Success, Try}
-
-import io.fabric8.kubernetes.api.model.IntOrString
+import io.fabric8.kubernetes.api.model.{IntOrString, OwnerReferenceBuilder}
 import io.fabric8.kubernetes.api.model.networking.v1beta1.IngressBuilder
 import io.fabric8.kubernetes.client.DefaultKubernetesClient
 import org.apache.commons.io.FileUtils
 import org.apache.flink.client.program.ClusterClient
 import org.json4s.{DefaultFormats, JArray}
 import org.json4s.jackson.JsonMethods.parse
-
 import org.apache.streampark.common.util.Logger
 import org.apache.streampark.common.util.Utils._
 
@@ -50,11 +47,36 @@ object IngressController extends Logger {
           "app" -> clusterId,
           "type" -> "flink-native-kubernetes",
           "component" -> "ingress")
+
+        val deployment = client
+          .apps()
+          .deployments()
+          .inNamespace(nameSpace)
+          .withName(clusterId)
+          .get()
+
+        val deploymentUid = if (deployment != null) {
+          deployment.getMetadata.getUid
+        } else {
+          throw new RuntimeException(s"Deployment with name $clusterId not found in namespace $nameSpace")
+        }
+
+        // Create OwnerReference object
+        val ownerReference = new OwnerReferenceBuilder()
+          .withApiVersion("apps/v1")
+          .withKind("Deployment")
+          .withName(clusterId)
+          .withUid(deploymentUid)
+          .withController(true)
+          .withBlockOwnerDeletion(true)
+          .build()
+
         val ingress = new IngressBuilder()
           .withNewMetadata()
           .withName(clusterId)
           .addToAnnotations(annotMap.asJava)
           .addToLabels(labelsMap.asJava)
+          .addToOwnerReferences(ownerReference) // Add OwnerReference
           .endMetadata()
           .withNewSpec()
           .addNewRule()
@@ -93,16 +115,6 @@ object IngressController extends Logger {
     }
   }
 
-  def deleteIngress(ingressName: String, nameSpace: String): Unit = {
-    if (determineThePodSurvivalStatus(ingressName, nameSpace)) {
-      close {
-        val client = new DefaultKubernetesClient
-        client.network.ingress.inNamespace(nameSpace).withName(ingressName).delete
-        client
-      }
-    }
-  }
-
   private[this] def determineThePodSurvivalStatus(name: String, nameSpace: String): Boolean = {
     tryWithResource(KubernetesRetriever.newK8sClient()) { client =>
       Try {
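
With the ownerReference attached at submission time, the Ingress follows the Deployment's lifecycle and needs no explicit cleanup on cancel or failure. A rough way to confirm the binding after a job is submitted, again only a sketch with placeholder names rather than anything shipped in this commit:

    import scala.collection.JavaConverters._
    import io.fabric8.kubernetes.client.DefaultKubernetesClient

    object VerifyIngressOwner extends App {
      val client = new DefaultKubernetesClient()
      try {
        val namespace = "flink-jobs"        // placeholder
        val clusterId = "my-flink-cluster"  // placeholder

        // The patch names the Ingress after the cluster id, so fetch it and inspect its owners.
        val ingress = client.network().ingress().inNamespace(namespace).withName(clusterId).get()
        if (ingress == null) {
          println(s"No ingress named $clusterId in $namespace")
        } else {
          ingress.getMetadata.getOwnerReferences.asScala.foreach { ref =>
            println(s"owned by ${ref.getKind}/${ref.getName} (uid=${ref.getUid})")
          }
        }
      } finally {
        client.close()
      }
    }
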
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 2b80c8ad7..b6e62caf6 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -280,7 +280,6 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
             logger.info("Enter the task failure deletion process.")
             KubernetesDeploymentHelper.watchPodTerminatedLog(trackId.namespace, trackId.clusterId, trackId.jobId)
             KubernetesDeploymentHelper.deleteTaskDeployment(trackId.namespace, trackId.clusterId)
-            IngressController.deleteIngress(trackId.clusterId, trackId.namespace)
             FlinkJobState.FAILED
           } else {
             inferSilentOrLostFromPreCache(latest)
