This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
new fcfa54bed Fix the issue where the state sometimes remains as
"canceled" continuously. (#4031)
fcfa54bed is described below
commit fcfa54bed14ea92c27b64a744898a00af0175c79
Author: Darcy <[email protected]>
AuthorDate: Wed Sep 4 18:23:21 2024 +0800
Fix the issue where the job state sometimes remains stuck at "canceled".
(#4031)
* Fix the issue where the job state sometimes remains stuck at
"canceled".
* Remove the watchController.trackIds.invalidate statement; this method
should be called in the unwatch function.
---
.../flink/kubernetes/watcher/FlinkJobStatusWatcher.scala | 15 ++++++---------
1 file changed, 6 insertions(+), 9 deletions(-)
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index ec0aa6a82..aa82962f3 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -305,21 +305,18 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig
= JobStatusWatcherConfi
*/
private def inferStateFromK8sEvent(@Nonnull trackId: TrackId)(implicit
pollEmitTime: Long): Option[JobStatusCV] = {
-
// infer from k8s deployment and event
val latest: JobStatusCV = watchController.jobStatuses.get(trackId)
+ // whether deployment exists on kubernetes cluster
+ val deployExists = KubernetesRetriever.isDeploymentExists(
+ trackId.namespace,
+ trackId.clusterId
+ )
val jobState = trackId match {
case id if watchController.canceling.has(id) =>
logger.info(s"trackId ${trackId.toString} is canceling")
- watchController.trackIds.invalidate(id)
- FlinkJobState.CANCELED
+ if (deployExists) FlinkJobState.CANCELLING else FlinkJobState.CANCELED
case _ =>
- // whether deployment exists on kubernetes cluster
- val deployExists = KubernetesRetriever.isDeploymentExists(
- trackId.namespace,
- trackId.clusterId
- )
-
val isConnection = KubernetesDeploymentHelper.checkConnection()
if (deployExists) {