This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch k8s-shutdown
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/k8s-shutdown by this push:
new ce5976a0a [Improve] k8s shutdown bug fixed.
ce5976a0a is described below
commit ce5976a0a88dcfc8c0226680b9998708bdcc9e1e
Author: benjobs <[email protected]>
AuthorDate: Tue Jan 9 17:44:41 2024 +0800
[Improve] k8s shutdown bug fixed.
---
.../console/core/controller/FlinkClusterController.java | 12 ++++++------
.../console/core/service/FlinkClusterService.java | 6 +++---
.../console/core/service/impl/FlinkClusterServiceImpl.java | 13 ++++++-------
3 files changed, 15 insertions(+), 16 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
index 148cd554b..2b01e2a54 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
@@ -91,22 +91,22 @@ public class FlinkClusterController {
@Operation(summary = "Start flink cluster")
@PostMapping("start")
- public RestResponse start(FlinkCluster cluster) {
- flinkClusterService.start(cluster);
+ public RestResponse start(Long id) {
+ flinkClusterService.start(id);
return RestResponse.success();
}
@Operation(summary = "Shutdown flink cluster")
@PostMapping("shutdown")
- public RestResponse shutdown(FlinkCluster cluster) {
- flinkClusterService.shutdown(cluster);
+ public RestResponse shutdown(Long id) {
+ flinkClusterService.shutdown(id);
return RestResponse.success();
}
@Operation(summary = "Delete flink cluster")
@PostMapping("delete")
- public RestResponse delete(FlinkCluster cluster) {
- flinkClusterService.delete(cluster);
+ public RestResponse delete(Long id) {
+ flinkClusterService.delete(id);
return RestResponse.success();
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
index d149152af..e9d0397ef 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
@@ -32,13 +32,13 @@ public interface FlinkClusterService extends
IService<FlinkCluster> {
Boolean create(FlinkCluster flinkCluster);
- void delete(FlinkCluster flinkCluster);
+ void delete(Long id);
void update(FlinkCluster flinkCluster);
- void start(FlinkCluster flinkCluster);
+ void start(Long id);
- void shutdown(FlinkCluster flinkCluster);
+ void shutdown(Long id);
Boolean existsByClusterId(String clusterId, Long id);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index f73a58290..ef1f4a283 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -148,8 +148,8 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
@Override
@Transactional(rollbackFor = {Exception.class})
- public void start(FlinkCluster cluster) {
- FlinkCluster flinkCluster = getById(cluster.getId());
+ public void start(Long id) {
+ FlinkCluster flinkCluster = getById(id);
try {
ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum();
DeployRequest deployRequest = getDeployRequest(flinkCluster);
@@ -243,8 +243,8 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
}
@Override
- public void shutdown(FlinkCluster cluster) {
- FlinkCluster flinkCluster = this.getById(cluster.getId());
+ public void shutdown(Long id) {
+ FlinkCluster flinkCluster = this.getById(id);
// 1) check mode
ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum();
String clusterId = flinkCluster.getClusterId();
@@ -274,7 +274,7 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
}
// 4) shutdown
- DeployRequest deployRequest = getDeployRequest(cluster);
+ DeployRequest deployRequest = getDeployRequest(flinkCluster);
try {
Future<ShutDownResponse> future =
executorService.submit(() -> FlinkClient.shutdown(deployRequest));
@@ -325,8 +325,7 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
}
@Override
- public void delete(FlinkCluster cluster) {
- Long id = cluster.getId();
+ public void delete(Long id) {
FlinkCluster flinkCluster = getById(id);
if (flinkCluster == null) {
throw new ApiAlertException("flink cluster not exist, please check.");