This is an automated email from the ASF dual-hosted git repository.
linying pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 8f51a40d1 [Improve][kubernetes] Bind Flink's Ingress resources to the
lifecycle of Deployment (#2605)
8f51a40d1 is described below
commit 8f51a40d1eba4d02f3d2c1a02b4d84b0eff491b7
Author: monster <[email protected]>
AuthorDate: Wed Apr 12 22:20:59 2023 +0800
[Improve][kubernetes] Bind Flink's Ingress resources to the lifecycle of
Deployment (#2605)
* [Improve][kubernetes] ingress support ownerReferences
* fix
* fix
---
.../core/service/impl/ApplicationServiceImpl.java | 53 +++++++++-------------
.../core/task/FlinkK8sChangeEventListener.java | 3 --
.../client/trait/KubernetesNativeClientTrait.scala | 1 -
.../flink/kubernetes/FlinkK8sWatchController.scala | 1 -
.../flink/kubernetes/IngressController.scala | 40 ++++++++++------
.../kubernetes/watcher/FlinkJobStatusWatcher.scala | 1 -
6 files changed, 47 insertions(+), 52 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index a31164a87..2ea6f2d75 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -91,7 +91,6 @@ import
org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper;
import org.apache.streampark.flink.kubernetes.model.FlinkMetricCV;
import org.apache.streampark.flink.kubernetes.model.TrackId;
import org.apache.streampark.flink.packer.pipeline.BuildResult;
-import org.apache.streampark.flink.packer.pipeline.DockerImageBuildResponse;
import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse;
import org.apache.commons.io.FileUtils;
@@ -1098,7 +1097,6 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
application.getK8sNamespace(), application.getJobName());
KubernetesDeploymentHelper.deleteTaskConfigMap(
application.getK8sNamespace(), application.getJobName());
- IngressController.deleteIngress(application.getJobName(),
application.getK8sNamespace());
}
if (startFuture != null) {
startFuture.cancel(true);
@@ -1334,10 +1332,6 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
})
.whenComplete(
(t, e) -> {
- if (isKubernetesApp(application)) {
- IngressController.deleteIngress(
- application.getJobName(), application.getK8sNamespace());
- }
cancelFutureMap.remove(application.getId());
applicationLogService.save(applicationLog);
});
@@ -1523,32 +1517,6 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
BuildResult buildResult = buildPipeline.getBuildResult();
if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) {
buildResult = new ShadedBuildResponse(null, flinkUserJar, true);
- } else {
- if
(ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
- Utils.notNull(buildResult);
- DockerImageBuildResponse result =
buildResult.as(DockerImageBuildResponse.class);
- String ingressTemplates = application.getIngressTemplate();
- String domainName = settingService.getIngressModeDefault();
- if (StringUtils.isNotBlank(ingressTemplates)) {
- String ingressOutput = result.workspacePath() + "/ingress.yaml";
- IngressController.configureIngress(ingressOutput);
- }
- if (StringUtils.isNotBlank(domainName)) {
- try {
- IngressController.configureIngress(
- domainName, application.getClusterId(),
application.getK8sNamespace());
- } catch (KubernetesClientException e) {
- log.info("Failed to create ingress, stack info:{}",
e.getMessage());
- applicationLog.setException(e.getMessage());
- applicationLog.setSuccess(false);
- applicationLogService.save(applicationLog);
- application.setState(FlinkAppState.FAILED.getValue());
- application.setOptionState(OptionState.NONE.getValue());
- updateById(application);
- return;
- }
- }
- }
}
// Get the args after placeholder replacement
@@ -1644,6 +1612,27 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
})
.whenComplete(
(t, e) -> {
+ // The Ingress is owned by the Flink Deployment (ownerReferences), so it is
+ // created here, after the submit stage has completed.
+ if (ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
+ String domainName = settingService.getIngressModeDefault();
+ if (StringUtils.isNotBlank(domainName)) {
+ try {
+ IngressController.configureIngress(
+ domainName, application.getClusterId(), application.getK8sNamespace());
+ } catch (KubernetesClientException kubernetesClientException) {
+ log.info(
+ "Failed to create ingress, stack info:{}",
+ kubernetesClientException.getMessage());
+ // Record the ingress failure itself. The lambda's own throwable `e` is
+ // null when the submit stage succeeded, so e.getMessage() would throw a
+ // NullPointerException here and the FAILED state would never be saved.
+ applicationLog.setException(kubernetesClientException.getMessage());
+ applicationLog.setSuccess(false);
+ applicationLogService.save(applicationLog);
+ application.setState(FlinkAppState.FAILED.getValue());
+ application.setOptionState(OptionState.NONE.getValue());
+ updateById(application);
+ // The early return skips the cleanup below, so drop the finished
+ // future here to avoid leaving a stale entry in startFutureMap.
+ startFutureMap.remove(application.getId());
+ return;
+ }
+ }
+ }
+
+
applicationLogService.save(applicationLog);
startFutureMap.remove(application.getId());
});
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
index cedb776f4..0d7fead67 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
@@ -25,7 +25,6 @@ import org.apache.streampark.console.core.enums.OptionState;
import org.apache.streampark.console.core.metrics.flink.CheckPoints;
import org.apache.streampark.console.core.service.ApplicationService;
import org.apache.streampark.console.core.service.alert.AlertService;
-import org.apache.streampark.flink.kubernetes.IngressController;
import org.apache.streampark.flink.kubernetes.enums.FlinkJobState;
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode;
import
org.apache.streampark.flink.kubernetes.event.FlinkClusterMetricChangeEvent;
@@ -96,7 +95,6 @@ public class FlinkK8sChangeEventListener {
|| FlinkAppState.LOST.equals(state)
|| FlinkAppState.RESTARTING.equals(state)
|| FlinkAppState.FINISHED.equals(state)) {
- IngressController.deleteIngress(app.getClusterId(),
app.getK8sNamespace());
executor.execute(() -> alertService.alert(app, state));
}
}
@@ -163,7 +161,6 @@ public class FlinkK8sChangeEventListener {
long duration = jobStatus.duration();
if (FlinkJobState.isEndState(state)) {
- IngressController.deleteIngress(app.getJobName(), app.getK8sNamespace());
if (endTime < startTime) {
endTime = System.currentTimeMillis();
}
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
index 145bbf4f6..e40600472 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
@@ -78,7 +78,6 @@ trait KubernetesNativeClientTrait extends FlinkClientTrait {
override def doCancel(cancelRequest: CancelRequest, flinkConfig:
Configuration): CancelResponse = {
executeClientAction(cancelRequest, flinkConfig, (jobId, clusterClient) => {
val actionResult = super.cancelJob(cancelRequest, jobId, clusterClient)
- IngressController.deleteIngress(cancelRequest.clusterId,
cancelRequest.kubernetesNamespace)
CancelResponse(actionResult)
})
}
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala
index dc209d241..b053e3170 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala
@@ -71,7 +71,6 @@ class FlinkK8sWatchController extends Logger with
AutoCloseable {
canceling.invalidate(trackId)
jobStatuses.invalidate(trackId)
flinkMetrics.invalidate(ClusterKey.of(trackId))
- IngressController.deleteIngress(trackId.clusterId, trackId.namespace)
}
}
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/IngressController.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/IngressController.scala
index 3e35b9240..ed2200ba3 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/IngressController.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/IngressController.scala
@@ -21,19 +21,16 @@ import java.io.File
import java.io.IOException
import java.nio.file.Files
import java.nio.file.Paths
-
import scala.collection.JavaConverters._
import scala.language.postfixOps
import scala.util.{Failure, Success, Try}
-
-import io.fabric8.kubernetes.api.model.IntOrString
+import io.fabric8.kubernetes.api.model.{IntOrString, OwnerReferenceBuilder}
import io.fabric8.kubernetes.api.model.networking.v1beta1.IngressBuilder
import io.fabric8.kubernetes.client.DefaultKubernetesClient
import org.apache.commons.io.FileUtils
import org.apache.flink.client.program.ClusterClient
import org.json4s.{DefaultFormats, JArray}
import org.json4s.jackson.JsonMethods.parse
-
import org.apache.streampark.common.util.Logger
import org.apache.streampark.common.util.Utils._
@@ -50,11 +47,36 @@ object IngressController extends Logger {
"app" -> clusterId,
"type" -> "flink-native-kubernetes",
"component" -> "ingress")
+
+ val deployment = client
+ .apps()
+ .deployments()
+ .inNamespace(nameSpace)
+ .withName(clusterId)
+ .get()
+
+ val deploymentUid = if (deployment != null) {
+ deployment.getMetadata.getUid
+ } else {
+ throw new RuntimeException(s"Deployment with name $clusterId not
found in namespace $nameSpace")
+ }
+
+ // Create OwnerReference object
+ val ownerReference = new OwnerReferenceBuilder()
+ .withApiVersion("apps/v1")
+ .withKind("Deployment")
+ .withName(clusterId)
+ .withUid(deploymentUid)
+ .withController(true)
+ .withBlockOwnerDeletion(true)
+ .build()
+
val ingress = new IngressBuilder()
.withNewMetadata()
.withName(clusterId)
.addToAnnotations(annotMap.asJava)
.addToLabels(labelsMap.asJava)
+ .addToOwnerReferences(ownerReference) // Add OwnerReference
.endMetadata()
.withNewSpec()
.addNewRule()
@@ -93,16 +115,6 @@ object IngressController extends Logger {
}
}
- def deleteIngress(ingressName: String, nameSpace: String): Unit = {
- if (determineThePodSurvivalStatus(ingressName, nameSpace)) {
- close {
- val client = new DefaultKubernetesClient
-
client.network.ingress.inNamespace(nameSpace).withName(ingressName).delete
- client
- }
- }
- }
-
private[this] def determineThePodSurvivalStatus(name: String, nameSpace:
String): Boolean = {
tryWithResource(KubernetesRetriever.newK8sClient()) { client =>
Try {
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 2b80c8ad7..b6e62caf6 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -280,7 +280,6 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig =
JobStatusWatcherConfi
logger.info("Enter the task failure deletion process.")
KubernetesDeploymentHelper.watchPodTerminatedLog(trackId.namespace,
trackId.clusterId, trackId.jobId)
KubernetesDeploymentHelper.deleteTaskDeployment(trackId.namespace,
trackId.clusterId)
- IngressController.deleteIngress(trackId.clusterId,
trackId.namespace)
FlinkJobState.FAILED
} else {
inferSilentOrLostFromPreCache(latest)