This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
     new 97dfe6b2f [Improve] custom_code build pipeline bug fixed. (#3484)
97dfe6b2f is described below

commit 97dfe6b2fdcc50960ed5fe377a05e3fc0309a204
Author: benjobs <[email protected]>
AuthorDate: Wed Jan 10 20:56:17 2024 +0800

    [Improve] custom_code build pipeline bug fixed. (#3484)
    
    Co-authored-by: benjobs <[email protected]>
---
 .../core/service/impl/AppBuildPipeServiceImpl.java | 25 ++++++++---------
 .../core/service/impl/ApplicationServiceImpl.java  | 24 ++++++++---------
 .../streampark-console-webapp/package.json         |  2 +-
 .../src/views/base/login/Login.vue                 |  2 +-
 .../flink/kubernetes/DefaultFlinkK8sWatcher.scala  | 12 ++++-----
 .../kubernetes/watcher/FlinkJobStatusWatcher.scala | 31 +++++++++++-----------
 6 files changed, 49 insertions(+), 47 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index a002a8585..6834d940d 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -19,7 +19,6 @@ package org.apache.streampark.console.core.service.impl;
 
 import org.apache.streampark.common.conf.ConfigConst;
 import org.apache.streampark.common.conf.Workspace;
-import org.apache.streampark.common.enums.ApplicationType;
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.fs.FsOperator;
 import org.apache.streampark.common.util.FileUtils;
@@ -378,7 +377,7 @@ public class AppBuildPipeServiceImpl
 
     FsOperator localFS = FsOperator.lfs();
     // 1. copy jar to local upload dir
-    if (app.isFlinkSqlJob() || app.isApacheFlinkCustomCodeJob()) {
+    if (app.isFlinkSqlJob() || app.isCustomCodeJob()) {
       if (!app.getMavenDependency().getJar().isEmpty()) {
         for (String jar : app.getMavenDependency().getJar()) {
           File localJar = new File(WebUtils.getAppTempDir(), jar);
@@ -393,14 +392,14 @@ public class AppBuildPipeServiceImpl
       }
     }
 
-    if (app.isApacheFlinkCustomCodeJob()) {
+    if (app.isCustomCodeJob()) {
       // customCode upload jar to appHome...
       FsOperator fsOperator = app.getFsOperator();
       ResourceFrom resourceFrom = ResourceFrom.of(app.getResourceFrom());
 
       File userJar;
       if (resourceFrom == ResourceFrom.CICD) {
-        userJar = getAppDistJar(app);
+        userJar = getCustomCodeAppDistJar(app);
       } else if (resourceFrom == ResourceFrom.UPLOAD) {
         userJar = new File(WebUtils.getAppTempDir(), app.getJar());
       } else {
@@ -484,15 +483,17 @@ public class AppBuildPipeServiceImpl
     }
   }
 
-  private File getAppDistJar(Application app) {
-    if (app.getApplicationType() == ApplicationType.STREAMPARK_FLINK) {
-      return new File(app.getDistHome() + "/lib", 
app.getModule().concat(".jar"));
-    }
-    if (app.getApplicationType() == ApplicationType.APACHE_FLINK) {
-      return new File(app.getDistHome(), app.getJar());
+  private File getCustomCodeAppDistJar(Application app) {
+    switch (app.getApplicationType()) {
+      case APACHE_FLINK:
+        return new File(app.getDistHome(), app.getJar());
+      case STREAMPARK_FLINK:
+        String userJar = String.format("%s/lib/%s.jar", app.getDistHome(), 
app.getModule());
+        return new File(userJar);
+      default:
+        throw new IllegalArgumentException(
+            "[StreamPark] unsupported ApplicationType of custom code: " + 
app.getApplicationType());
     }
-    throw new IllegalArgumentException(
-        "[StreamPark] unsupported ApplicationType of custom code: " + 
app.getApplicationType());
   }
 
   /** copy from {@link ApplicationServiceImpl#start(Application, boolean)} */
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index ed55c27cc..869e79afd 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -210,7 +210,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
 
   @Autowired private EnvInitializer envInitializer;
 
-  @Autowired private FlinkK8sWatcher k8SFlinkTrackMonitor;
+  @Autowired private FlinkK8sWatcher flinkK8sWatcher;
 
   @Autowired private AppBuildPipeService appBuildPipeService;
 
@@ -282,7 +282,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     }
 
     // merge metrics from flink kubernetes cluster
-    FlinkMetricCV k8sMetric = 
k8SFlinkTrackMonitor.getAccGroupMetrics(teamId.toString());
+    FlinkMetricCV k8sMetric = 
flinkK8sWatcher.getAccGroupMetrics(teamId.toString());
     if (k8sMetric != null) {
       totalJmMemory += k8sMetric.totalJmMemory();
       totalTmMemory += k8sMetric.totalTmMemory();
@@ -518,7 +518,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
                   // in time.
                   if (isKubernetesApp(record)) {
                     // set duration
-                    String restUrl = 
k8SFlinkTrackMonitor.getRemoteRestUrl(toTrackId(record));
+                    String restUrl = 
flinkK8sWatcher.getRemoteRestUrl(toTrackId(record));
                     record.setFlinkRestUrl(restUrl);
                     if (record.getTracking() == 1
                         && record.getStartTime() != null
@@ -707,7 +707,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
         }
         // check whether clusterId, namespace, jobId on kubernetes
         else if (ExecutionMode.isKubernetesMode(appParam.getExecutionMode())
-            && k8SFlinkTrackMonitor.checkIsInRemoteCluster(toTrackId(app))) {
+            && flinkK8sWatcher.checkIsInRemoteCluster(toTrackId(app))) {
           return AppExistsState.IN_KUBERNETES;
         }
       }
@@ -723,7 +723,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
       }
       // check whether clusterId, namespace, jobId on kubernetes
       else if (ExecutionMode.isKubernetesMode(appParam.getExecutionMode())
-          && k8SFlinkTrackMonitor.checkIsInRemoteCluster(toTrackId(appParam))) 
{
+          && flinkK8sWatcher.checkIsInRemoteCluster(toTrackId(appParam))) {
         return AppExistsState.IN_KUBERNETES;
       }
     }
@@ -1196,7 +1196,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     }
     // add flink web url info for k8s-mode
     if (isKubernetesApp(application)) {
-      String restUrl = 
k8SFlinkTrackMonitor.getRemoteRestUrl(toTrackId(application));
+      String restUrl = 
flinkK8sWatcher.getRemoteRestUrl(toTrackId(application));
       application.setFlinkRestUrl(restUrl);
 
       // set duration
@@ -1233,7 +1233,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     boolean mapping = this.baseMapper.mapping(appParam);
     Application application = getById(appParam.getId());
     if (isKubernetesApp(application)) {
-      k8SFlinkTrackMonitor.doWatching(toTrackId(application));
+      flinkK8sWatcher.doWatching(toTrackId(application));
     } else {
       FlinkRESTAPIWatcher.doWatching(application);
     }
@@ -1354,7 +1354,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
               }
               // re-tracking flink job on kubernetes and logging exception
               if (isKubernetesApp(application)) {
-                k8SFlinkTrackMonitor.unWatching(trackId);
+                flinkK8sWatcher.unWatching(trackId);
               } else {
                 FlinkRESTAPIWatcher.unWatching(application.getId());
               }
@@ -1380,7 +1380,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
           }
 
           if (isKubernetesApp(application)) {
-            k8SFlinkTrackMonitor.unWatching(trackId);
+            flinkK8sWatcher.unWatching(trackId);
           }
         });
   }
@@ -1620,7 +1620,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
               app.setOptionState(OptionState.NONE.getValue());
               updateById(app);
               if (isKubernetesApp(app)) {
-                k8SFlinkTrackMonitor.unWatching(trackId);
+                flinkK8sWatcher.unWatching(trackId);
               } else {
                 FlinkRESTAPIWatcher.unWatching(appParam.getId());
               }
@@ -1656,7 +1656,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
           // if start completed, will be added task to tracking queue
           if (isKubernetesApp(application)) {
             application.setRelease(ReleaseState.DONE.get());
-            k8SFlinkTrackMonitor.doWatching(trackId);
+            flinkK8sWatcher.doWatching(trackId);
             if 
(ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
               String domainName = settingService.getIngressModeDefault();
               if (StringUtils.isNotBlank(domainName)) {
@@ -1755,7 +1755,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     // re-tracking flink job on kubernetes and logging exception
     if (isKubernetesApp(application)) {
       TrackId id = toTrackId(application);
-      k8SFlinkTrackMonitor.doWatching(id);
+      flinkK8sWatcher.doWatching(id);
     } else {
       FlinkRESTAPIWatcher.unWatching(application.getId());
     }
diff --git a/streampark-console/streampark-console-webapp/package.json 
b/streampark-console/streampark-console-webapp/package.json
index 9d2d43554..8622c067f 100644
--- a/streampark-console/streampark-console-webapp/package.json
+++ b/streampark-console/streampark-console-webapp/package.json
@@ -49,7 +49,7 @@
     "@vueuse/shared": "^9.6.0",
     "@zxcvbn-ts/core": "^2.1.0",
     "ant-design-vue": "^3.2.15",
-    "axios": "^1.2.1",
+    "axios": "^1.6.5",
     "crypto-js": "^4.1.1",
     "dayjs": "^1.11.6",
     "lodash-es": "^4.17.21",
diff --git 
a/streampark-console/streampark-console-webapp/src/views/base/login/Login.vue 
b/streampark-console/streampark-console-webapp/src/views/base/login/Login.vue
index 8daa7445d..2e0598e68 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/base/login/Login.vue
+++ 
b/streampark-console/streampark-console-webapp/src/views/base/login/Login.vue
@@ -59,7 +59,7 @@
         />
       </a>
       <p class="text-light-100 pt-10px" style="border-top: 1px solid #dad7d7">
-        Copyright © 2023 The Apache Software Foundation. Apache StreamPark, 
StreamPark, and its
+        Copyright © 2022-{{ `${new Date().getFullYear()}` }} The Apache 
Software Foundation. Apache StreamPark, StreamPark, and its
         feather logo are trademarks of The Apache Software Foundation.
       </p>
     </footer>
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala
index e9a3b06e1..4e3d808c4 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala
@@ -21,7 +21,7 @@ import 
org.apache.streampark.flink.kubernetes.enums.FlinkJobState
 import 
org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode.{APPLICATION, 
SESSION}
 import org.apache.streampark.flink.kubernetes.event.{BuildInEvent, 
FlinkJobStateEvent, FlinkJobStatusChangeEvent}
 import org.apache.streampark.flink.kubernetes.model._
-import org.apache.streampark.flink.kubernetes.watcher.{FlinkCheckpointWatcher, 
FlinkJobStatusWatcher, FlinkK8sEventWatcher, FlinkMetricWatcher, FlinkWatcher}
+import org.apache.streampark.flink.kubernetes.watcher._
 
 import com.google.common.eventbus.{AllowConcurrentEvents, Subscribe}
 
@@ -42,10 +42,10 @@ class DefaultFlinkK8sWatcher(conf: FlinkTrackConfig = 
FlinkTrackConfig.defaultCo
   }
 
   // remote server tracking watcher
-  val k8sEventWatcher = new FlinkK8sEventWatcher()
-  val jobStatusWatcher = new FlinkJobStatusWatcher(conf.jobStatusWatcherConf)
-  val metricsWatcher = new FlinkMetricWatcher(conf.metricWatcherConf)
-  val checkpointWatcher = new FlinkCheckpointWatcher(conf.metricWatcherConf)
+  private val k8sEventWatcher = new FlinkK8sEventWatcher()
+  private val jobStatusWatcher = new 
FlinkJobStatusWatcher(conf.jobStatusWatcherConf)
+  private val metricsWatcher = new FlinkMetricWatcher(conf.metricWatcherConf)
+  private val checkpointWatcher = new 
FlinkCheckpointWatcher(conf.metricWatcherConf)
 
   private[this] val allWatchers =
     Array[FlinkWatcher](k8sEventWatcher, jobStatusWatcher, metricsWatcher, 
checkpointWatcher)
@@ -118,7 +118,7 @@ class DefaultFlinkK8sWatcher(conf: FlinkTrackConfig = 
FlinkTrackConfig.defaultCo
     watchController.endpoints.get(trackId.toClusterKey)
 
   /** Build-in Event Listener of K8sFlinkTrackMonitor. */
-  class BuildInEventListener {
+  private class BuildInEventListener {
 
     /**
      * Watch the FlinkJobOperaEvent, then update relevant cache record and 
trigger a new
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 6d731fe83..eec41ebd6 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -31,7 +31,6 @@ import com.google.common.io.Files
 import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.history.FsJobArchivist
 import org.apache.hc.client5.http.fluent.Request
-import org.apache.hc.core5.util.Timeout
 import org.json4s.{DefaultFormats, JNothing, JNull}
 import org.json4s.JsonAST.JArray
 import org.json4s.jackson.JsonMethods.parse
@@ -116,9 +115,13 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = 
JobStatusWatcherConfi
             case Some(jobState) =>
               val trackId = id.copy(jobId = jobState.jobId)
               val latest: JobStatusCV = 
watchController.jobStatuses.get(trackId)
-              if (
-                latest == null || latest.jobState != jobState.jobState || 
latest.jobId != jobState.jobId
-              ) {
+
+              val eventChanged = latest == null ||
+                latest.jobState != jobState.jobState ||
+                latest.jobId != jobState.jobId
+
+              if (eventChanged) {
+                logInfo(s"eventChanged.....$trackId")
                 // put job status to cache
                 watchController.jobStatuses.put(trackId, jobState)
                 // set jobId to trackIds
@@ -193,7 +196,6 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = 
JobStatusWatcherConfi
           pollAckTime = System.currentTimeMillis)
       }
     }
-
     Some(jobState)
   }
 
@@ -204,7 +206,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = 
JobStatusWatcherConfi
    * This method can be called directly from outside, without affecting the 
current cachePool
    * result.
    */
-  protected[kubernetes] def touchSessionAllJob(
+  private def touchSessionAllJob(
       @Nonnull clusterId: String,
       @Nonnull namespace: String,
       @Nonnull appId: Long,
@@ -219,8 +221,8 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = 
JobStatusWatcherConfi
     } else {
       jobDetails.map {
         d =>
-          TrackId.onSession(namespace, clusterId, appId, d.jid, groupId) -> d
-            .toJobStatusCV(pollEmitTime, System.currentTimeMillis)
+          TrackId.onSession(namespace, clusterId, appId, d.jid, groupId) ->
+            d.toJobStatusCV(pollEmitTime, System.currentTimeMillis)
       }
     }
   }
@@ -238,7 +240,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = 
JobStatusWatcherConfi
     val namespace = trackId.namespace
     val jobDetails = listJobsDetails(ClusterKey(APPLICATION, namespace, 
clusterId))
     if (jobDetails.isEmpty || jobDetails.get.jobs.isEmpty) {
-      inferApplicationFlinkJobStateFromK8sEvent(trackId)
+      inferJobStateFromK8sEvent(trackId)
     } else {
       Some(jobDetails.get.jobs.head.toJobStatusCV(pollEmitTime, 
System.currentTimeMillis))
     }
@@ -284,7 +286,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = 
JobStatusWatcherConfi
    * Infer the current flink state from the last relevant k8s events. This 
method is only used for
    * application-mode job inference in case of a failed JM rest request.
    */
-  private def inferApplicationFlinkJobStateFromK8sEvent(@Nonnull trackId: 
TrackId)(implicit
+  private def inferJobStateFromK8sEvent(@Nonnull trackId: TrackId)(implicit
       pollEmitTime: Long): Option[JobStatusCV] = {
 
     // infer from k8s deployment and event
@@ -301,14 +303,13 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig 
= JobStatusWatcherConfi
           trackId.clusterId
         )
 
-        val deployError = KubernetesDeploymentHelper.isDeploymentError(
-          trackId.namespace,
-          trackId.clusterId
-        )
-
         val isConnection = KubernetesDeploymentHelper.checkConnection()
 
         if (deployExists) {
+          val deployError = KubernetesDeploymentHelper.isDeploymentError(
+            trackId.namespace,
+            trackId.clusterId
+          )
           if (!deployError) {
             logger.info("Task Enter the initialization process.")
             FlinkJobState.K8S_INITIALIZING

Reply via email to