This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch app
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit 1360457ef85b778e54e157dd32b29fadc4f26c97
Author: benjobs <[email protected]>
AuthorDate: Sat Sep 2 23:10:49 2023 +0800

    [Improve] Minor improvements to the application service
---
 .../core/controller/ApplicationController.java     | 46 +---------------------
 .../impl/ApplicationActionServiceImpl.java         |  8 +++-
 .../impl/ApplicationManageServiceImpl.java         | 29 +++++++++++---
 3 files changed, 33 insertions(+), 50 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
index ed2a11352..07fd8e340 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
@@ -26,7 +26,6 @@ import 
org.apache.streampark.console.base.exception.InternalException;
 import org.apache.streampark.console.core.annotation.ApiAccess;
 import org.apache.streampark.console.core.annotation.AppUpdated;
 import org.apache.streampark.console.core.annotation.PermissionAction;
-import org.apache.streampark.console.core.bean.AppControl;
 import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.ApplicationBackUp;
 import org.apache.streampark.console.core.entity.ApplicationLog;
@@ -39,7 +38,6 @@ import 
org.apache.streampark.console.core.service.ResourceService;
 import 
org.apache.streampark.console.core.service.application.ApplicationActionService;
 import 
org.apache.streampark.console.core.service.application.ApplicationInfoService;
 import 
org.apache.streampark.console.core.service.application.ApplicationManageService;
-import org.apache.streampark.flink.packer.pipeline.PipelineStatus;
 
 import org.apache.shiro.authz.annotation.RequiresPermissions;
 
@@ -63,10 +61,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 @Tag(name = "FLINK_APPLICATION_TAG")
 @Slf4j
@@ -78,13 +73,9 @@ public class ApplicationController {
   @Autowired private ApplicationManageService applicationManageService;
   @Autowired private ApplicationActionService applicationActionService;
   @Autowired private ApplicationInfoService applicationInfoService;
-
   @Autowired private ApplicationBackUpService backUpService;
-
   @Autowired private ApplicationLogService applicationLogService;
-
   @Autowired private AppBuildPipeService appBuildPipeService;
-
   @Autowired private ResourceService resourceService;
 
   @Operation(summary = "Get application")
@@ -128,12 +119,8 @@ public class ApplicationController {
   @PostMapping(value = "copy")
   @RequiresPermissions("app:copy")
   public RestResponse copy(@Parameter(hidden = true) Application app) throws 
IOException {
-    Long id = applicationManageService.copy(app);
-    Map<String, String> data = new HashMap<>();
-    data.put("id", Long.toString(id));
-    return id.equals(0L)
-        ? RestResponse.success(false).data(data)
-        : RestResponse.success(true).data(data);
+    applicationManageService.copy(app);
+    return RestResponse.success();
   }
 
   @Operation(summary = "Update application")
@@ -159,34 +146,6 @@ public class ApplicationController {
   @RequiresPermissions("app:view")
   public RestResponse list(Application app, RestRequest request) {
     IPage<Application> applicationList = applicationManageService.page(app, 
request);
-    List<Application> appRecords = applicationList.getRecords();
-    List<Long> appIds = 
appRecords.stream().map(Application::getId).collect(Collectors.toList());
-    Map<Long, PipelineStatus> pipeStates = 
appBuildPipeService.listPipelineStatus(appIds);
-
-    // add building pipeline status info and app control info
-    appRecords =
-        appRecords.stream()
-            .peek(
-                e -> {
-                  if (pipeStates.containsKey(e.getId())) {
-                    e.setBuildStatus(pipeStates.get(e.getId()).getCode());
-                  }
-                })
-            .peek(
-                e -> {
-                  AppControl appControl =
-                      new AppControl()
-                          .setAllowBuild(
-                              e.getBuildStatus() == null
-                                  || 
!PipelineStatus.running.getCode().equals(e.getBuildStatus()))
-                          .setAllowStart(
-                              !e.shouldBeTrack()
-                                  && 
PipelineStatus.success.getCode().equals(e.getBuildStatus()))
-                          .setAllowStop(e.isRunning());
-                  e.setAppControl(appControl);
-                })
-            .collect(Collectors.toList());
-    applicationList.setRecords(appRecords);
     return RestResponse.success(applicationList);
   }
 
@@ -245,7 +204,6 @@ public class ApplicationController {
   @RequiresPermissions("app:start")
   public RestResponse start(@Parameter(hidden = true) Application app) {
     try {
-      applicationInfoService.checkEnv(app);
       applicationActionService.start(app, false);
       return RestResponse.success(true);
     } catch (Exception e) {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 128893d81..067788db7 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -61,6 +61,7 @@ import 
org.apache.streampark.console.core.service.SavePointService;
 import org.apache.streampark.console.core.service.SettingService;
 import org.apache.streampark.console.core.service.VariableService;
 import 
org.apache.streampark.console.core.service.application.ApplicationActionService;
+import 
org.apache.streampark.console.core.service.application.ApplicationInfoService;
 import 
org.apache.streampark.console.core.service.application.ApplicationManageService;
 import org.apache.streampark.console.core.task.FlinkHttpWatcher;
 import org.apache.streampark.flink.client.FlinkClient;
@@ -130,6 +131,8 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
   @Autowired private ApplicationBackUpService backUpService;
   @Autowired private ApplicationManageService applicationManageService;
 
+  @Autowired private ApplicationInfoService applicationInfoService;
+
   @Autowired private ApplicationConfigService configService;
 
   @Autowired private ApplicationLogService applicationLogService;
@@ -379,7 +382,8 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
   @Transactional(rollbackFor = {Exception.class})
   public void start(Application appParam, boolean auto) throws Exception {
     final Application application = getById(appParam.getId());
-    Utils.notNull(application);
+    ApiAlertException.throwIfNull(application, "[StreamPark] application is 
not exists.");
+
     if (!application.isCanBeStart()) {
       throw new ApiAlertException("[StreamPark] The application cannot be 
started repeatedly.");
     }
@@ -389,6 +393,8 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
       throw new ApiAlertException("[StreamPark] can no found flink version");
     }
 
+    applicationInfoService.checkEnv(appParam);
+
     // if manually started, clear the restart flag
     if (!auto) {
       application.setRestartCount(0);
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
index 222b8967e..839dc2875 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
@@ -28,6 +28,7 @@ import 
org.apache.streampark.console.base.mybatis.pager.MybatisPager;
 import org.apache.streampark.console.base.util.CommonUtils;
 import org.apache.streampark.console.base.util.ObjectUtils;
 import org.apache.streampark.console.base.util.WebUtils;
+import org.apache.streampark.console.core.bean.AppControl;
 import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.ApplicationConfig;
 import org.apache.streampark.console.core.entity.FlinkSql;
@@ -53,6 +54,7 @@ import 
org.apache.streampark.console.core.service.YarnQueueService;
 import 
org.apache.streampark.console.core.service.application.ApplicationManageService;
 import org.apache.streampark.console.core.task.FlinkHttpWatcher;
 import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
+import org.apache.streampark.flink.packer.pipeline.PipelineStatus;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -76,11 +78,6 @@ import javax.annotation.PostConstruct;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Date;
-import java.util.List;
-import java.util.Objects;
 import java.util.stream.Collectors;
 
 import static 
org.apache.streampark.console.core.task.FlinkK8sWatcherWrapper.Bridge.toTrackId;
@@ -220,6 +217,10 @@ public class ApplicationManageServiceImpl extends 
ServiceImpl<ApplicationMapper,
     this.baseMapper.page(page, appParam);
     List<Application> records = page.getRecords();
     long now = System.currentTimeMillis();
+
+    List<Long> appIds = 
records.stream().map(Application::getId).collect(Collectors.toList());
+    Map<Long, PipelineStatus> pipeStates = 
appBuildPipeService.listPipelineStatus(appIds);
+
     List<Application> newRecords =
         records.stream()
             .peek(
@@ -236,6 +237,24 @@ public class ApplicationManageServiceImpl extends 
ServiceImpl<ApplicationMapper,
                       record.setDuration(now - 
record.getStartTime().getTime());
                     }
                   }
+                  if (pipeStates.containsKey(record.getId())) {
+                    
record.setBuildStatus(pipeStates.get(record.getId()).getCode());
+                  }
+
+                  AppControl appControl =
+                      new AppControl()
+                          .setAllowBuild(
+                              record.getBuildStatus() == null
+                                  || !PipelineStatus.running
+                                      .getCode()
+                                      .equals(record.getBuildStatus()))
+                          .setAllowStart(
+                              !record.shouldBeTrack()
+                                  && PipelineStatus.success
+                                      .getCode()
+                                      .equals(record.getBuildStatus()))
+                          .setAllowStop(record.isRunning());
+                  record.setAppControl(appControl);
                 })
             .collect(Collectors.toList());
     page.setRecords(newRecords);

Reply via email to