This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch k8s-shutdown
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/k8s-shutdown by this push:
     new ce5976a0a [Improve] k8s shutdown bug fixed.
ce5976a0a is described below

commit ce5976a0a88dcfc8c0226680b9998708bdcc9e1e
Author: benjobs <[email protected]>
AuthorDate: Tue Jan 9 17:44:41 2024 +0800

    [Improve] k8s shutdown bug fixed.
---
 .../console/core/controller/FlinkClusterController.java     | 12 ++++++------
 .../console/core/service/FlinkClusterService.java           |  6 +++---
 .../console/core/service/impl/FlinkClusterServiceImpl.java  | 13 ++++++-------
 3 files changed, 15 insertions(+), 16 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
index 148cd554b..2b01e2a54 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
@@ -91,22 +91,22 @@ public class FlinkClusterController {
 
   @Operation(summary = "Start flink cluster")
   @PostMapping("start")
-  public RestResponse start(FlinkCluster cluster) {
-    flinkClusterService.start(cluster);
+  public RestResponse start(Long id) {
+    flinkClusterService.start(id);
     return RestResponse.success();
   }
 
   @Operation(summary = "Shutdown flink cluster")
   @PostMapping("shutdown")
-  public RestResponse shutdown(FlinkCluster cluster) {
-    flinkClusterService.shutdown(cluster);
+  public RestResponse shutdown(Long id) {
+    flinkClusterService.shutdown(id);
     return RestResponse.success();
   }
 
   @Operation(summary = "Delete flink cluster")
   @PostMapping("delete")
-  public RestResponse delete(FlinkCluster cluster) {
-    flinkClusterService.delete(cluster);
+  public RestResponse delete(Long id) {
+    flinkClusterService.delete(id);
     return RestResponse.success();
   }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
index d149152af..e9d0397ef 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
@@ -32,13 +32,13 @@ public interface FlinkClusterService extends 
IService<FlinkCluster> {
 
   Boolean create(FlinkCluster flinkCluster);
 
-  void delete(FlinkCluster flinkCluster);
+  void delete(Long id);
 
   void update(FlinkCluster flinkCluster);
 
-  void start(FlinkCluster flinkCluster);
+  void start(Long id);
 
-  void shutdown(FlinkCluster flinkCluster);
+  void shutdown(Long id);
 
   Boolean existsByClusterId(String clusterId, Long id);
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index f73a58290..ef1f4a283 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -148,8 +148,8 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
 
   @Override
   @Transactional(rollbackFor = {Exception.class})
-  public void start(FlinkCluster cluster) {
-    FlinkCluster flinkCluster = getById(cluster.getId());
+  public void start(Long id) {
+    FlinkCluster flinkCluster = getById(id);
     try {
       ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum();
       DeployRequest deployRequest = getDeployRequest(flinkCluster);
@@ -243,8 +243,8 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
   }
 
   @Override
-  public void shutdown(FlinkCluster cluster) {
-    FlinkCluster flinkCluster = this.getById(cluster.getId());
+  public void shutdown(Long id) {
+    FlinkCluster flinkCluster = this.getById(id);
     // 1) check mode
     ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum();
     String clusterId = flinkCluster.getClusterId();
@@ -274,7 +274,7 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
     }
 
     // 4) shutdown
-    DeployRequest deployRequest = getDeployRequest(cluster);
+    DeployRequest deployRequest = getDeployRequest(flinkCluster);
     try {
       Future<ShutDownResponse> future =
           executorService.submit(() -> FlinkClient.shutdown(deployRequest));
@@ -325,8 +325,7 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
   }
 
   @Override
-  public void delete(FlinkCluster cluster) {
-    Long id = cluster.getId();
+  public void delete(Long id) {
     FlinkCluster flinkCluster = getById(id);
     if (flinkCluster == null) {
       throw new ApiAlertException("flink cluster not exist, please check.");

Reply via email to