This is an automated email from the ASF dual-hosted git repository.
gongzhongqiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 6a5e6e4e9 [Improve] Application service minor improvement (#3015)
6a5e6e4e9 is described below
commit 6a5e6e4e97dbf5f3fc745df7b36292acc2fd4888
Author: benjobs <[email protected]>
AuthorDate: Sat Sep 2 21:16:40 2023 -0500
[Improve] Application service minor improvement (#3015)
* [Improve] application service minor improvement
* [Improve] applicationService minor improvement
* minor improvement
* minor improvement
* minor improvement
---
.../core/controller/ApplicationController.java | 47 ++-----------
.../application/ApplicationActionService.java | 27 +++-----
.../application/ApplicationInfoService.java | 36 +++++-----
.../application/ApplicationManageService.java | 40 +++++------
.../impl/ApplicationActionServiceImpl.java | 49 +++++++-------
.../impl/ApplicationInfoServiceImpl.java | 20 +++---
.../impl/ApplicationManageServiceImpl.java | 77 ++++++++++++++--------
.../service/ApplicationManageServiceITest.java | 3 +-
8 files changed, 138 insertions(+), 161 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
index ed2a11352..048f95f2c 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
@@ -26,20 +26,17 @@ import
org.apache.streampark.console.base.exception.InternalException;
import org.apache.streampark.console.core.annotation.ApiAccess;
import org.apache.streampark.console.core.annotation.AppUpdated;
import org.apache.streampark.console.core.annotation.PermissionAction;
-import org.apache.streampark.console.core.bean.AppControl;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.ApplicationBackUp;
import org.apache.streampark.console.core.entity.ApplicationLog;
import org.apache.streampark.console.core.enums.AppExistsState;
import org.apache.streampark.console.core.enums.PermissionType;
-import org.apache.streampark.console.core.service.AppBuildPipeService;
import org.apache.streampark.console.core.service.ApplicationBackUpService;
import org.apache.streampark.console.core.service.ApplicationLogService;
import org.apache.streampark.console.core.service.ResourceService;
import
org.apache.streampark.console.core.service.application.ApplicationActionService;
import
org.apache.streampark.console.core.service.application.ApplicationInfoService;
import
org.apache.streampark.console.core.service.application.ApplicationManageService;
-import org.apache.streampark.flink.packer.pipeline.PipelineStatus;
import org.apache.shiro.authz.annotation.RequiresPermissions;
@@ -63,10 +60,7 @@ import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
@Tag(name = "FLINK_APPLICATION_TAG")
@Slf4j
@@ -76,15 +70,15 @@ import java.util.stream.Collectors;
public class ApplicationController {
@Autowired private ApplicationManageService applicationManageService;
+
@Autowired private ApplicationActionService applicationActionService;
+
@Autowired private ApplicationInfoService applicationInfoService;
@Autowired private ApplicationBackUpService backUpService;
@Autowired private ApplicationLogService applicationLogService;
- @Autowired private AppBuildPipeService appBuildPipeService;
-
@Autowired private ResourceService resourceService;
@Operation(summary = "Get application")
@@ -128,12 +122,8 @@ public class ApplicationController {
@PostMapping(value = "copy")
@RequiresPermissions("app:copy")
public RestResponse copy(@Parameter(hidden = true) Application app) throws
IOException {
- Long id = applicationManageService.copy(app);
- Map<String, String> data = new HashMap<>();
- data.put("id", Long.toString(id));
- return id.equals(0L)
- ? RestResponse.success(false).data(data)
- : RestResponse.success(true).data(data);
+ applicationManageService.copy(app);
+ return RestResponse.success();
}
@Operation(summary = "Update application")
@@ -159,34 +149,6 @@ public class ApplicationController {
@RequiresPermissions("app:view")
public RestResponse list(Application app, RestRequest request) {
IPage<Application> applicationList = applicationManageService.page(app,
request);
- List<Application> appRecords = applicationList.getRecords();
- List<Long> appIds =
appRecords.stream().map(Application::getId).collect(Collectors.toList());
- Map<Long, PipelineStatus> pipeStates =
appBuildPipeService.listPipelineStatus(appIds);
-
- // add building pipeline status info and app control info
- appRecords =
- appRecords.stream()
- .peek(
- e -> {
- if (pipeStates.containsKey(e.getId())) {
- e.setBuildStatus(pipeStates.get(e.getId()).getCode());
- }
- })
- .peek(
- e -> {
- AppControl appControl =
- new AppControl()
- .setAllowBuild(
- e.getBuildStatus() == null
- ||
!PipelineStatus.running.getCode().equals(e.getBuildStatus()))
- .setAllowStart(
- !e.shouldBeTrack()
- &&
PipelineStatus.success.getCode().equals(e.getBuildStatus()))
- .setAllowStop(e.isRunning());
- e.setAppControl(appControl);
- })
- .collect(Collectors.toList());
- applicationList.setRecords(appRecords);
return RestResponse.success(applicationList);
}
@@ -245,7 +207,6 @@ public class ApplicationController {
@RequiresPermissions("app:start")
public RestResponse start(@Parameter(hidden = true) Application app) {
try {
- applicationInfoService.checkEnv(app);
applicationActionService.start(app, false);
return RestResponse.success(true);
} catch (Exception e) {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationActionService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationActionService.java
index 3e77485f8..e2936cb73 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationActionService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationActionService.java
@@ -28,50 +28,43 @@ import com.baomidou.mybatisplus.extension.service.IService;
*/
public interface ApplicationActionService extends IService<Application> {
- /**
- * This method is used to start the given application.
- *
- * @param app The application object to be started.
- */
- void starting(Application app);
-
/**
* Starts the specified application.
*
- * @param app The application to start.
+ * @param appParam The application to start.
* @param auto True if the application should start automatically, False
otherwise.
* @throws Exception If an error occurs while starting the application.
*/
- void start(Application app, boolean auto) throws Exception;
+ void start(Application appParam, boolean auto) throws Exception;
/**
* Restarts the given application.
*
- * @param application The application to restart.
+ * @param appParam The application to restart.
* @throws Exception If an error occurs while restarting the application.
*/
- void restart(Application application) throws Exception;
+ void restart(Application appParam) throws Exception;
/**
* Revokes access for the given application.
*
- * @param app The application for which access needs to be revoked.
+ * @param appParam The application for which access needs to be revoked.
* @throws ApplicationException if an error occurs while revoking access.
*/
- void revoke(Application app) throws ApplicationException;
+ void revoke(Application appParam) throws ApplicationException;
/**
* Cancels the given application. Throws an exception if cancellation fails.
*
- * @param app the application to be canceled
+ * @param appParam the application to be canceled
* @throws Exception if cancellation fails
*/
- void cancel(Application app) throws Exception;
+ void cancel(Application appParam) throws Exception;
/**
* Forces the given application to stop.
*
- * @param app the application to be stopped
+ * @param appParam the application to be stopped
*/
- void forcedStop(Application app);
+ void forcedStop(Application appParam);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationInfoService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationInfoService.java
index 093b1b0d4..57fbdef59 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationInfoService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationInfoService.java
@@ -37,37 +37,37 @@ public interface ApplicationInfoService extends
IService<Application> {
/**
* Maps the given application.
*
- * @param app The application to be mapped.
+ * @param appParam The application to be mapped.
* @return True if the mapping was successful, false otherwise.
*/
- boolean mapping(Application app);
+ boolean mapping(Application appParam);
/**
* Checks the environment for the given application.
*
- * @param app the application to check the environment for
+ * @param appParam the application to check the environment for
* @return true if the environment is valid for the application, false
otherwise
* @throws ApplicationException if an error occurs while checking the
environment
*/
- boolean checkEnv(Application app) throws ApplicationException;
+ boolean checkEnv(Application appParam) throws ApplicationException;
/**
* Checks the savepoint path for the given application.
*
- * @param app the application to check the savepoint path for
+ * @param appParam the application to check the savepoint path for
* @return the check message
* @throws Exception if an error occurs while checking the savepoint path
*/
- String checkSavepointPath(Application app) throws Exception;
+ String checkSavepointPath(Application appParam) throws Exception;
/**
* Checks if the given application meets the required alterations.
*
- * @param application The application to be checked.
+ * @param appParam The application to be checked.
* @return True if the application meets the required alterations, false
otherwise.
* @throws ApplicationException If an error occurs while checking the
alterations.
*/
- boolean checkAlter(Application application);
+ boolean checkAlter(Application appParam);
/**
* Checks if a record exists in the database with the given team ID.
@@ -129,42 +129,42 @@ public interface ApplicationInfoService extends
IService<Application> {
/**
* Gets the YARN name for the given application.
*
- * @param app The application for which to retrieve the YARN name.
+ * @param appParam The application for which to retrieve the YARN name.
* @return The YARN name of the application as a String.
*/
- String getYarnName(Application app);
+ String getYarnName(Application appParam);
/**
* Checks if the given application exists in the system.
*
- * @param app The application to check for existence.
+ * @param appParam The application to check for existence.
* @return AppExistsState indicating the existence state of the application.
*/
- AppExistsState checkExists(Application app);
+ AppExistsState checkExists(Application appParam);
/**
* Persists the metrics of the given application.
*
- * @param application The application which metrics need to be persisted.
+ * @param appParam The application which metrics need to be persisted.
*/
- void persistMetrics(Application application);
+ void persistMetrics(Application appParam);
/**
* Reads the configuration for the given application and returns it as a
String.
*
- * @param app The application for which the configuration needs to be read.
+ * @param appParam The application for which the configuration needs to be
read.
* @return The configuration for the given application as a String.
* @throws IOException If an I/O error occurs while reading the
configuration.
*/
- String readConf(Application app) throws IOException;
+ String readConf(Application appParam) throws IOException;
/**
* Retrieves the main configuration value for the given Application.
*
- * @param application the Application object for which to fetch the main
configuration value
+ * @param appParam the Application object for which to fetch the main
configuration value
* @return the main configuration value as a String
*/
- String getMain(Application application);
+ String getMain(Application appParam);
/**
* Returns the dashboard for the specified team.
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationManageService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationManageService.java
index 3d2c6f4fa..d55564111 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationManageService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationManageService.java
@@ -37,83 +37,83 @@ public interface ApplicationManageService extends
IService<Application> {
/**
* Retrieves a page of applications based on the provided parameters.
*
- * @param app The application object to be used for filtering the results.
+ * @param appParam The application object to be used for filtering the
results.
* @param request The REST request object containing additional parameters
or headers.
* @return A page of Application objects based on the provided parameters.
*/
- IPage<Application> page(Application app, RestRequest request);
+ IPage<Application> page(Application appParam, RestRequest request);
/**
* Creates a new application.
*
- * @param app The application to create.
+ * @param appParam The application to create.
* @return True if the application was successfully created, false otherwise.
* @throws IOException If an I/O error occurs.
*/
- boolean create(Application app) throws IOException;
+ boolean create(Application appParam) throws IOException;
/**
* Copies the given Application.
*
- * @param app the Application to be copied
+ * @param appParam the Application to be copied
* @return the size of the copied Application in bytes as a Long value
* @throws IOException if there was an error during the copy process
*/
- Long copy(Application app) throws IOException;
+ Long copy(Application appParam) throws IOException;
/**
* Updates the given application.
*
- * @param app the application to be updated
+ * @param appParam the application to be updated
* @return true if the update was successful, false otherwise
*/
- boolean update(Application app);
+ boolean update(Application appParam);
/**
* Sets the given application to be effective.
*
- * @param application the application to be set effective
+ * @param appParam the application to be set effective
*/
- void toEffective(Application application);
+ void toEffective(Application appParam);
/**
* Checks if the given application is ready to build and update.
*
- * @param app the application to check for readiness
+ * @param appParam the application to check for readiness
* @return true if the application is ready to build and update, false
otherwise
*/
- boolean checkBuildAndUpdate(Application app);
+ boolean checkBuildAndUpdate(Application appParam);
/**
* Deletes the given Application from the system.
*
- * @param app The Application to be deleted.
+ * @param appParam The Application to be deleted.
* @return True if the deletion was successful, false otherwise.
*/
- Boolean delete(Application app);
+ Boolean delete(Application appParam);
/**
* Retrieves the Application with the specified details from the system.
*
- * @param app The Application object containing the details of the
Application to retrieve.
+ * @param appParam The Application object containing the details of the
Application to retrieve.
* @return The Application object that matches the specified details, or
null if no matching
* Application is found.
*/
- Application getApp(Application app);
+ Application getApp(Application appParam);
/**
* Updates the release of the given application.
*
- * @param application The application to update the release for.
+ * @param appParam The application to update the release for.
*/
- void updateRelease(Application application);
+ void updateRelease(Application appParam);
/**
* Cleans the application by performing necessary cleanup tasks.
*
- * @param app The application to clean.
+ * @param appParam The application to clean.
*/
- void clean(Application app);
+ void clean(Application appParam);
/**
* Retrieves a list of applications by project ID.
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 128893d81..9d6926a9a 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -61,6 +61,7 @@ import
org.apache.streampark.console.core.service.SavePointService;
import org.apache.streampark.console.core.service.SettingService;
import org.apache.streampark.console.core.service.VariableService;
import
org.apache.streampark.console.core.service.application.ApplicationActionService;
+import
org.apache.streampark.console.core.service.application.ApplicationInfoService;
import
org.apache.streampark.console.core.service.application.ApplicationManageService;
import org.apache.streampark.console.core.task.FlinkHttpWatcher;
import org.apache.streampark.flink.client.FlinkClient;
@@ -130,6 +131,8 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
@Autowired private ApplicationBackUpService backUpService;
@Autowired private ApplicationManageService applicationManageService;
+ @Autowired private ApplicationInfoService applicationInfoService;
+
@Autowired private ApplicationConfigService configService;
@Autowired private ApplicationLogService applicationLogService;
@@ -161,11 +164,11 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
new ConcurrentHashMap<>();
@Override
- public void revoke(Application appParma) throws ApplicationException {
- Application application = getById(appParma.getId());
+ public void revoke(Application appParam) throws ApplicationException {
+ Application application = getById(appParam.getId());
ApiAlertException.throwIfNull(
application,
- String.format("The application id=%s not found, revoke failed.",
appParma.getId()));
+ String.format("The application id=%s not found, revoke failed.",
appParam.getId()));
// 1) delete files that have been published to workspace
application.getFsOperator().delete(application.getAppHome());
@@ -188,16 +191,16 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
}
@Override
- public void restart(Application application) throws Exception {
- this.cancel(application);
- this.start(application, false);
+ public void restart(Application appParam) throws Exception {
+ this.cancel(appParam);
+ this.start(appParam, false);
}
@Override
- public void forcedStop(Application app) {
- CompletableFuture<SubmitResponse> startFuture =
startFutureMap.remove(app.getId());
- CompletableFuture<CancelResponse> cancelFuture =
cancelFutureMap.remove(app.getId());
- Application application = this.baseMapper.getApp(app);
+ public void forcedStop(Application appParam) {
+ CompletableFuture<SubmitResponse> startFuture =
startFutureMap.remove(appParam.getId());
+ CompletableFuture<CancelResponse> cancelFuture =
cancelFutureMap.remove(appParam.getId());
+ Application application = this.baseMapper.getApp(appParam);
if (isKubernetesApp(application)) {
KubernetesDeploymentHelper.watchPodTerminatedLog(
application.getK8sNamespace(), application.getJobName(),
application.getJobId());
@@ -213,7 +216,7 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
cancelFuture.cancel(true);
}
if (startFuture == null && cancelFuture == null) {
- this.updateToStopped(app);
+ this.updateToStopped(appParam);
}
}
@@ -363,23 +366,12 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
});
}
- /**
- * Setup task is starting (for webUI "state" display)
- *
- * @param application
- */
- @Override
- public void starting(Application application) {
- application.setState(FlinkAppState.STARTING.getValue());
- application.setOptionTime(new Date());
- updateById(application);
- }
-
@Override
@Transactional(rollbackFor = {Exception.class})
public void start(Application appParam, boolean auto) throws Exception {
final Application application = getById(appParam.getId());
- Utils.notNull(application);
+ ApiAlertException.throwIfNull(application, "[StreamPark] application is
not exists.");
+
if (!application.isCanBeStart()) {
throw new ApiAlertException("[StreamPark] The application cannot be
started repeatedly.");
}
@@ -389,6 +381,13 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
throw new ApiAlertException("[StreamPark] can no found flink version");
}
+ applicationInfoService.checkEnv(appParam);
+
+ // update state to starting
+ application.setState(FlinkAppState.STARTING.getValue());
+ application.setOptionTime(new Date());
+ updateById(application);
+
// if manually started, clear the restart flag
if (!auto) {
application.setRestartCount(0);
@@ -399,8 +398,6 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
appParam.setSavePointed(true);
application.setRestartCount(application.getRestartCount() + 1);
}
-
- starting(application);
application.setAllowNonRestored(appParam.getAllowNonRestored());
String appConf;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
index 8ded940f7..b26de031f 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
@@ -209,14 +209,14 @@ public class ApplicationInfoServiceImpl extends
ServiceImpl<ApplicationMapper, A
}
@Override
- public boolean checkAlter(Application application) {
- Long appId = application.getId();
- FlinkAppState state = FlinkAppState.of(application.getState());
+ public boolean checkAlter(Application appParam) {
+ Long appId = appParam.getId();
+ FlinkAppState state = FlinkAppState.of(appParam.getState());
if (!FlinkAppState.CANCELED.equals(state)) {
return false;
}
long cancelUserId = FlinkHttpWatcher.getCanceledJobUserId(appId);
- long appUserId = application.getUserId();
+ long appUserId = appParam.getUserId();
return cancelUserId != -1 && cancelUserId != appUserId;
}
@@ -432,16 +432,16 @@ public class ApplicationInfoServiceImpl extends
ServiceImpl<ApplicationMapper, A
}
@Override
- public String getMain(Application application) {
+ public String getMain(Application appParam) {
File jarFile;
- if (application.getProjectId() == null) {
- jarFile = new File(application.getJar());
+ if (appParam.getProjectId() == null) {
+ jarFile = new File(appParam.getJar());
} else {
Project project = new Project();
- project.setId(application.getProjectId());
+ project.setId(appParam.getProjectId());
String modulePath =
-
project.getDistHome().getAbsolutePath().concat("/").concat(application.getModule());
- jarFile = new File(modulePath, application.getJar());
+
project.getDistHome().getAbsolutePath().concat("/").concat(appParam.getModule());
+ jarFile = new File(modulePath, appParam.getJar());
}
Manifest manifest = Utils.getJarManifest(jarFile);
return manifest.getMainAttributes().getValue("Main-Class");
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
index 222b8967e..eeabbaad8 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
@@ -28,6 +28,7 @@ import
org.apache.streampark.console.base.mybatis.pager.MybatisPager;
import org.apache.streampark.console.base.util.CommonUtils;
import org.apache.streampark.console.base.util.ObjectUtils;
import org.apache.streampark.console.base.util.WebUtils;
+import org.apache.streampark.console.core.bean.AppControl;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.ApplicationConfig;
import org.apache.streampark.console.core.entity.FlinkSql;
@@ -53,6 +54,7 @@ import
org.apache.streampark.console.core.service.YarnQueueService;
import
org.apache.streampark.console.core.service.application.ApplicationManageService;
import org.apache.streampark.console.core.task.FlinkHttpWatcher;
import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
+import org.apache.streampark.flink.packer.pipeline.PipelineStatus;
import org.apache.commons.lang3.StringUtils;
@@ -80,6 +82,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
@@ -127,16 +130,16 @@ public class ApplicationManageServiceImpl extends
ServiceImpl<ApplicationMapper,
}
@Override
- public void toEffective(Application application) {
+ public void toEffective(Application appParam) {
// set latest to Effective
- ApplicationConfig config = configService.getLatest(application.getId());
+ ApplicationConfig config = configService.getLatest(appParam.getId());
if (config != null) {
- this.configService.toEffective(application.getId(), config.getId());
+ this.configService.toEffective(appParam.getId(), config.getId());
}
- if (application.isFlinkSqlJob()) {
- FlinkSql flinkSql = flinkSqlService.getCandidate(application.getId(),
null);
+ if (appParam.isFlinkSqlJob()) {
+ FlinkSql flinkSql = flinkSqlService.getCandidate(appParam.getId(), null);
if (flinkSql != null) {
- flinkSqlService.toEffective(application.getId(), flinkSql.getId());
+ flinkSqlService.toEffective(appParam.getId(), flinkSql.getId());
// clean candidate
flinkSqlService.cleanCandidate(flinkSql.getId());
}
@@ -145,9 +148,9 @@ public class ApplicationManageServiceImpl extends
ServiceImpl<ApplicationMapper,
@Override
@Transactional(rollbackFor = {Exception.class})
- public Boolean delete(Application paramApp) {
+ public Boolean delete(Application appParam) {
- Application application = getById(paramApp.getId());
+ Application application = getById(appParam.getId());
// 1) remove flink sql
flinkSqlService.removeApp(application.getId());
@@ -177,7 +180,7 @@ public class ApplicationManageServiceImpl extends
ServiceImpl<ApplicationMapper,
if (isKubernetesApp(application)) {
k8SFlinkTrackMonitor.unWatching(toTrackId(application));
} else {
- FlinkHttpWatcher.unWatching(paramApp.getId());
+ FlinkHttpWatcher.unWatching(appParam.getId());
}
return true;
}
@@ -220,6 +223,10 @@ public class ApplicationManageServiceImpl extends
ServiceImpl<ApplicationMapper,
this.baseMapper.page(page, appParam);
List<Application> records = page.getRecords();
long now = System.currentTimeMillis();
+
+ List<Long> appIds =
records.stream().map(Application::getId).collect(Collectors.toList());
+ Map<Long, PipelineStatus> pipeStates =
appBuildPipeService.listPipelineStatus(appIds);
+
List<Application> newRecords =
records.stream()
.peek(
@@ -236,6 +243,24 @@ public class ApplicationManageServiceImpl extends
ServiceImpl<ApplicationMapper,
record.setDuration(now -
record.getStartTime().getTime());
}
}
+ if (pipeStates.containsKey(record.getId())) {
+
record.setBuildStatus(pipeStates.get(record.getId()).getCode());
+ }
+
+ AppControl appControl =
+ new AppControl()
+ .setAllowBuild(
+ record.getBuildStatus() == null
+ || !PipelineStatus.running
+ .getCode()
+ .equals(record.getBuildStatus()))
+ .setAllowStart(
+ !record.shouldBeTrack()
+ && PipelineStatus.success
+ .getCode()
+ .equals(record.getBuildStatus()))
+ .setAllowStop(record.isRunning());
+ record.setAppControl(appControl);
})
.collect(Collectors.toList());
page.setRecords(newRecords);
@@ -587,13 +612,13 @@ public class ApplicationManageServiceImpl extends
ServiceImpl<ApplicationMapper,
}
@Override
- public void updateRelease(Application application) {
+ public void updateRelease(Application appParam) {
LambdaUpdateWrapper<Application> updateWrapper = Wrappers.lambdaUpdate();
- updateWrapper.eq(Application::getId, application.getId());
- updateWrapper.set(Application::getRelease, application.getRelease());
- updateWrapper.set(Application::getBuild, application.getBuild());
- if (application.getOptionState() != null) {
- updateWrapper.set(Application::getOptionState,
application.getOptionState());
+ updateWrapper.eq(Application::getId, appParam.getId());
+ updateWrapper.set(Application::getRelease, appParam.getRelease());
+ updateWrapper.set(Application::getBuild, appParam.getBuild());
+ if (appParam.getOptionState() != null) {
+ updateWrapper.set(Application::getOptionState,
appParam.getOptionState());
}
this.update(updateWrapper);
}
@@ -623,12 +648,12 @@ public class ApplicationManageServiceImpl extends
ServiceImpl<ApplicationMapper,
}
@Override
- public boolean checkBuildAndUpdate(Application application) {
- boolean build = application.getBuild();
+ public boolean checkBuildAndUpdate(Application appParam) {
+ boolean build = appParam.getBuild();
if (!build) {
LambdaUpdateWrapper<Application> updateWrapper = Wrappers.lambdaUpdate();
- updateWrapper.eq(Application::getId, application.getId());
- if (application.isRunning()) {
+ updateWrapper.eq(Application::getId, appParam.getId());
+ if (appParam.isRunning()) {
updateWrapper.set(Application::getRelease,
ReleaseState.NEED_RESTART.get());
} else {
updateWrapper.set(Application::getRelease, ReleaseState.DONE.get());
@@ -637,18 +662,18 @@ public class ApplicationManageServiceImpl extends
ServiceImpl<ApplicationMapper,
this.update(updateWrapper);
// backup
- if (application.isFlinkSqlJob()) {
- FlinkSql newFlinkSql =
flinkSqlService.getCandidate(application.getId(), CandidateType.NEW);
- if (!application.isNeedRollback() && newFlinkSql != null) {
- backUpService.backup(application, newFlinkSql);
+ if (appParam.isFlinkSqlJob()) {
+ FlinkSql newFlinkSql = flinkSqlService.getCandidate(appParam.getId(),
CandidateType.NEW);
+ if (!appParam.isNeedRollback() && newFlinkSql != null) {
+ backUpService.backup(appParam, newFlinkSql);
}
}
// If the current task is not running, or the task has just been added,
// directly set the candidate version to the official version
- FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(),
false);
- if (!application.isRunning() || flinkSql == null) {
- this.toEffective(application);
+ FlinkSql flinkSql = flinkSqlService.getEffective(appParam.getId(),
false);
+ if (!appParam.isRunning() || flinkSql == null) {
+ this.toEffective(appParam);
}
}
return build;
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceITest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceITest.java
index d32cd009d..759fc9dac 100644
---
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceITest.java
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceITest.java
@@ -49,7 +49,7 @@ import static org.assertj.core.api.Assertions.assertThat;
/**
* Integration test for {@link
- * org.apache.streampark.console.core.service.impl.ApplicationServiceImpl}.
+ *
org.apache.streampark.console.core.service.application.ApplicationManageService}.
*/
class ApplicationManageServiceITest extends SpringIntegrationTestBase {
@@ -57,6 +57,7 @@ class ApplicationManageServiceITest extends
SpringIntegrationTestBase {
FlinkStandaloneSessionCluster.builder().slotsNumPerTm(4).slf4jLogConsumer(null).build();
@Autowired private ApplicationManageService applicationManageService;
+
@Autowired private ApplicationActionService applicationActionService;
@Autowired private FlinkClusterService clusterService;