This is an automated email from the ASF dual-hosted git repository.

gongzhongqiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6a5e6e4e9 [Improve] Application service minor improvement (#3015)
6a5e6e4e9 is described below

commit 6a5e6e4e97dbf5f3fc745df7b36292acc2fd4888
Author: benjobs <[email protected]>
AuthorDate: Sat Sep 2 21:16:40 2023 -0500

    [Improve] Application service minor improvement (#3015)
    
    * [Improve] application service minor improvement
    
    * [Improve] applicationService minor improvement
    
    * minor improvement
    
    * minor improvement
    
    * minor improvement
---
 .../core/controller/ApplicationController.java     | 47 ++-----------
 .../application/ApplicationActionService.java      | 27 +++-----
 .../application/ApplicationInfoService.java        | 36 +++++-----
 .../application/ApplicationManageService.java      | 40 +++++------
 .../impl/ApplicationActionServiceImpl.java         | 49 +++++++-------
 .../impl/ApplicationInfoServiceImpl.java           | 20 +++---
 .../impl/ApplicationManageServiceImpl.java         | 77 ++++++++++++++--------
 .../service/ApplicationManageServiceITest.java     |  3 +-
 8 files changed, 138 insertions(+), 161 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
index ed2a11352..048f95f2c 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
@@ -26,20 +26,17 @@ import 
org.apache.streampark.console.base.exception.InternalException;
 import org.apache.streampark.console.core.annotation.ApiAccess;
 import org.apache.streampark.console.core.annotation.AppUpdated;
 import org.apache.streampark.console.core.annotation.PermissionAction;
-import org.apache.streampark.console.core.bean.AppControl;
 import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.ApplicationBackUp;
 import org.apache.streampark.console.core.entity.ApplicationLog;
 import org.apache.streampark.console.core.enums.AppExistsState;
 import org.apache.streampark.console.core.enums.PermissionType;
-import org.apache.streampark.console.core.service.AppBuildPipeService;
 import org.apache.streampark.console.core.service.ApplicationBackUpService;
 import org.apache.streampark.console.core.service.ApplicationLogService;
 import org.apache.streampark.console.core.service.ResourceService;
 import 
org.apache.streampark.console.core.service.application.ApplicationActionService;
 import 
org.apache.streampark.console.core.service.application.ApplicationInfoService;
 import 
org.apache.streampark.console.core.service.application.ApplicationManageService;
-import org.apache.streampark.flink.packer.pipeline.PipelineStatus;
 
 import org.apache.shiro.authz.annotation.RequiresPermissions;
 
@@ -63,10 +60,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 @Tag(name = "FLINK_APPLICATION_TAG")
 @Slf4j
@@ -76,15 +70,15 @@ import java.util.stream.Collectors;
 public class ApplicationController {
 
   @Autowired private ApplicationManageService applicationManageService;
+
   @Autowired private ApplicationActionService applicationActionService;
+
   @Autowired private ApplicationInfoService applicationInfoService;
 
   @Autowired private ApplicationBackUpService backUpService;
 
   @Autowired private ApplicationLogService applicationLogService;
 
-  @Autowired private AppBuildPipeService appBuildPipeService;
-
   @Autowired private ResourceService resourceService;
 
   @Operation(summary = "Get application")
@@ -128,12 +122,8 @@ public class ApplicationController {
   @PostMapping(value = "copy")
   @RequiresPermissions("app:copy")
   public RestResponse copy(@Parameter(hidden = true) Application app) throws 
IOException {
-    Long id = applicationManageService.copy(app);
-    Map<String, String> data = new HashMap<>();
-    data.put("id", Long.toString(id));
-    return id.equals(0L)
-        ? RestResponse.success(false).data(data)
-        : RestResponse.success(true).data(data);
+    applicationManageService.copy(app);
+    return RestResponse.success();
   }
 
   @Operation(summary = "Update application")
@@ -159,34 +149,6 @@ public class ApplicationController {
   @RequiresPermissions("app:view")
   public RestResponse list(Application app, RestRequest request) {
     IPage<Application> applicationList = applicationManageService.page(app, 
request);
-    List<Application> appRecords = applicationList.getRecords();
-    List<Long> appIds = 
appRecords.stream().map(Application::getId).collect(Collectors.toList());
-    Map<Long, PipelineStatus> pipeStates = 
appBuildPipeService.listPipelineStatus(appIds);
-
-    // add building pipeline status info and app control info
-    appRecords =
-        appRecords.stream()
-            .peek(
-                e -> {
-                  if (pipeStates.containsKey(e.getId())) {
-                    e.setBuildStatus(pipeStates.get(e.getId()).getCode());
-                  }
-                })
-            .peek(
-                e -> {
-                  AppControl appControl =
-                      new AppControl()
-                          .setAllowBuild(
-                              e.getBuildStatus() == null
-                                  || 
!PipelineStatus.running.getCode().equals(e.getBuildStatus()))
-                          .setAllowStart(
-                              !e.shouldBeTrack()
-                                  && 
PipelineStatus.success.getCode().equals(e.getBuildStatus()))
-                          .setAllowStop(e.isRunning());
-                  e.setAppControl(appControl);
-                })
-            .collect(Collectors.toList());
-    applicationList.setRecords(appRecords);
     return RestResponse.success(applicationList);
   }
 
@@ -245,7 +207,6 @@ public class ApplicationController {
   @RequiresPermissions("app:start")
   public RestResponse start(@Parameter(hidden = true) Application app) {
     try {
-      applicationInfoService.checkEnv(app);
       applicationActionService.start(app, false);
       return RestResponse.success(true);
     } catch (Exception e) {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationActionService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationActionService.java
index 3e77485f8..e2936cb73 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationActionService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationActionService.java
@@ -28,50 +28,43 @@ import com.baomidou.mybatisplus.extension.service.IService;
  */
 public interface ApplicationActionService extends IService<Application> {
 
-  /**
-   * This method is used to start the given application.
-   *
-   * @param app The application object to be started.
-   */
-  void starting(Application app);
-
   /**
    * Starts the specified application.
    *
-   * @param app The application to start.
+   * @param appParam The application to start.
    * @param auto True if the application should start automatically, False 
otherwise.
    * @throws Exception If an error occurs while starting the application.
    */
-  void start(Application app, boolean auto) throws Exception;
+  void start(Application appParam, boolean auto) throws Exception;
 
   /**
    * Restarts the given application.
    *
-   * @param application The application to restart.
+   * @param appParam The application to restart.
    * @throws Exception If an error occurs while restarting the application.
    */
-  void restart(Application application) throws Exception;
+  void restart(Application appParam) throws Exception;
 
   /**
    * Revokes access for the given application.
    *
-   * @param app The application for which access needs to be revoked.
+   * @param appParam The application for which access needs to be revoked.
    * @throws ApplicationException if an error occurs while revoking access.
    */
-  void revoke(Application app) throws ApplicationException;
+  void revoke(Application appParam) throws ApplicationException;
 
   /**
    * Cancels the given application. Throws an exception if cancellation fails.
    *
-   * @param app the application to be canceled
+   * @param appParam the application to be canceled
    * @throws Exception if cancellation fails
    */
-  void cancel(Application app) throws Exception;
+  void cancel(Application appParam) throws Exception;
 
   /**
    * Forces the given application to stop.
    *
-   * @param app the application to be stopped
+   * @param appParam the application to be stopped
    */
-  void forcedStop(Application app);
+  void forcedStop(Application appParam);
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationInfoService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationInfoService.java
index 093b1b0d4..57fbdef59 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationInfoService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationInfoService.java
@@ -37,37 +37,37 @@ public interface ApplicationInfoService extends 
IService<Application> {
   /**
    * Maps the given application.
    *
-   * @param app The application to be mapped.
+   * @param appParam The application to be mapped.
    * @return True if the mapping was successful, false otherwise.
    */
-  boolean mapping(Application app);
+  boolean mapping(Application appParam);
 
   /**
    * Checks the environment for the given application.
    *
-   * @param app the application to check the environment for
+   * @param appParam the application to check the environment for
    * @return true if the environment is valid for the application, false 
otherwise
    * @throws ApplicationException if an error occurs while checking the 
environment
    */
-  boolean checkEnv(Application app) throws ApplicationException;
+  boolean checkEnv(Application appParam) throws ApplicationException;
 
   /**
    * Checks the savepoint path for the given application.
    *
-   * @param app the application to check the savepoint path for
+   * @param appParam the application to check the savepoint path for
    * @return the check message
    * @throws Exception if an error occurs while checking the savepoint path
    */
-  String checkSavepointPath(Application app) throws Exception;
+  String checkSavepointPath(Application appParam) throws Exception;
 
   /**
    * Checks if the given application meets the required alterations.
    *
-   * @param application The application to be checked.
+   * @param appParam The application to be checked.
    * @return True if the application meets the required alterations, false 
otherwise.
    * @throws ApplicationException If an error occurs while checking the 
alterations.
    */
-  boolean checkAlter(Application application);
+  boolean checkAlter(Application appParam);
 
   /**
    * Checks if a record exists in the database with the given team ID.
@@ -129,42 +129,42 @@ public interface ApplicationInfoService extends 
IService<Application> {
   /**
    * Gets the YARN name for the given application.
    *
-   * @param app The application for which to retrieve the YARN name.
+   * @param appParam The application for which to retrieve the YARN name.
    * @return The YARN name of the application as a String.
    */
-  String getYarnName(Application app);
+  String getYarnName(Application appParam);
 
   /**
    * Checks if the given application exists in the system.
    *
-   * @param app The application to check for existence.
+   * @param appParam The application to check for existence.
    * @return AppExistsState indicating the existence state of the application.
    */
-  AppExistsState checkExists(Application app);
+  AppExistsState checkExists(Application appParam);
 
   /**
    * Persists the metrics of the given application.
    *
-   * @param application The application which metrics need to be persisted.
+   * @param appParam The application which metrics need to be persisted.
    */
-  void persistMetrics(Application application);
+  void persistMetrics(Application appParam);
 
   /**
    * Reads the configuration for the given application and returns it as a 
String.
    *
-   * @param app The application for which the configuration needs to be read.
+   * @param appParam The application for which the configuration needs to be 
read.
    * @return The configuration for the given application as a String.
    * @throws IOException If an I/O error occurs while reading the 
configuration.
    */
-  String readConf(Application app) throws IOException;
+  String readConf(Application appParam) throws IOException;
 
   /**
    * Retrieves the main configuration value for the given Application.
    *
-   * @param application the Application object for which to fetch the main 
configuration value
+   * @param appParam the Application object for which to fetch the main 
configuration value
    * @return the main configuration value as a String
    */
-  String getMain(Application application);
+  String getMain(Application appParam);
 
   /**
    * Returns the dashboard for the specified team.
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationManageService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationManageService.java
index 3d2c6f4fa..d55564111 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationManageService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationManageService.java
@@ -37,83 +37,83 @@ public interface ApplicationManageService extends 
IService<Application> {
   /**
    * Retrieves a page of applications based on the provided parameters.
    *
-   * @param app The application object to be used for filtering the results.
+   * @param appParam The application object to be used for filtering the 
results.
    * @param request The REST request object containing additional parameters 
or headers.
    * @return A page of Application objects based on the provided parameters.
    */
-  IPage<Application> page(Application app, RestRequest request);
+  IPage<Application> page(Application appParam, RestRequest request);
 
   /**
    * Creates a new application.
    *
-   * @param app The application to create.
+   * @param appParam The application to create.
    * @return True if the application was successfully created, false otherwise.
    * @throws IOException If an I/O error occurs.
    */
-  boolean create(Application app) throws IOException;
+  boolean create(Application appParam) throws IOException;
 
   /**
    * Copies the given Application.
    *
-   * @param app the Application to be copied
+   * @param appParam the Application to be copied
    * @return the size of the copied Application in bytes as a Long value
    * @throws IOException if there was an error during the copy process
    */
-  Long copy(Application app) throws IOException;
+  Long copy(Application appParam) throws IOException;
 
   /**
    * Updates the given application.
    *
-   * @param app the application to be updated
+   * @param appParam the application to be updated
    * @return true if the update was successful, false otherwise
    */
-  boolean update(Application app);
+  boolean update(Application appParam);
 
   /**
    * Sets the given application to be effective.
    *
-   * @param application the application to be set effective
+   * @param appParam the application to be set effective
    */
-  void toEffective(Application application);
+  void toEffective(Application appParam);
 
   /**
    * Checks if the given application is ready to build and update.
    *
-   * @param app the application to check for readiness
+   * @param appParam the application to check for readiness
    * @return true if the application is ready to build and update, false 
otherwise
    */
-  boolean checkBuildAndUpdate(Application app);
+  boolean checkBuildAndUpdate(Application appParam);
 
   /**
    * Deletes the given Application from the system.
    *
-   * @param app The Application to be deleted.
+   * @param appParam The Application to be deleted.
    * @return True if the deletion was successful, false otherwise.
    */
-  Boolean delete(Application app);
+  Boolean delete(Application appParam);
 
   /**
    * Retrieves the Application with the specified details from the system.
    *
-   * @param app The Application object containing the details of the 
Application to retrieve.
+   * @param appParam The Application object containing the details of the 
Application to retrieve.
    * @return The Application object that matches the specified details, or 
null if no matching
    *     Application is found.
    */
-  Application getApp(Application app);
+  Application getApp(Application appParam);
 
   /**
    * Updates the release of the given application.
    *
-   * @param application The application to update the release for.
+   * @param appParam The application to update the release for.
    */
-  void updateRelease(Application application);
+  void updateRelease(Application appParam);
 
   /**
    * Cleans the application by performing necessary cleanup tasks.
    *
-   * @param app The application to clean.
+   * @param appParam The application to clean.
    */
-  void clean(Application app);
+  void clean(Application appParam);
 
   /**
    * Retrieves a list of applications by project ID.
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 128893d81..9d6926a9a 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -61,6 +61,7 @@ import 
org.apache.streampark.console.core.service.SavePointService;
 import org.apache.streampark.console.core.service.SettingService;
 import org.apache.streampark.console.core.service.VariableService;
 import 
org.apache.streampark.console.core.service.application.ApplicationActionService;
+import 
org.apache.streampark.console.core.service.application.ApplicationInfoService;
 import 
org.apache.streampark.console.core.service.application.ApplicationManageService;
 import org.apache.streampark.console.core.task.FlinkHttpWatcher;
 import org.apache.streampark.flink.client.FlinkClient;
@@ -130,6 +131,8 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
   @Autowired private ApplicationBackUpService backUpService;
   @Autowired private ApplicationManageService applicationManageService;
 
+  @Autowired private ApplicationInfoService applicationInfoService;
+
   @Autowired private ApplicationConfigService configService;
 
   @Autowired private ApplicationLogService applicationLogService;
@@ -161,11 +164,11 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
       new ConcurrentHashMap<>();
 
   @Override
-  public void revoke(Application appParma) throws ApplicationException {
-    Application application = getById(appParma.getId());
+  public void revoke(Application appParam) throws ApplicationException {
+    Application application = getById(appParam.getId());
     ApiAlertException.throwIfNull(
         application,
-        String.format("The application id=%s not found, revoke failed.", 
appParma.getId()));
+        String.format("The application id=%s not found, revoke failed.", 
appParam.getId()));
 
     // 1) delete files that have been published to workspace
     application.getFsOperator().delete(application.getAppHome());
@@ -188,16 +191,16 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
   }
 
   @Override
-  public void restart(Application application) throws Exception {
-    this.cancel(application);
-    this.start(application, false);
+  public void restart(Application appParam) throws Exception {
+    this.cancel(appParam);
+    this.start(appParam, false);
   }
 
   @Override
-  public void forcedStop(Application app) {
-    CompletableFuture<SubmitResponse> startFuture = 
startFutureMap.remove(app.getId());
-    CompletableFuture<CancelResponse> cancelFuture = 
cancelFutureMap.remove(app.getId());
-    Application application = this.baseMapper.getApp(app);
+  public void forcedStop(Application appParam) {
+    CompletableFuture<SubmitResponse> startFuture = 
startFutureMap.remove(appParam.getId());
+    CompletableFuture<CancelResponse> cancelFuture = 
cancelFutureMap.remove(appParam.getId());
+    Application application = this.baseMapper.getApp(appParam);
     if (isKubernetesApp(application)) {
       KubernetesDeploymentHelper.watchPodTerminatedLog(
           application.getK8sNamespace(), application.getJobName(), 
application.getJobId());
@@ -213,7 +216,7 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
       cancelFuture.cancel(true);
     }
     if (startFuture == null && cancelFuture == null) {
-      this.updateToStopped(app);
+      this.updateToStopped(appParam);
     }
   }
 
@@ -363,23 +366,12 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
             });
   }
 
-  /**
-   * Setup task is starting (for webUI "state" display)
-   *
-   * @param application
-   */
-  @Override
-  public void starting(Application application) {
-    application.setState(FlinkAppState.STARTING.getValue());
-    application.setOptionTime(new Date());
-    updateById(application);
-  }
-
   @Override
   @Transactional(rollbackFor = {Exception.class})
   public void start(Application appParam, boolean auto) throws Exception {
     final Application application = getById(appParam.getId());
-    Utils.notNull(application);
+    ApiAlertException.throwIfNull(application, "[StreamPark] application is 
not exists.");
+
     if (!application.isCanBeStart()) {
       throw new ApiAlertException("[StreamPark] The application cannot be 
started repeatedly.");
     }
@@ -389,6 +381,13 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
       throw new ApiAlertException("[StreamPark] can no found flink version");
     }
 
+    applicationInfoService.checkEnv(appParam);
+
+    // update state to starting
+    application.setState(FlinkAppState.STARTING.getValue());
+    application.setOptionTime(new Date());
+    updateById(application);
+
     // if manually started, clear the restart flag
     if (!auto) {
       application.setRestartCount(0);
@@ -399,8 +398,6 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
       appParam.setSavePointed(true);
       application.setRestartCount(application.getRestartCount() + 1);
     }
-
-    starting(application);
     application.setAllowNonRestored(appParam.getAllowNonRestored());
 
     String appConf;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
index 8ded940f7..b26de031f 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
@@ -209,14 +209,14 @@ public class ApplicationInfoServiceImpl extends 
ServiceImpl<ApplicationMapper, A
   }
 
   @Override
-  public boolean checkAlter(Application application) {
-    Long appId = application.getId();
-    FlinkAppState state = FlinkAppState.of(application.getState());
+  public boolean checkAlter(Application appParam) {
+    Long appId = appParam.getId();
+    FlinkAppState state = FlinkAppState.of(appParam.getState());
     if (!FlinkAppState.CANCELED.equals(state)) {
       return false;
     }
     long cancelUserId = FlinkHttpWatcher.getCanceledJobUserId(appId);
-    long appUserId = application.getUserId();
+    long appUserId = appParam.getUserId();
     return cancelUserId != -1 && cancelUserId != appUserId;
   }
 
@@ -432,16 +432,16 @@ public class ApplicationInfoServiceImpl extends 
ServiceImpl<ApplicationMapper, A
   }
 
   @Override
-  public String getMain(Application application) {
+  public String getMain(Application appParam) {
     File jarFile;
-    if (application.getProjectId() == null) {
-      jarFile = new File(application.getJar());
+    if (appParam.getProjectId() == null) {
+      jarFile = new File(appParam.getJar());
     } else {
       Project project = new Project();
-      project.setId(application.getProjectId());
+      project.setId(appParam.getProjectId());
       String modulePath =
-          
project.getDistHome().getAbsolutePath().concat("/").concat(application.getModule());
-      jarFile = new File(modulePath, application.getJar());
+          
project.getDistHome().getAbsolutePath().concat("/").concat(appParam.getModule());
+      jarFile = new File(modulePath, appParam.getJar());
     }
     Manifest manifest = Utils.getJarManifest(jarFile);
     return manifest.getMainAttributes().getValue("Main-Class");
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
index 222b8967e..eeabbaad8 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
@@ -28,6 +28,7 @@ import 
org.apache.streampark.console.base.mybatis.pager.MybatisPager;
 import org.apache.streampark.console.base.util.CommonUtils;
 import org.apache.streampark.console.base.util.ObjectUtils;
 import org.apache.streampark.console.base.util.WebUtils;
+import org.apache.streampark.console.core.bean.AppControl;
 import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.ApplicationConfig;
 import org.apache.streampark.console.core.entity.FlinkSql;
@@ -53,6 +54,7 @@ import 
org.apache.streampark.console.core.service.YarnQueueService;
 import 
org.apache.streampark.console.core.service.application.ApplicationManageService;
 import org.apache.streampark.console.core.task.FlinkHttpWatcher;
 import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
+import org.apache.streampark.flink.packer.pipeline.PipelineStatus;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -80,6 +82,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Date;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
@@ -127,16 +130,16 @@ public class ApplicationManageServiceImpl extends 
ServiceImpl<ApplicationMapper,
   }
 
   @Override
-  public void toEffective(Application application) {
+  public void toEffective(Application appParam) {
     // set latest to Effective
-    ApplicationConfig config = configService.getLatest(application.getId());
+    ApplicationConfig config = configService.getLatest(appParam.getId());
     if (config != null) {
-      this.configService.toEffective(application.getId(), config.getId());
+      this.configService.toEffective(appParam.getId(), config.getId());
     }
-    if (application.isFlinkSqlJob()) {
-      FlinkSql flinkSql = flinkSqlService.getCandidate(application.getId(), 
null);
+    if (appParam.isFlinkSqlJob()) {
+      FlinkSql flinkSql = flinkSqlService.getCandidate(appParam.getId(), null);
       if (flinkSql != null) {
-        flinkSqlService.toEffective(application.getId(), flinkSql.getId());
+        flinkSqlService.toEffective(appParam.getId(), flinkSql.getId());
         // clean candidate
         flinkSqlService.cleanCandidate(flinkSql.getId());
       }
@@ -145,9 +148,9 @@ public class ApplicationManageServiceImpl extends 
ServiceImpl<ApplicationMapper,
 
   @Override
   @Transactional(rollbackFor = {Exception.class})
-  public Boolean delete(Application paramApp) {
+  public Boolean delete(Application appParam) {
 
-    Application application = getById(paramApp.getId());
+    Application application = getById(appParam.getId());
 
     // 1) remove flink sql
     flinkSqlService.removeApp(application.getId());
@@ -177,7 +180,7 @@ public class ApplicationManageServiceImpl extends 
ServiceImpl<ApplicationMapper,
     if (isKubernetesApp(application)) {
       k8SFlinkTrackMonitor.unWatching(toTrackId(application));
     } else {
-      FlinkHttpWatcher.unWatching(paramApp.getId());
+      FlinkHttpWatcher.unWatching(appParam.getId());
     }
     return true;
   }
@@ -220,6 +223,10 @@ public class ApplicationManageServiceImpl extends 
ServiceImpl<ApplicationMapper,
     this.baseMapper.page(page, appParam);
     List<Application> records = page.getRecords();
     long now = System.currentTimeMillis();
+
+    List<Long> appIds = 
records.stream().map(Application::getId).collect(Collectors.toList());
+    Map<Long, PipelineStatus> pipeStates = 
appBuildPipeService.listPipelineStatus(appIds);
+
     List<Application> newRecords =
         records.stream()
             .peek(
@@ -236,6 +243,24 @@ public class ApplicationManageServiceImpl extends 
ServiceImpl<ApplicationMapper,
                       record.setDuration(now - 
record.getStartTime().getTime());
                     }
                   }
+                  if (pipeStates.containsKey(record.getId())) {
+                    
record.setBuildStatus(pipeStates.get(record.getId()).getCode());
+                  }
+
+                  AppControl appControl =
+                      new AppControl()
+                          .setAllowBuild(
+                              record.getBuildStatus() == null
+                                  || !PipelineStatus.running
+                                      .getCode()
+                                      .equals(record.getBuildStatus()))
+                          .setAllowStart(
+                              !record.shouldBeTrack()
+                                  && PipelineStatus.success
+                                      .getCode()
+                                      .equals(record.getBuildStatus()))
+                          .setAllowStop(record.isRunning());
+                  record.setAppControl(appControl);
                 })
             .collect(Collectors.toList());
     page.setRecords(newRecords);
@@ -587,13 +612,13 @@ public class ApplicationManageServiceImpl extends 
ServiceImpl<ApplicationMapper,
   }
 
   @Override
-  public void updateRelease(Application application) {
+  public void updateRelease(Application appParam) {
     LambdaUpdateWrapper<Application> updateWrapper = Wrappers.lambdaUpdate();
-    updateWrapper.eq(Application::getId, application.getId());
-    updateWrapper.set(Application::getRelease, application.getRelease());
-    updateWrapper.set(Application::getBuild, application.getBuild());
-    if (application.getOptionState() != null) {
-      updateWrapper.set(Application::getOptionState, 
application.getOptionState());
+    updateWrapper.eq(Application::getId, appParam.getId());
+    updateWrapper.set(Application::getRelease, appParam.getRelease());
+    updateWrapper.set(Application::getBuild, appParam.getBuild());
+    if (appParam.getOptionState() != null) {
+      updateWrapper.set(Application::getOptionState, 
appParam.getOptionState());
     }
     this.update(updateWrapper);
   }
@@ -623,12 +648,12 @@ public class ApplicationManageServiceImpl extends 
ServiceImpl<ApplicationMapper,
   }
 
   @Override
-  public boolean checkBuildAndUpdate(Application application) {
-    boolean build = application.getBuild();
+  public boolean checkBuildAndUpdate(Application appParam) {
+    boolean build = appParam.getBuild();
     if (!build) {
       LambdaUpdateWrapper<Application> updateWrapper = Wrappers.lambdaUpdate();
-      updateWrapper.eq(Application::getId, application.getId());
-      if (application.isRunning()) {
+      updateWrapper.eq(Application::getId, appParam.getId());
+      if (appParam.isRunning()) {
         updateWrapper.set(Application::getRelease, 
ReleaseState.NEED_RESTART.get());
       } else {
         updateWrapper.set(Application::getRelease, ReleaseState.DONE.get());
@@ -637,18 +662,18 @@ public class ApplicationManageServiceImpl extends 
ServiceImpl<ApplicationMapper,
       this.update(updateWrapper);
 
       // backup
-      if (application.isFlinkSqlJob()) {
-        FlinkSql newFlinkSql = 
flinkSqlService.getCandidate(application.getId(), CandidateType.NEW);
-        if (!application.isNeedRollback() && newFlinkSql != null) {
-          backUpService.backup(application, newFlinkSql);
+      if (appParam.isFlinkSqlJob()) {
+        FlinkSql newFlinkSql = flinkSqlService.getCandidate(appParam.getId(), 
CandidateType.NEW);
+        if (!appParam.isNeedRollback() && newFlinkSql != null) {
+          backUpService.backup(appParam, newFlinkSql);
         }
       }
 
       // If the current task is not running, or the task has just been added,
       // directly set the candidate version to the official version
-      FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), 
false);
-      if (!application.isRunning() || flinkSql == null) {
-        this.toEffective(application);
+      FlinkSql flinkSql = flinkSqlService.getEffective(appParam.getId(), 
false);
+      if (!appParam.isRunning() || flinkSql == null) {
+        this.toEffective(appParam);
       }
     }
     return build;
diff --git 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceITest.java
 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceITest.java
index d32cd009d..759fc9dac 100644
--- 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceITest.java
+++ 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceITest.java
@@ -49,7 +49,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * Integration test for {@link
- * org.apache.streampark.console.core.service.impl.ApplicationServiceImpl}.
+ * 
org.apache.streampark.console.core.service.application.ApplicationManageService}.
  */
 class ApplicationManageServiceITest extends SpringIntegrationTestBase {
 
@@ -57,6 +57,7 @@ class ApplicationManageServiceITest extends 
SpringIntegrationTestBase {
       
FlinkStandaloneSessionCluster.builder().slotsNumPerTm(4).slf4jLogConsumer(null).build();
 
   @Autowired private ApplicationManageService applicationManageService;
+
   @Autowired private ApplicationActionService applicationActionService;
 
   @Autowired private FlinkClusterService clusterService;


Reply via email to