This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch release-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/release-2.1.5 by this push:
new 1634ee5f1 [Improve] flink job-status bug fixed.
1634ee5f1 is described below
commit 1634ee5f16ecfa959d32d576b6cb3b5c63dbefe4
Author: benjobs <[email protected]>
AuthorDate: Sat Dec 7 20:01:33 2024 +0800
[Improve] flink job-status bug fixed.
---
.../helper/KubernetesDeploymentHelper.scala | 8 ++--
.../kubernetes/watcher/FlinkJobStatusWatcher.scala | 49 ++++++++++++----------
2 files changed, 33 insertions(+), 24 deletions(-)
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
index e28922002..c359ac9a0 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
@@ -29,7 +29,7 @@ import io.fabric8.kubernetes.client.DefaultKubernetesClient
import java.io.File
import scala.collection.JavaConversions._
-import scala.util.{Success, Try}
+import scala.util.{Failure, Success, Try}
object KubernetesDeploymentHelper extends Logger {
@@ -91,12 +91,14 @@ object KubernetesDeploymentHelper extends Logger {
deleteConfigMap(nameSpace, deploymentName)
}
- def checkConnection(): Boolean = {
+ def checkConnection(e: Throwable => Unit): Boolean = {
Try(new DefaultKubernetesClient) match {
case Success(client) =>
client.close()
true
- case _ => false
+ case Failure(exception) =>
+ e(exception)
+ false
}
}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index aa82962f3..2c1b8cf50 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -17,7 +17,6 @@
package org.apache.streampark.flink.kubernetes.watcher
-import org.apache.streampark.common.conf.Workspace
import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkK8sWatchController, JobStatusWatcherConfig, KubernetesRetriever}
import org.apache.streampark.flink.kubernetes.enums.{FlinkJobState, FlinkK8sExecuteMode}
@@ -168,7 +167,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
*/
def touchSessionJob(@Nonnull trackId: TrackId): Option[JobStatusCV] = {
touchSessionAllJob(trackId)
- .find(id => id._1.jobId == trackId.jobId && id._2.jobState != FlinkJobState.SILENT)
+ .find(id => id._1.jobId == trackId.jobId && id._2.jobState != FlinkJobState.LOST)
.map(_._2)
.orElse(inferState(trackId))
}
@@ -214,12 +213,11 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
private[this] def updateState(trackId: TrackId, jobState: JobStatusCV): Unit = {
val latest: JobStatusCV = watchController.jobStatuses.get(trackId)
+ // put job status to cache
+ watchController.jobStatuses.put(trackId, jobState)
if (jobState.diff(latest)) {
- // put job status to cache
- watchController.jobStatuses.put(trackId, jobState)
// set jobId to trackIds
watchController.trackIds.update(trackId)
-
eventBus.postSync(FlinkJobStatusChangeEvent(trackId, jobState))
}
@@ -244,21 +242,23 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
lazy val pollEmitTime = System.currentTimeMillis
val preCache = watchController.jobStatuses.get(id)
val state = inferFromPreCache(preCache)
- val nonFirstSilent = state == FlinkJobState.SILENT &&
+
+ val firstSilent = state == FlinkJobState.SILENT &&
preCache != null &&
- preCache.jobState == FlinkJobState.SILENT
- val jobState = if (nonFirstSilent) {
+ preCache.jobState != FlinkJobState.SILENT
+
+ val jobState = if (firstSilent) {
JobStatusCV(
- jobState = state,
+ jobState = FlinkJobState.LOST,
jobId = id.jobId,
- pollEmitTime = preCache.pollEmitTime,
- pollAckTime = preCache.pollAckTime)
+ pollEmitTime = pollEmitTime,
+ pollAckTime = System.currentTimeMillis)
} else {
JobStatusCV(
- jobState = state,
+ jobState = FlinkJobState.LOST,
jobId = id.jobId,
- pollEmitTime = pollEmitTime,
- pollAckTime = System.currentTimeMillis)
+ pollEmitTime = preCache.pollEmitTime,
+ pollAckTime = preCache.pollAckTime)
}
Option(jobState)
}
@@ -315,10 +315,16 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
val jobState = trackId match {
case id if watchController.canceling.has(id) =>
logger.info(s"trackId ${trackId.toString} is canceling")
- if (deployExists) FlinkJobState.CANCELLING else FlinkJobState.CANCELED
+ if (deployExists) {
+ FlinkJobState.CANCELLING
+ } else {
+ FlinkJobState.CANCELED
+ }
case _ =>
- val isConnection = KubernetesDeploymentHelper.checkConnection()
-
+ val isConnection = KubernetesDeploymentHelper.checkConnection(
+ e => {
+ logger.warn("connect Kubernetes client failed. ", e)
+ })
if (deployExists) {
if (isConnection) {
logger.info("Enter the task starting process.")
@@ -339,14 +345,15 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
}
val jobStatusCV = JobStatusCV(
- jobState = jobState,
+ jobState = if (jobState == FlinkJobState.SILENT) FlinkJobState.LOST else jobState,
jobId = trackId.jobId,
pollEmitTime = pollEmitTime,
- pollAckTime = System.currentTimeMillis)
+ pollAckTime = System.currentTimeMillis
+ )
- if (
+ val flag =
jobState == FlinkJobState.SILENT && latest != null && latest.jobState == FlinkJobState.SILENT
- ) {
+ if (flag) {
Some(jobStatusCV.copy(pollEmitTime = latest.pollEmitTime, pollAckTime = latest.pollAckTime))
} else {
Some(jobStatusCV)