This is an automated email from the ASF dual-hosted git repository.
cancai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new c89ceed3d [ISSUE-3472][Improve] Improve streampark-console module
Controller invoke service (#3488)
c89ceed3d is described below
commit c89ceed3d9987132486a714264fc4da391655c5d
Author: zhengke zhou <[email protected]>
AuthorDate: Mon Jan 22 13:48:13 2024 +0800
[ISSUE-3472][Improve] Improve streampark-console module Controller invoke
service (#3488)
[ISSUE-3472][Improve] Improve streampark-console module Controller invoke
service
---
.../console/core/controller/AlertController.java | 2 +-
.../core/controller/ApplicationController.java | 14 +++++-----
.../console/core/controller/ConfigController.java | 2 +-
.../core/controller/FlinkClusterController.java | 2 +-
.../core/controller/FlinkSqlController.java | 2 +-
.../core/controller/ResourceController.java | 2 +-
.../console/core/mapper/ApplicationMapper.java | 2 +-
.../core/service/ApplicationConfigService.java | 4 +--
.../console/core/service/FlinkClusterService.java | 2 +-
.../console/core/service/FlinkSqlService.java | 2 +-
.../console/core/service/ResourceService.java | 4 +--
.../core/service/alert/AlertConfigService.java | 2 +-
.../service/alert/impl/AlertConfigServiceImpl.java | 4 +--
.../application/ApplicationActionService.java | 8 +++---
.../application/ApplicationInfoService.java | 12 ++++-----
.../application/ApplicationManageService.java | 8 +++---
.../impl/ApplicationActionServiceImpl.java | 31 +++++++++++-----------
.../impl/ApplicationInfoServiceImpl.java | 12 ++++-----
.../impl/ApplicationManageServiceImpl.java | 14 +++++-----
.../service/impl/ApplicationConfigServiceImpl.java | 6 ++---
.../core/service/impl/FlinkClusterServiceImpl.java | 3 +--
.../core/service/impl/FlinkSqlServiceImpl.java | 6 ++---
.../core/service/impl/ResourceServiceImpl.java | 6 ++---
.../system/controller/MemberController.java | 2 +-
.../console/system/service/MemberService.java | 2 +-
.../system/service/impl/MemberServiceImpl.java | 8 +++---
.../resources/mapper/core/ApplicationMapper.xml | 4 +--
.../service/ApplicationManageServiceITest.java | 2 +-
28 files changed, 82 insertions(+), 86 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/AlertController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/AlertController.java
index 381206f13..234e5259f 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/AlertController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/AlertController.java
@@ -92,7 +92,7 @@ public class AlertController {
@PostMapping(value = "/list")
public RestResponse alertConfigsPaginationList(
@RequestBody AlertConfigParams params, RestRequest request) {
- IPage<AlertConfigParams> page = alertConfigService.page(params, request);
+ IPage<AlertConfigParams> page =
alertConfigService.page(params.getUserId(), request);
return RestResponse.success(page);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
index 412eabef4..77736db00 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
@@ -86,7 +86,7 @@ public class ApplicationController {
@PostMapping("get")
@RequiresPermissions("app:detail")
public RestResponse get(Application app) {
- Application application = applicationManageService.getApp(app);
+ Application application = applicationManageService.getApp(app.getId());
return RestResponse.success(application);
}
@@ -167,7 +167,7 @@ public class ApplicationController {
@PostMapping("revoke")
@RequiresPermissions("app:release")
public RestResponse revoke(Application app) {
- applicationActionService.revoke(app);
+ applicationActionService.revoke(app.getId());
return RestResponse.success();
}
@@ -175,7 +175,7 @@ public class ApplicationController {
@PostMapping(value = "check_start")
@RequiresPermissions("app:start")
public RestResponse checkStart(Application app) {
- AppExistsStateEnum stateEnum = applicationInfoService.checkStart(app);
+ AppExistsStateEnum stateEnum =
applicationInfoService.checkStart(app.getId());
return RestResponse.success(stateEnum.get());
}
@@ -286,7 +286,7 @@ public class ApplicationController {
@PostMapping("forcedStop")
@RequiresPermissions("app:cancel")
public RestResponse forcedStop(Application app) {
- applicationActionService.forcedStop(app);
+ applicationActionService.forcedStop(app.getId());
return RestResponse.success();
}
@@ -299,7 +299,7 @@ public class ApplicationController {
@Operation(summary = "Get application on yarn name")
@PostMapping("name")
public RestResponse yarnName(Application app) {
- String yarnName = applicationInfoService.getYarnName(app);
+ String yarnName = applicationInfoService.getYarnName(app.getConfig());
return RestResponse.success(yarnName);
}
@@ -313,7 +313,7 @@ public class ApplicationController {
@Operation(summary = "Get application conf")
@PostMapping("readConf")
public RestResponse readConf(Application app) throws IOException {
- String config = applicationInfoService.readConf(app);
+ String config = applicationInfoService.readConf(app.getConfig());
return RestResponse.success(config);
}
@@ -352,7 +352,7 @@ public class ApplicationController {
@PostMapping("delete")
@RequiresPermissions("app:delete")
public RestResponse delete(Application app) throws InternalException {
- Boolean deleted = applicationManageService.remove(app);
+ Boolean deleted = applicationManageService.remove(app.getId());
return RestResponse.success(deleted);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ConfigController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ConfigController.java
index 376cf9ea6..06e1c167c 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ConfigController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ConfigController.java
@@ -73,7 +73,7 @@ public class ConfigController {
@Operation(summary = "List application config histories")
@PostMapping("history")
public RestResponse history(Application application) {
- List<ApplicationConfig> history =
applicationConfigService.list(application);
+ List<ApplicationConfig> history =
applicationConfigService.list(application.getId());
return RestResponse.success(history);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
index 2af28b1fb..27b0b80aa 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
@@ -118,7 +118,7 @@ public class FlinkClusterController {
@Operation(summary = "Delete flink cluster")
@PostMapping("delete")
public RestResponse delete(FlinkCluster cluster) {
- flinkClusterService.remove(cluster);
+ flinkClusterService.remove(cluster.getId());
return RestResponse.success();
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkSqlController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkSqlController.java
index c407325ae..e3dba3689 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkSqlController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkSqlController.java
@@ -118,7 +118,7 @@ public class FlinkSqlController {
@Operation(summary = "List the applications sql histories")
@PostMapping("history")
public RestResponse sqlhistory(Application application) {
- List<FlinkSql> sqlList = flinkSqlService.listFlinkSqlHistory(application);
+ List<FlinkSql> sqlList =
flinkSqlService.listFlinkSqlHistory(application.getId());
return RestResponse.success(sqlList);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ResourceController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ResourceController.java
index 9e2b53675..c2761e0bc 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ResourceController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ResourceController.java
@@ -84,7 +84,7 @@ public class ResourceController {
@DeleteMapping("delete")
@RequiresPermissions("resource:delete")
public RestResponse deleteResource(@Valid Resource resource) {
- this.resourceService.remove(resource);
+ this.resourceService.remove(resource.getId());
return RestResponse.success();
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
index 095b240d1..970ddc734 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
@@ -31,7 +31,7 @@ public interface ApplicationMapper extends
BaseMapper<Application> {
IPage<Application> selectPage(Page<Application> page, @Param("app")
Application application);
- Application selectApp(@Param("app") Application application);
+ Application selectApp(@Param("id") Long id);
void persistMetrics(@Param("app") Application application);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationConfigService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationConfigService.java
index 1c8485210..325d35e51 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationConfigService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationConfigService.java
@@ -105,10 +105,10 @@ public interface ApplicationConfigService extends
IService<ApplicationConfig> {
/**
* Retrieves the history of application configurations for a given
application.
*
- * @param appParam The application for which to retrieve the history.
+ * @param appId The application's id for which to retrieve the history.
* @return The list of application configurations representing the history.
*/
- List<ApplicationConfig> list(Application appParam);
+ List<ApplicationConfig> list(Long appId);
/**
* Reads a template from a file or a database.
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
index ff043e825..cdf3b2a88 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
@@ -35,7 +35,7 @@ public interface FlinkClusterService extends
IService<FlinkCluster> {
Boolean create(FlinkCluster flinkCluster);
- void remove(FlinkCluster flinkCluster);
+ void remove(Long id);
void update(FlinkCluster flinkCluster);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkSqlService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkSqlService.java
index 050d94903..39659da2c 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkSqlService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkSqlService.java
@@ -38,7 +38,7 @@ public interface FlinkSqlService extends IService<FlinkSql> {
FlinkSql getLatestFlinkSql(Long appId, boolean decode);
- List<FlinkSql> listFlinkSqlHistory(Application application);
+ List<FlinkSql> listFlinkSqlHistory(Long appId);
FlinkSql getCandidate(Long appId, CandidateTypeEnum type);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java
index 22f33c243..5be7a2292 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java
@@ -71,9 +71,9 @@ public interface ResourceService extends IService<Resource> {
/**
* delete resource
*
- * @param resource
+ * @param id
*/
- void remove(Resource resource);
+ void remove(Long id);
/**
* Get resource through team id.
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertConfigService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertConfigService.java
index 6da414fb7..7c0dad562 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertConfigService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/AlertConfigService.java
@@ -26,7 +26,7 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.IService;
public interface AlertConfigService extends IService<AlertConfig> {
- IPage<AlertConfigParams> page(AlertConfigParams params, RestRequest request);
+ IPage<AlertConfigParams> page(Long userId, RestRequest request);
boolean exist(AlertConfig alertConfig);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertConfigServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertConfigServiceImpl.java
index ed8d17f2d..6d889a7ba 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertConfigServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertConfigServiceImpl.java
@@ -50,10 +50,10 @@ public class AlertConfigServiceImpl extends
ServiceImpl<AlertConfigMapper, Alert
@Autowired private ApplicationInfoService applicationInfoService;
@Override
- public IPage<AlertConfigParams> page(AlertConfigParams params, RestRequest
request) {
+ public IPage<AlertConfigParams> page(Long userId, RestRequest request) {
// build query conditions
LambdaQueryWrapper<AlertConfig> wrapper = new LambdaQueryWrapper<>();
- wrapper.eq(params.getUserId() != null, AlertConfig::getUserId,
params.getUserId());
+ wrapper.eq(userId != null, AlertConfig::getUserId, userId);
Page<AlertConfig> page = MybatisPager.getPage(request);
IPage<AlertConfig> resultPage = getBaseMapper().selectPage(page, wrapper);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationActionService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationActionService.java
index e2936cb73..2ab14f91e 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationActionService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationActionService.java
@@ -48,10 +48,10 @@ public interface ApplicationActionService extends
IService<Application> {
/**
* Revokes access for the given application.
*
- * @param appParam The application for which access needs to be revoked.
+ * @param appId The application's id for which access needs to be revoked.
* @throws ApplicationException if an error occurs while revoking access.
*/
- void revoke(Application appParam) throws ApplicationException;
+ void revoke(Long appId) throws ApplicationException;
/**
* Cancels the given application. Throws an exception if cancellation fails.
@@ -64,7 +64,7 @@ public interface ApplicationActionService extends
IService<Application> {
/**
* Forces the given application to stop.
*
- * @param appParam the application to be stopped
+ * @param id the application's id which need to be stopped
*/
- void forcedStop(Application appParam);
+ void forcedStop(Long id);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationInfoService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationInfoService.java
index 8d1e0b112..c40cc6a2e 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationInfoService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationInfoService.java
@@ -123,10 +123,10 @@ public interface ApplicationInfoService extends
IService<Application> {
/**
* Gets the YARN name for the given application.
*
- * @param appParam The application for which to retrieve the YARN name.
+ * @param appConfig The application's config for which to retrieve the YARN
name.
* @return The YARN name of the application as a String.
*/
- String getYarnName(Application appParam);
+ String getYarnName(String appConfig);
/**
* Checks if the given application exists in the system.
@@ -139,11 +139,11 @@ public interface ApplicationInfoService extends
IService<Application> {
/**
* Reads the configuration for the given application and returns it as a
String.
*
- * @param appParam The application for which the configuration needs to be
read.
+ * @param appConfig The application's config for which the configuration
needs to be read.
* @return The configuration for the given application as a String.
* @throws IOException If an I/O error occurs while reading the
configuration.
*/
- String readConf(Application appParam) throws IOException;
+ String readConf(String appConfig) throws IOException;
/**
* Retrieves the main configuration value for the given Application.
@@ -226,10 +226,10 @@ public interface ApplicationInfoService extends
IService<Application> {
/**
* check application before start
*
- * @param appParam
+ * @param id the application's id which need to check before start.
* @return org.apache.streampark.console.core.enums.AppExistsStateEnum
*/
- AppExistsStateEnum checkStart(Application appParam);
+ AppExistsStateEnum checkStart(Long id);
/**
* @param appName
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationManageService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationManageService.java
index 4c111cc76..a5263fee5 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationManageService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationManageService.java
@@ -102,19 +102,19 @@ public interface ApplicationManageService extends
IService<Application> {
/**
* Deletes the given Application from the system.
*
- * @param appParam The Application to be deleted.
+ * @param appId The Application's id which need to be deleted.
* @return True if the deletion was successful, false otherwise.
*/
- Boolean remove(Application appParam);
+ Boolean remove(Long appId);
/**
* Retrieves the Application with the specified details from the system.
*
- * @param appParam The Application object containing the details of the
Application to retrieve.
+ * @param id The Application object's id.
* @return The Application object that matches the specified details, or
null if no matching
* Application is found.
*/
- Application getApp(Application appParam);
+ Application getApp(Long id);
/**
* Updates the release of the given application.
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 54fc3ec67..70f02315c 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -173,11 +173,10 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
new ConcurrentHashMap<>();
@Override
- public void revoke(Application appParam) throws ApplicationException {
- Application application = getById(appParam.getId());
+ public void revoke(Long appId) throws ApplicationException {
+ Application application = getById(appId);
ApiAlertException.throwIfNull(
- application,
- String.format("The application id=%s not found, revoke failed.",
appParam.getId()));
+ application, String.format("The application id=%s not found, revoke
failed.", appId));
// 1) delete files that have been published to workspace
application.getFsOperator().delete(application.getAppHome());
@@ -206,10 +205,10 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
}
@Override
- public void forcedStop(Application appParam) {
- CompletableFuture<SubmitResponse> startFuture =
startFutureMap.remove(appParam.getId());
- CompletableFuture<CancelResponse> cancelFuture =
cancelFutureMap.remove(appParam.getId());
- Application application = this.baseMapper.selectApp(appParam);
+ public void forcedStop(Long id) {
+ CompletableFuture<SubmitResponse> startFuture = startFutureMap.remove(id);
+ CompletableFuture<CancelResponse> cancelFuture =
cancelFutureMap.remove(id);
+ Application application = this.baseMapper.selectApp(id);
if (isKubernetesApp(application)) {
KubernetesDeploymentHelper.watchPodTerminatedLog(
application.getK8sNamespace(), application.getJobName(),
application.getJobId());
@@ -225,7 +224,7 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
cancelFuture.cancel(true);
}
if (startFuture == null && cancelFuture == null) {
- this.doStopped(appParam);
+ this.doStopped(id);
}
}
@@ -332,7 +331,7 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
applicationLogService.save(applicationLog);
if (throwable instanceof CancellationException) {
- doStopped(application);
+ doStopped(application.getId());
} else {
log.error("stop flink job failed.", throwable);
application.setOptionState(OptionStateEnum.NONE.getValue());
@@ -500,7 +499,7 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
applicationLog.setSuccess(false);
applicationLogService.save(applicationLog);
if (throwable instanceof CancellationException) {
- doStopped(application);
+ doStopped(application.getId());
} else {
Application app = getById(appParam.getId());
app.setState(FlinkAppStateEnum.FAILED.getValue());
@@ -772,8 +771,8 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
return properties;
}
- private void doStopped(Application appParam) {
- Application application = getById(appParam);
+ private void doStopped(Long id) {
+ Application application = getById(id);
application.setOptionState(OptionStateEnum.NONE.getValue());
application.setState(FlinkAppStateEnum.CANCELED.getValue());
application.setOptionTime(new Date());
@@ -781,9 +780,9 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
savePointService.expire(application.getId());
// re-tracking flink job on kubernetes and logging exception
if (isKubernetesApp(application)) {
- TrackId id = toTrackId(application);
- k8SFlinkTrackMonitor.unWatching(id);
- k8SFlinkTrackMonitor.doWatching(id);
+ TrackId trackId = toTrackId(application);
+ k8SFlinkTrackMonitor.unWatching(trackId);
+ k8SFlinkTrackMonitor.doWatching(trackId);
} else {
FlinkAppHttpWatcher.unWatching(application.getId());
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
index 36a1d05cc..965266639 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
@@ -325,8 +325,8 @@ public class ApplicationInfoServiceImpl extends
ServiceImpl<ApplicationMapper, A
}
@Override
- public AppExistsStateEnum checkStart(Application appParam) {
- Application application = getById(appParam.getId());
+ public AppExistsStateEnum checkStart(Long id) {
+ Application application = getById(id);
if (application == null) {
return AppExistsStateEnum.INVALID;
}
@@ -408,10 +408,10 @@ public class ApplicationInfoServiceImpl extends
ServiceImpl<ApplicationMapper, A
}
@Override
- public String getYarnName(Application appParam) {
+ public String getYarnName(String appConfig) {
String[] args = new String[2];
args[0] = "--name";
- args[1] = appParam.getConfig();
+ args[1] = appConfig;
return ParameterCli.read(args);
}
@@ -479,8 +479,8 @@ public class ApplicationInfoServiceImpl extends
ServiceImpl<ApplicationMapper, A
}
@Override
- public String readConf(Application appParam) throws IOException {
- File file = new File(appParam.getConfig());
+ public String readConf(String appConfig) throws IOException {
+ File file = new File(appConfig);
String conf = org.apache.streampark.common.util.FileUtils.readFile(file);
return Base64.getEncoder().encodeToString(conf.getBytes());
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
index 681d6f7d7..b8e4f1f92 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
@@ -183,9 +183,9 @@ public class ApplicationManageServiceImpl extends
ServiceImpl<ApplicationMapper,
}
@Override
- public Boolean remove(Application appParam) {
+ public Boolean remove(Long appId) {
- Application application = getById(appParam.getId());
+ Application application = getById(appId);
// 1) remove flink sql
flinkSqlService.removeByAppId(application.getId());
@@ -217,7 +217,7 @@ public class ApplicationManageServiceImpl extends
ServiceImpl<ApplicationMapper,
flinkK8sObserver.unWatchById(application.getId());
}
} else {
- FlinkAppHttpWatcher.unWatching(appParam.getId());
+ FlinkAppHttpWatcher.unWatching(appId);
}
return true;
}
@@ -762,10 +762,10 @@ public class ApplicationManageServiceImpl extends
ServiceImpl<ApplicationMapper,
}
@Override
- public Application getApp(Application appParam) {
- Application application = this.baseMapper.selectApp(appParam);
- ApplicationConfig config = configService.getEffective(appParam.getId());
- config = config == null ? configService.getLatest(appParam.getId()) :
config;
+ public Application getApp(Long id) {
+ Application application = this.baseMapper.selectApp(id);
+ ApplicationConfig config = configService.getEffective(id);
+ config = config == null ? configService.getLatest(id) : config;
if (config != null) {
config.setToApplication(application);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java
index b711e8d0f..76220dd56 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java
@@ -211,14 +211,14 @@ public class ApplicationConfigServiceImpl
}
@Override
- public List<ApplicationConfig> list(Application appParam) {
+ public List<ApplicationConfig> list(Long appId) {
LambdaQueryWrapper<ApplicationConfig> queryWrapper =
new LambdaQueryWrapper<ApplicationConfig>()
- .eq(ApplicationConfig::getAppId, appParam.getId())
+ .eq(ApplicationConfig::getAppId, appId)
.orderByDesc(ApplicationConfig::getVersion);
List<ApplicationConfig> configList =
this.baseMapper.selectList(queryWrapper);
- fillEffectiveField(appParam.getId(), configList);
+ fillEffectiveField(appId, configList);
return configList;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 4e3875fee..658743688 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -342,8 +342,7 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
}
@Override
- public void remove(FlinkCluster cluster) {
- Long id = cluster.getId();
+ public void remove(Long id) {
FlinkCluster flinkCluster = getById(id);
ApiAlertException.throwIfNull(flinkCluster, "Flink cluster not exist,
please check.");
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
index 69c175307..d648a2294 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
@@ -117,14 +117,14 @@ public class FlinkSqlServiceImpl extends
ServiceImpl<FlinkSqlMapper, FlinkSql>
}
@Override
- public List<FlinkSql> listFlinkSqlHistory(Application application) {
+ public List<FlinkSql> listFlinkSqlHistory(Long appId) {
LambdaQueryWrapper<FlinkSql> queryWrapper =
new LambdaQueryWrapper<FlinkSql>()
- .eq(FlinkSql::getAppId, application.getId())
+ .eq(FlinkSql::getAppId, appId)
.orderByDesc(FlinkSql::getVersion);
List<FlinkSql> sqlList = this.baseMapper.selectList(queryWrapper);
- FlinkSql effective = getEffective(application.getId(), false);
+ FlinkSql effective = getEffective(appId, false);
if (effective != null && !sqlList.isEmpty()) {
for (FlinkSql sql : sqlList) {
if (sql.getId().equals(effective.getId())) {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
index 6185be20a..29e6e7730 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
@@ -207,8 +207,8 @@ public class ResourceServiceImpl extends
ServiceImpl<ResourceMapper, Resource>
}
@Override
- public void remove(Resource resource) {
- Resource findResource = getById(resource.getId());
+ public void remove(Long id) {
+ Resource findResource = getById(id);
checkOrElseAlert(findResource);
String filePath =
@@ -224,7 +224,7 @@ public class ResourceServiceImpl extends
ServiceImpl<ResourceMapper, Resource>
FsOperator.lfs().delete(filePath);
- this.removeById(resource);
+ this.removeById(id);
}
public List<Resource> listByTeamId(Long teamId) {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/MemberController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/MemberController.java
index b9feac9bd..900245f4e 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/MemberController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/MemberController.java
@@ -96,7 +96,7 @@ public class MemberController {
@DeleteMapping("delete")
@RequiresPermissions("member:delete")
public RestResponse delete(Member member) {
- this.memberService.remove(member);
+ this.memberService.remove(member.getId());
return RestResponse.success();
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/MemberService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/MemberService.java
index b92294365..a9936f22a 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/MemberService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/MemberService.java
@@ -47,7 +47,7 @@ public interface MemberService extends IService<Member> {
void createMember(Member member);
- void remove(Member member);
+ void remove(Long id);
void updateMember(Member member);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java
index c6634732a..4c231a63a 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java
@@ -150,13 +150,11 @@ public class MemberServiceImpl extends
ServiceImpl<MemberMapper, Member> impleme
}
@Override
- public void remove(Member memberArg) {
+ public void remove(Long id) {
Member member =
- Optional.ofNullable(this.getById(memberArg.getId()))
+ Optional.ofNullable(this.getById(id))
.orElseThrow(
- () ->
- new ApiAlertException(
- String.format("The member [id=%s] not found",
memberArg.getId())));
+ () -> new ApiAlertException(String.format("The member [id=%s]
not found", id)));
this.removeById(member);
userService.clearLastTeam(member.getUserId(), member.getTeamId());
}
diff --git
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
index 2f968a3ff..6ae24e824 100644
---
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
+++
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
@@ -177,11 +177,11 @@
</where>
</select>
- <select id="selectApp"
resultType="org.apache.streampark.console.core.entity.Application"
parameterType="org.apache.streampark.console.core.entity.Application">
+ <select id="selectApp"
resultType="org.apache.streampark.console.core.entity.Application"
parameterType="long">
select t.*, p.name as projectName
from t_flink_app t left join t_flink_project p
on t.project_id = p.id
- where t.id = #{app.id}
+ where t.id = #{id}
</select>
<update id="persistMetrics"
parameterType="org.apache.streampark.console.core.entity.Application">
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceITest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceITest.java
index 0e7f37732..10ca766f4 100644
---
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceITest.java
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceITest.java
@@ -110,7 +110,7 @@ class ApplicationManageServiceITest extends
SpringIntegrationTestBase {
Application appParam = new Application();
appParam.setId(100000L);
appParam.setTeamId(100000L);
- Application application = applicationManageService.getApp(appParam);
+ Application application =
applicationManageService.getApp(appParam.getId());
application.setFlinkClusterId(1L);
application.setSqlId(100000L);
application.setVersionId(1L);