This is an automated email from the ASF dual-hosted git repository.

rarexixi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/master by this push:
     new 4169e703c Spark k8s operator task: add status acquisition (#4889)
4169e703c is described below

commit 4169e703c427842d5e932b9375423d74017465ad
Author: ChengJie1053 <[email protected]>
AuthorDate: Fri Sep 1 19:09:53 2023 +0800

    Spark k8s operator task: add status acquisition (#4889)
    
    * Spark k8s operator task: add status acquisition
    
    * Change how the Spark k8s operator task obtains its status to use a k8s list-watch
---
 ...KubernetesOperatorClusterDescriptorAdapter.java | 58 ++++++++++++++--------
 1 file changed, 36 insertions(+), 22 deletions(-)
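
The change below replaces a one-shot list-and-filter lookup of the SparkApplication status with a Kubernetes list-watch: a fabric8 Watcher is registered on the resource, status updates are pushed to eventReceived, and onClose signals that the watch connection was dropped. A minimal, self-contained sketch of that fabric8 watch pattern follows; it watches a plain Pod rather than the SparkApplication custom resource only so it compiles without the Linkis CRD classes, and the client construction, namespace, and resource name are placeholder assumptions.

import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;

// Minimal sketch of the fabric8 list-watch pattern adopted by this commit.
// It watches a plain Pod only so the example is self-contained; the commit
// applies the same pattern to the SparkApplication custom resource.
public class ListWatchSketch {
  public static void main(String[] args) {
    // Placeholder client construction; real code would reuse an existing client.
    KubernetesClient client = new DefaultKubernetesClient();
    client
        .pods()
        .inNamespace("default")      // placeholder namespace
        .withName("example-pod")     // placeholder resource name
        .watch(
            new Watcher<Pod>() {
              @Override
              public void eventReceived(Action action, Pod pod) {
                // The API server pushes an event on every change, so status is
                // read here instead of being polled with list().
                System.out.println(action + " phase=" + pod.getStatus().getPhase());
              }

              @Override
              public void onClose(WatcherException cause) {
                // Called when the watch connection is dropped (e.g. a network
                // failure); callers typically re-register the watch here.
                System.err.println("watch closed: " + cause.getMessage());
              }
            });
  }
}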

diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java
index fa6236600..3ea27b394 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java
@@ -38,6 +38,9 @@ import io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinition
 import io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinitionList;
 import io.fabric8.kubernetes.client.CustomResource;
 import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.Watcher;
+import io.fabric8.kubernetes.client.WatcherException;
+import io.fabric8.kubernetes.client.dsl.MixedOperation;
 import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
 import io.fabric8.kubernetes.client.dsl.Resource;
 import org.slf4j.Logger;
@@ -152,32 +155,44 @@ public class KubernetesOperatorClusterDescriptorAdapter extends ClusterDescripto
   }
 
   public boolean initJobId() {
-    SparkApplicationStatus sparkApplicationStatus = getKubernetesOperatorState();
-
-    if (Objects.nonNull(sparkApplicationStatus)) {
-      this.applicationId = sparkApplicationStatus.getSparkApplicationId();
-      this.jobState =
-          kubernetesOperatorStateConvertSparkState(
-              sparkApplicationStatus.getApplicationState().getState());
+    try {
+      getKubernetesOperatorState();
+    } catch (Exception e) {
+      try {
+        // Prevent the watch from stopping due to a network interruption. Restart the Watcher.
+        Thread.sleep(5000);
+        getKubernetesOperatorState();
+      } catch (InterruptedException interruptedException) {
+        logger.error("Failed to obtain the status via k8s watch");
+      }
     }
-
     // When the job is not finished, the appId is monitored; otherwise, the status is
     // monitored (this is mainly to prevent the job from finishing too early, which would cause an endless wait)
     return null != getApplicationId() || (jobState != null && jobState.isFinal());
   }
 
-  private SparkApplicationStatus getKubernetesOperatorState() {
-    List<SparkApplication> sparkApplicationList =
-        getSparkApplicationClient(client).list().getItems();
-    if (CollectionUtils.isNotEmpty(sparkApplicationList)) {
-      for (SparkApplication sparkApplication : sparkApplicationList) {
-        if (sparkApplication.getMetadata().getNamespace().equals(this.sparkConfig.getK8sNamespace())
-            && sparkApplication.getMetadata().getName().equals(this.sparkConfig.getAppName())) {
-          return sparkApplication.getStatus();
-        }
-      }
-    }
-    return null;
+  private void getKubernetesOperatorState() {
+    getSparkApplicationClient(client)
+        .inNamespace(this.sparkConfig.getK8sNamespace())
+        .withName(this.sparkConfig.getAppName())
+        .watch(
+            new Watcher<SparkApplication>() {
+              @Override
+              public void eventReceived(Action action, SparkApplication sparkApplication) {
+                // todo get status
+                applicationId = sparkApplication.getStatus().getSparkApplicationId();
+                jobState =
+                    kubernetesOperatorStateConvertSparkState(
+                        sparkApplication.getStatus().getApplicationState().getState());
+              }
+
+              @Override
+              public void onClose(WatcherException e) {
+                // Invoked when the watcher closes due to an exception. Restart the Watcher.
+                logger.error("Failed to obtain the status via k8s watch", e);
+                getKubernetesOperatorState();
+              }
+            });
   }
 
   public SparkAppHandle.State kubernetesOperatorStateConvertSparkState(String kubernetesState) {
@@ -216,8 +231,7 @@ public class KubernetesOperatorClusterDescriptorAdapter extends ClusterDescripto
     client.close();
   }
 
-  public static NonNamespaceOperation<
-          SparkApplication, SparkApplicationList, Resource<SparkApplication>>
+  public static MixedOperation<SparkApplication, SparkApplicationList, Resource<SparkApplication>>
       getSparkApplicationClient(KubernetesClient client) {
     return client.customResources(SparkApplication.class, SparkApplicationList.class);
   }
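
A note on the restart logic added above: onClose re-registers the watch by calling getKubernetesOperatorState() recursively, with no upper bound on reconnect attempts. A hedged sketch of a bounded alternative follows; the retry limit, fixed delay, and the Runnable-based registerWatch callback are illustrative assumptions, not part of this commit.

import java.util.concurrent.TimeUnit;

// Illustrative sketch only: cap how often a dropped watch is re-registered
// instead of retrying forever. MAX_RESTARTS, RESTART_DELAY_SECONDS and the
// Runnable-based registerWatch callback are assumptions, not code from the commit.
public class BoundedWatchRestart {

  private static final int MAX_RESTARTS = 5;
  private static final long RESTART_DELAY_SECONDS = 5;

  private int restarts = 0;

  // Re-registers the watch via the supplied callback until the retry budget is spent.
  public synchronized void restart(Runnable registerWatch) {
    if (restarts >= MAX_RESTARTS) {
      throw new IllegalStateException(
          "Watch restart budget exhausted after " + restarts + " attempts");
    }
    restarts++;
    try {
      // Fixed backoff, mirroring the 5-second sleep used in initJobId above.
      TimeUnit.SECONDS.sleep(RESTART_DELAY_SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return;
    }
    // E.g. the equivalent of calling getKubernetesOperatorState() again.
    registerWatch.run();
  }
}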


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
