This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch app in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit 1360457ef85b778e54e157dd32b29fadc4f26c97 Author: benjobs <[email protected]> AuthorDate: Sat Sep 2 23:10:49 2023 +0800 [Improve] application service minor improvement --- .../core/controller/ApplicationController.java | 46 +--------------------- .../impl/ApplicationActionServiceImpl.java | 8 +++- .../impl/ApplicationManageServiceImpl.java | 29 +++++++++++--- 3 files changed, 33 insertions(+), 50 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java index ed2a11352..07fd8e340 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java @@ -26,7 +26,6 @@ import org.apache.streampark.console.base.exception.InternalException; import org.apache.streampark.console.core.annotation.ApiAccess; import org.apache.streampark.console.core.annotation.AppUpdated; import org.apache.streampark.console.core.annotation.PermissionAction; -import org.apache.streampark.console.core.bean.AppControl; import org.apache.streampark.console.core.entity.Application; import org.apache.streampark.console.core.entity.ApplicationBackUp; import org.apache.streampark.console.core.entity.ApplicationLog; @@ -39,7 +38,6 @@ import org.apache.streampark.console.core.service.ResourceService; import org.apache.streampark.console.core.service.application.ApplicationActionService; import org.apache.streampark.console.core.service.application.ApplicationInfoService; import org.apache.streampark.console.core.service.application.ApplicationManageService; -import org.apache.streampark.flink.packer.pipeline.PipelineStatus; import 
org.apache.shiro.authz.annotation.RequiresPermissions; @@ -63,10 +61,7 @@ import java.io.File; import java.io.IOException; import java.io.Serializable; import java.net.URI; -import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.stream.Collectors; @Tag(name = "FLINK_APPLICATION_TAG") @Slf4j @@ -78,13 +73,9 @@ public class ApplicationController { @Autowired private ApplicationManageService applicationManageService; @Autowired private ApplicationActionService applicationActionService; @Autowired private ApplicationInfoService applicationInfoService; - @Autowired private ApplicationBackUpService backUpService; - @Autowired private ApplicationLogService applicationLogService; - @Autowired private AppBuildPipeService appBuildPipeService; - @Autowired private ResourceService resourceService; @Operation(summary = "Get application") @@ -128,12 +119,8 @@ public class ApplicationController { @PostMapping(value = "copy") @RequiresPermissions("app:copy") public RestResponse copy(@Parameter(hidden = true) Application app) throws IOException { - Long id = applicationManageService.copy(app); - Map<String, String> data = new HashMap<>(); - data.put("id", Long.toString(id)); - return id.equals(0L) - ? 
RestResponse.success(false).data(data) - : RestResponse.success(true).data(data); + applicationManageService.copy(app); + return RestResponse.success(); } @Operation(summary = "Update application") @@ -159,34 +146,6 @@ public class ApplicationController { @RequiresPermissions("app:view") public RestResponse list(Application app, RestRequest request) { IPage<Application> applicationList = applicationManageService.page(app, request); - List<Application> appRecords = applicationList.getRecords(); - List<Long> appIds = appRecords.stream().map(Application::getId).collect(Collectors.toList()); - Map<Long, PipelineStatus> pipeStates = appBuildPipeService.listPipelineStatus(appIds); - - // add building pipeline status info and app control info - appRecords = - appRecords.stream() - .peek( - e -> { - if (pipeStates.containsKey(e.getId())) { - e.setBuildStatus(pipeStates.get(e.getId()).getCode()); - } - }) - .peek( - e -> { - AppControl appControl = - new AppControl() - .setAllowBuild( - e.getBuildStatus() == null - || !PipelineStatus.running.getCode().equals(e.getBuildStatus())) - .setAllowStart( - !e.shouldBeTrack() - && PipelineStatus.success.getCode().equals(e.getBuildStatus())) - .setAllowStop(e.isRunning()); - e.setAppControl(appControl); - }) - .collect(Collectors.toList()); - applicationList.setRecords(appRecords); return RestResponse.success(applicationList); } @@ -245,7 +204,6 @@ public class ApplicationController { @RequiresPermissions("app:start") public RestResponse start(@Parameter(hidden = true) Application app) { try { - applicationInfoService.checkEnv(app); applicationActionService.start(app, false); return RestResponse.success(true); } catch (Exception e) { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java index 128893d81..067788db7 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java @@ -61,6 +61,7 @@ import org.apache.streampark.console.core.service.SavePointService; import org.apache.streampark.console.core.service.SettingService; import org.apache.streampark.console.core.service.VariableService; import org.apache.streampark.console.core.service.application.ApplicationActionService; +import org.apache.streampark.console.core.service.application.ApplicationInfoService; import org.apache.streampark.console.core.service.application.ApplicationManageService; import org.apache.streampark.console.core.task.FlinkHttpWatcher; import org.apache.streampark.flink.client.FlinkClient; @@ -130,6 +131,8 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper, @Autowired private ApplicationBackUpService backUpService; @Autowired private ApplicationManageService applicationManageService; + @Autowired private ApplicationInfoService applicationInfoService; + @Autowired private ApplicationConfigService configService; @Autowired private ApplicationLogService applicationLogService; @@ -379,7 +382,8 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper, @Transactional(rollbackFor = {Exception.class}) public void start(Application appParam, boolean auto) throws Exception { final Application application = getById(appParam.getId()); - Utils.notNull(application); + ApiAlertException.throwIfNull(application, "[StreamPark] application is not exists."); + if (!application.isCanBeStart()) { throw new 
ApiAlertException("[StreamPark] The application cannot be started repeatedly."); } @@ -389,6 +393,8 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper, throw new ApiAlertException("[StreamPark] can no found flink version"); } + applicationInfoService.checkEnv(appParam); + // if manually started, clear the restart flag if (!auto) { application.setRestartCount(0); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java index 222b8967e..839dc2875 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java @@ -28,6 +28,7 @@ import org.apache.streampark.console.base.mybatis.pager.MybatisPager; import org.apache.streampark.console.base.util.CommonUtils; import org.apache.streampark.console.base.util.ObjectUtils; import org.apache.streampark.console.base.util.WebUtils; +import org.apache.streampark.console.core.bean.AppControl; import org.apache.streampark.console.core.entity.Application; import org.apache.streampark.console.core.entity.ApplicationConfig; import org.apache.streampark.console.core.entity.FlinkSql; @@ -53,6 +54,7 @@ import org.apache.streampark.console.core.service.YarnQueueService; import org.apache.streampark.console.core.service.application.ApplicationManageService; import org.apache.streampark.console.core.task.FlinkHttpWatcher; import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher; +import org.apache.streampark.flink.packer.pipeline.PipelineStatus; import 
org.apache.commons.lang3.StringUtils; @@ -76,11 +78,6 @@ import javax.annotation.PostConstruct; import java.io.File; import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.Date; -import java.util.List; -import java.util.Objects; import java.util.stream.Collectors; import static org.apache.streampark.console.core.task.FlinkK8sWatcherWrapper.Bridge.toTrackId; @@ -220,6 +217,10 @@ public class ApplicationManageServiceImpl extends ServiceImpl<ApplicationMapper, this.baseMapper.page(page, appParam); List<Application> records = page.getRecords(); long now = System.currentTimeMillis(); + + List<Long> appIds = records.stream().map(Application::getId).collect(Collectors.toList()); + Map<Long, PipelineStatus> pipeStates = appBuildPipeService.listPipelineStatus(appIds); + List<Application> newRecords = records.stream() .peek( @@ -236,6 +237,24 @@ public class ApplicationManageServiceImpl extends ServiceImpl<ApplicationMapper, record.setDuration(now - record.getStartTime().getTime()); } } + if (pipeStates.containsKey(record.getId())) { + record.setBuildStatus(pipeStates.get(record.getId()).getCode()); + } + + AppControl appControl = + new AppControl() + .setAllowBuild( + record.getBuildStatus() == null + || !PipelineStatus.running + .getCode() + .equals(record.getBuildStatus())) + .setAllowStart( + !record.shouldBeTrack() + && PipelineStatus.success + .getCode() + .equals(record.getBuildStatus())) + .setAllowStop(record.isRunning()); + record.setAppControl(appControl); }) .collect(Collectors.toList()); page.setRecords(newRecords);
