This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
new 47cc9fe43 [Bug] k8s job state monitor bug fixed (#3458)
47cc9fe43 is described below
commit 47cc9fe43a2d5e462382faf3358bdf88957efe57
Author: benjobs <[email protected]>
AuthorDate: Sat Jan 6 18:27:29 2024 +0800
[Bug] k8s job state monitor bug fixed (#3458)
* [Bug] k8s job state monitor bug fixed
---------
Co-authored-by: benjobs <[email protected]>
---
.../org/apache/streampark/common/util/Utils.scala | 7 ++
.../console/core/service/ApplicationService.java | 2 +
.../core/service/impl/ApplicationServiceImpl.java | 88 +++++++++++++---------
.../console/core/task/FlinkK8sWatcherWrapper.java | 19 ++---
.../flink/kubernetes/KubernetesRetriever.scala | 6 +-
.../helper/KubernetesDeploymentHelper.scala | 44 +++++------
.../flink/kubernetes/model/TrackId.scala | 2 +
.../kubernetes/watcher/FlinkJobStatusWatcher.scala | 45 ++++++-----
8 files changed, 124 insertions(+), 89 deletions(-)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
index d2047ced5..3e35c388b 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
@@ -97,6 +97,13 @@ object Utils extends Logger {
new JarInputStream(new BufferedInputStream(new FileInputStream(jarFile))).getManifest
}
+ def getJarManClass(jarFile: File): String = {
+ val manifest = getJarManifest(jarFile)
+ val mainAttr = manifest.getMainAttributes
+ Option(mainAttr.getValue("Main-Class"))
+ .getOrElse(Option(mainAttr.getValue("program-class")).orNull)
+ }
+
def copyProperties(original: Properties, target: Properties): Unit =
original.foreach(x => target.put(x._1, x._2))
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
index 9193cefa3..5ffa7a02e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
@@ -86,6 +86,8 @@ public interface ApplicationService extends IService<Application> {
boolean checkAlter(Application application);
+ Map<String, String> getRumtimeConfig(Long id);
+
void updateRelease(Application application);
List<Application> getByProjectId(Long id);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 49f7a32f1..c58fcd5f2 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -136,6 +136,7 @@ import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
+import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.EnumSet;
@@ -152,7 +153,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.jar.Manifest;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -403,11 +403,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
// 8) remove app
removeApp(application);
- if (isKubernetesApp(application)) {
- k8SFlinkTrackMonitor.unWatching(toTrackId(application));
- } else {
- FlinkRESTAPIWatcher.unWatching(paramApp.getId());
- }
return true;
}
@@ -460,6 +455,23 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
return cancelUserId != -1 && cancelUserId != appUserId;
}
+ @Override
+ public Map<String, String> getRumtimeConfig(Long id) {
+ Application application = getById(id);
+ if (application != null && application.getVersionId() != null) {
+ FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(application.getVersionId());
+ if (flinkEnv != null) {
+ File yaml = new File(flinkEnv.getFlinkHome().concat("/conf/flink-conf.yaml"));
+ Map<String, String> config = PropertiesUtils.loadFlinkConfYaml(yaml);
+ Map<String, String> dynamicConf =
+ PropertiesUtils.extractDynamicPropertiesAsJava(application.getDynamicProperties());
+ config.putAll(dynamicConf);
+ return config;
+ }
+ }
+ return Collections.emptyMap();
+ }
+
private void removeApp(Application application) {
Long appId = application.getId();
removeById(appId);
@@ -695,7 +707,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
}
// check whether clusterId, namespace, jobId on kubernetes
else if (ExecutionMode.isKubernetesMode(appParam.getExecutionMode())
- && k8SFlinkTrackMonitor.checkIsInRemoteCluster(toTrackId(appParam))) {
+ && k8SFlinkTrackMonitor.checkIsInRemoteCluster(toTrackId(app))) {
return AppExistsState.IN_KUBERNETES;
}
}
@@ -1135,10 +1147,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
if (isKubernetesApp(application)) {
KubernetesDeploymentHelper.watchPodTerminatedLog(
application.getK8sNamespace(), application.getJobName(), application.getJobId());
- KubernetesDeploymentHelper.deleteTaskDeployment(
- application.getK8sNamespace(), application.getJobName());
- KubernetesDeploymentHelper.deleteTaskConfigMap(
- application.getK8sNamespace(), application.getJobName());
+ KubernetesDeploymentHelper.delete(application.getK8sNamespace(), application.getJobName());
}
if (startFuture != null) {
startFuture.cancel(true);
@@ -1217,8 +1226,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
project.getDistHome().getAbsolutePath().concat("/").concat(application.getModule());
jarFile = new File(modulePath, application.getJar());
}
- Manifest manifest = Utils.getJarManifest(jarFile);
- return manifest.getMainAttributes().getValue("Main-Class");
+ return Utils.getJarManClass(jarFile);
}
@Override
@@ -1322,6 +1330,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
cancelFutureMap.put(application.getId(), cancelFuture);
+ TrackId trackId = isKubernetesApp(application) ? toTrackId(application) : null;
+
cancelFuture.whenComplete(
(cancelResponse, throwable) -> {
cancelFutureMap.remove(application.getId());
@@ -1345,9 +1355,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
}
// re-tracking flink job on kubernetes and logging exception
if (isKubernetesApp(application)) {
- TrackId id = toTrackId(application);
- k8SFlinkTrackMonitor.unWatching(id);
- k8SFlinkTrackMonitor.doWatching(id);
+ KubernetesDeploymentHelper.delete(trackId.namespace(), trackId.clusterId());
+ k8SFlinkTrackMonitor.unWatching(trackId);
} else {
FlinkRESTAPIWatcher.unWatching(application.getId());
}
@@ -1373,7 +1382,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
}
if (isKubernetesApp(application)) {
- k8SFlinkTrackMonitor.unWatching(toTrackId(application));
+ KubernetesDeploymentHelper.delete(trackId.namespace(), trackId.clusterId());
+ k8SFlinkTrackMonitor.unWatching(trackId);
}
});
}
@@ -1389,24 +1399,22 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
final URI uri = URI.create(savepointPath);
final String scheme = uri.getScheme();
final String pathPart = uri.getPath();
- String error = null;
if (scheme == null) {
- error =
- "This state.savepoints.dir value "
- + savepointPath
- + " scheme (hdfs://, file://, etc) of is null. Please specify
the file system scheme explicitly in the URI.";
- } else if (pathPart == null) {
- error =
- "This state.savepoints.dir value "
- + savepointPath
- + " path part to store the checkpoint data in is null. Please
specify a directory path for the checkpoint data.";
- } else if (pathPart.isEmpty() || "/".equals(pathPart)) {
- error =
- "This state.savepoints.dir value "
- + savepointPath
- + " Cannot use the root directory for checkpoints.";
+ return "This state.savepoints.dir value "
+ + savepointPath
+ + " scheme (hdfs://, file://, etc) of is null. Please specify the
file system scheme explicitly in the URI.";
+ }
+ if (pathPart == null) {
+ return "This state.savepoints.dir value "
+ + savepointPath
+ + " path part to store the checkpoint data in is null. Please
specify a directory path for the checkpoint data.";
}
- return error;
+ if (pathPart.isEmpty() || "/".equals(pathPart)) {
+ return "This state.savepoints.dir value "
+ + savepointPath
+ + " Cannot use the root directory for checkpoints.";
+ }
+ return null;
} else {
return "When custom savepoint is not set, state.savepoints.dir needs to
be set in properties or flink-conf.yaml of application";
}
@@ -1552,6 +1560,14 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
extraParameter.put(ConfigConst.KEY_FLINK_SQL(null), flinkSql.getSql());
}
+ TrackId trackId;
+ if (isKubernetesApp(application)) {
+ trackId = toTrackId(application);
+ KubernetesDeploymentHelper.delete(trackId.namespace(), trackId.clusterId());
+ } else {
+ trackId = null;
+ }
+
KubernetesSubmitParam kubernetesSubmitParam =
new KubernetesSubmitParam(
application.getClusterId(),
@@ -1613,7 +1629,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
app.setOptionState(OptionState.NONE.getValue());
updateById(app);
if (isKubernetesApp(app)) {
- k8SFlinkTrackMonitor.unWatching(toTrackId(app));
+ k8SFlinkTrackMonitor.unWatching(trackId);
} else {
FlinkRESTAPIWatcher.unWatching(appParam.getId());
}
@@ -1649,7 +1665,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
// if start completed, will be added task to tracking queue
if (isKubernetesApp(application)) {
application.setRelease(ReleaseState.DONE.get());
- k8SFlinkTrackMonitor.doWatching(toTrackId(application));
+ k8SFlinkTrackMonitor.doWatching(trackId);
if (ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
String domainName = settingService.getIngressModeDefault();
if (StringUtils.isNotBlank(domainName)) {
@@ -1748,7 +1764,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
// re-tracking flink job on kubernetes and logging exception
if (isKubernetesApp(application)) {
TrackId id = toTrackId(application);
- k8SFlinkTrackMonitor.unWatching(id);
+ KubernetesDeploymentHelper.delete(id.namespace(), id.clusterId());
k8SFlinkTrackMonitor.doWatching(id);
} else {
FlinkRESTAPIWatcher.unWatching(application.getId());
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
index af3b816d5..d54580032 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
@@ -23,6 +23,7 @@ import org.apache.streampark.console.core.service.ApplicationService;
import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
import org.apache.streampark.flink.kubernetes.FlinkK8sWatcherFactory;
import org.apache.streampark.flink.kubernetes.FlinkTrackConfig;
+import org.apache.streampark.flink.kubernetes.KubernetesRetriever;
import org.apache.streampark.flink.kubernetes.enums.FlinkJobState;
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode;
import org.apache.streampark.flink.kubernetes.model.TrackId;
@@ -101,17 +102,16 @@ public class FlinkK8sWatcherWrapper {
if (CollectionUtils.isEmpty(k8sApplication)) {
return Lists.newArrayList();
}
- // correct corrupted data
- List<Application> correctApps =
- k8sApplication.stream()
- .filter(app -> !Bridge.toTrackId(app).isLegal())
- .collect(Collectors.toList());
- if (CollectionUtils.isNotEmpty(correctApps)) {
- applicationService.saveOrUpdateBatch(correctApps);
- }
// filter out the application that should be tracking
return k8sApplication.stream()
- .filter(app ->
!FlinkJobState.isEndState(toK8sFlinkJobState(app.getFlinkAppStateEnum())))
+ .filter(
+ app -> {
+ boolean isEndState =
+ FlinkJobState.isEndState(toK8sFlinkJobState(app.getFlinkAppStateEnum()));
+ boolean deploymentExists =
+ KubernetesRetriever.isDeploymentExists(app.getK8sNamespace(), app.getClusterId());
+ return !isEndState || deploymentExists;
+ })
.map(Bridge::toTrackId)
.collect(Collectors.toList());
}
@@ -121,6 +121,7 @@ public class FlinkK8sWatcherWrapper {
// covert Application to TrackId
public static TrackId toTrackId(@Nonnull Application app) {
+
Enumeration.Value mode = FlinkK8sExecuteMode.of(app.getExecutionModeEnum());
if (FlinkK8sExecuteMode.APPLICATION().equals(mode)) {
return TrackId.onApplication(
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index 71ee9d6de..79ba51812 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -63,7 +63,7 @@ object KubernetesRetriever extends Logger {
private val clusterClientServiceLoader = new DefaultClusterClientServiceLoader()
/** get new flink cluster client of kubernetes mode */
- def newFinkClusterClient(
+ private def newFinkClusterClient(
clusterId: String,
@Nullable namespace: String,
executeMode: FlinkK8sExecuteMode.Value): Option[ClusterClient[String]] = {
@@ -110,7 +110,7 @@ object KubernetesRetriever extends Logger {
* @param namespace
* deployment namespace
*/
- def isDeploymentExists(name: String, namespace: String): Boolean = {
+ def isDeploymentExists(namespace: String, deploymentName: String): Boolean = {
using(KubernetesRetriever.newK8sClient()) {
client =>
client
@@ -121,7 +121,7 @@ object KubernetesRetriever extends Logger {
.list()
.getItems
.asScala
- .exists(e => e.getMetadata.getName == name)
+ .exists(_.getMetadata.getName == deploymentName)
}(_ => false)
}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
index 5495814f2..590cd806d 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
@@ -55,7 +55,7 @@ object KubernetesDeploymentHelper extends Logger {
}
}
- def getDeploymentStatusChanges(nameSpace: String, deploymentName: String): Boolean = {
+ def isDeploymentError(nameSpace: String, deploymentName: String): Boolean = {
Try {
val pods = getPods(nameSpace, deploymentName)
val podStatus = pods.head.getStatus
@@ -68,12 +68,7 @@ object KubernetesDeploymentHelper extends Logger {
}.getOrElse(true)
}
- def getTheNumberOfTaskDeploymentRetries(nameSpace: String, deploymentName: String): Integer = {
- val pods = getPods(nameSpace, deploymentName)
- pods.head.getStatus.getContainerStatuses.head.getRestartCount
- }
-
- def deleteTaskDeployment(nameSpace: String, deploymentName: String): Boolean = {
+ private[this] def deleteDeployment(nameSpace: String, deploymentName: String): Boolean = {
using(KubernetesRetriever.newK8sClient()) {
client =>
Try {
@@ -86,7 +81,26 @@ object KubernetesDeploymentHelper extends Logger {
}
}
- def isTheK8sConnectionNormal(): Boolean = {
+ private[this] def deleteConfigMap(nameSpace: String, deploymentName: String): Boolean = {
+ using(KubernetesRetriever.newK8sClient()) {
+ client =>
+ Try {
+ val r = client
+ .configMaps()
+ .inNamespace(nameSpace)
+ .withLabel("app", deploymentName)
+ .delete
+ Boolean.unbox(r)
+ }.getOrElse(false)
+ }
+ }
+
+ def delete(nameSpace: String, deploymentName: String): Unit = {
+ deleteDeployment(nameSpace, deploymentName)
+ deleteConfigMap(nameSpace, deploymentName)
+ }
+
+ def checkConnection(): Boolean = {
Try(new DefaultKubernetesClient) match {
case Success(client) =>
client.close()
@@ -125,20 +139,6 @@ object KubernetesDeploymentHelper extends Logger {
}(error => throw error)
}
- def deleteTaskConfigMap(nameSpace: String, deploymentName: String): Boolean = {
- using(KubernetesRetriever.newK8sClient()) {
- client =>
- Try {
- val r = client
- .configMaps()
- .inNamespace(nameSpace)
- .withLabel("app", deploymentName)
- .delete
- Boolean.unbox(r)
- }.getOrElse(false)
- }
- }
-
private[kubernetes] def getJobLog(jobId: String): String = {
val tmpPath = SystemPropertyUtils.getTmpdir()
s"$tmpPath/$jobId.log"
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
index fba47eee6..e280122c8 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
@@ -20,6 +20,8 @@ package org.apache.streampark.flink.kubernetes.model
import org.apache.streampark.common.util.Utils
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
+import java.lang.{Boolean => JavaBool}
+
import scala.util.Try
/** tracking identifier for flink on kubernetes */
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 55f62f07f..2879f17bb 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -125,7 +125,12 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
watchController.trackIds.update(trackId)
eventBus.postSync(FlinkJobStatusChangeEvent(trackId, jobState))
}
- if (FlinkJobState.isEndState(jobState.jobState)) {
+
+ val deployExists = KubernetesRetriever.isDeploymentExists(
+ trackId.namespace,
+ trackId.clusterId
+ )
+ if (FlinkJobState.isEndState(jobState.jobState) &&
!deployExists) {
// remove trackId from cache of job that needs to be untracked
watchController.unWatching(trackId)
if (trackId.executeMode == APPLICATION) {
@@ -213,9 +218,8 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
} else {
jobDetails.map {
d =>
- TrackId.onSession(namespace, clusterId, appId, d.jid, groupId) -> d.toJobStatusCV(
- pollEmitTime,
- System.currentTimeMillis)
+ TrackId.onSession(namespace, clusterId, appId, d.jid, groupId) -> d
+ .toJobStatusCV(pollEmitTime, System.currentTimeMillis)
}
}
}
@@ -284,21 +288,27 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
// infer from k8s deployment and event
val latest: JobStatusCV = watchController.jobStatuses.get(trackId)
- logger.info(
- s"Query the local cache
result:${watchController.canceling.has(trackId).toString},trackId
${trackId.toString}.")
- val jobState = {
- if (watchController.canceling.has(trackId)) FlinkJobState.CANCELED
- else {
+
+ val jobState = trackId match {
+ case id if watchController.canceling.has(id) =>
+ logger.info(s"trackId ${trackId.toString} is canceling")
+ FlinkJobState.CANCELED
+ case _ =>
// whether deployment exists on kubernetes cluster
- val isDeployExists =
- KubernetesRetriever.isDeploymentExists(trackId.clusterId, trackId.namespace)
- val deployStateOfTheError = KubernetesDeploymentHelper.getDeploymentStatusChanges(
+ val deployExists = KubernetesRetriever.isDeploymentExists(
trackId.namespace,
- trackId.clusterId)
- val isConnection = KubernetesDeploymentHelper.isTheK8sConnectionNormal()
+ trackId.clusterId
+ )
- if (isDeployExists) {
- if (!deployStateOfTheError) {
+ val deployError = KubernetesDeploymentHelper.isDeploymentError(
+ trackId.namespace,
+ trackId.clusterId
+ )
+
+ val isConnection = KubernetesDeploymentHelper.checkConnection()
+
+ if (deployExists) {
+ if (!deployError) {
logger.info("Task Enter the initialization process.")
FlinkJobState.K8S_INITIALIZING
} else if (isConnection) {
@@ -307,7 +317,6 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
trackId.namespace,
trackId.clusterId,
trackId.jobId)
- KubernetesDeploymentHelper.deleteTaskDeployment(trackId.namespace, trackId.clusterId)
FlinkJobState.FAILED
} else {
inferSilentOrLostFromPreCache(latest)
@@ -318,8 +327,6 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
} else {
inferSilentOrLostFromPreCache(latest)
}
-
- }
}
val jobStatusCV = JobStatusCV(