This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
new ca6973ce2 fix: k8s mode flink cluster can't be stopped (#3965)
ca6973ce2 is described below
commit ca6973ce2e0870b633d734123727916bf640884c
Author: Darcy <[email protected]>
AuthorDate: Fri Aug 16 23:38:09 2024 +0800
fix: k8s mode flink cluster can't be stopped (#3965)
---
.../streampark/console/core/enums/FlinkAppState.java | 6 +-----
.../console/core/service/impl/ApplicationServiceImpl.java | 11 ++++++-----
.../streampark/flink/kubernetes/enums/FlinkJobState.scala | 2 --
.../flink/kubernetes/watcher/FlinkJobStatusWatcher.scala | 15 ++++-----------
4 files changed, 11 insertions(+), 23 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java
index 6e558a65a..da69fe14d 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java
@@ -141,11 +141,7 @@ public enum FlinkAppState implements Serializable {
public static class Bridge {
/** covert from org.apache.streampark.flink.k8s.enums.FlinkJobState */
public static FlinkAppState fromK8sFlinkJobState(Enumeration.Value
flinkJobState) {
- if (FlinkJobState.K8S_INITIALIZING().equals(flinkJobState)) {
- return INITIALIZING;
- } else {
- return of(flinkJobState.toString());
- }
+ return of(flinkJobState.toString());
}
/** covert to org.apache.streampark.flink.k8s.enums.FlinkJobState */
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 45dd84156..971c6d8ae 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1844,12 +1844,13 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
application.setOptionTime(new Date());
updateById(application);
savepointService.expire(application.getId());
- // re-tracking flink job on kubernetes and logging exception
+ // delete deployment
if (application.isKubernetesModeJob()) {
- TrackId id = k8sWatcherWrapper.toTrackId(application);
- flinkK8sWatcher.doWatching(id);
- } else {
- FlinkAppHttpWatcher.unWatching(application.getId());
+ try {
+ KubernetesDeploymentHelper.delete(application.getK8sNamespace(),
application.getJobName());
+ } catch (Exception e) {
+ log.error("job abort failed!", e);
+ }
}
// kill application
if (ExecutionMode.isYarnMode(application.getExecutionModeEnum())) {
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/enums/FlinkJobState.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/enums/FlinkJobState.scala
index fcd4e88e0..93c817d90 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/enums/FlinkJobState.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/enums/FlinkJobState.scala
@@ -22,8 +22,6 @@ object FlinkJobState extends Enumeration {
// flink job has been submit by the streampark.
val STARTING,
- // flink k8s resources are being initialized.
- K8S_INITIALIZING,
// lost track of flink job temporarily.
SILENT,
// flink job has terminated positively (maybe FINISHED or CANCELED)
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 59f2e37a6..ec0aa6a82 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -323,20 +323,13 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig
= JobStatusWatcherConfi
val isConnection = KubernetesDeploymentHelper.checkConnection()
if (deployExists) {
- val deployError = KubernetesDeploymentHelper.isDeploymentError(
- trackId.namespace,
- trackId.clusterId
- )
- if (!deployError) {
- logger.info("Task Enter the initialization process.")
- FlinkJobState.K8S_INITIALIZING
- } else if (isConnection) {
- logger.info("Enter the task failure deletion process.")
- KubernetesDeploymentHelper.watchPodTerminatedLog(
+ if (isConnection) {
+ logger.info("Enter the task starting process.")
+ KubernetesDeploymentHelper.watchDeploymentLog(
trackId.namespace,
trackId.clusterId,
trackId.jobId)
- FlinkJobState.FAILED
+ FlinkJobState.STARTING
} else {
inferFromPreCache(latest)
}