This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.4
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.4 by this push:
new a64224407 [Improve] fixed session cluster start/shutdown wait time
config. (#3626)
a64224407 is described below
commit a64224407b2b4370cfc401c4f1a86309c773c5f2
Author: ouyangwulin <[email protected]>
AuthorDate: Sun Mar 24 00:15:45 2024 +0800
[Improve] fixed session cluster start/shutdown wait time config. (#3626)
* [Improve] add yarn api error log for debug some problems
* [Improve] fixed session cluster start/shutdown wait time config.
---------
Co-authored-by: [email protected] <[email protected]>
---
.../core/service/impl/FlinkClusterServiceImpl.java | 96 +++++++++++-----------
1 file changed, 50 insertions(+), 46 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 1240313d9..5bbaa768d 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -57,8 +57,8 @@ import
org.springframework.transaction.annotation.Transactional;
import java.util.Collection;
import java.util.Date;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -172,39 +172,41 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
DeployRequest deployRequest = getDeployRequest(flinkCluster);
log.info("deploy cluster request: {}", deployRequest);
- Future<DeployResponse> future =
- bootstrapExecutor.submit(() -> FlinkClient.deploy(deployRequest));
- DeployResponse deployResponse = future.get(5, TimeUnit.SECONDS);
- if (deployResponse.error() != null) {
- throw new ApiDetailException(
- "deploy cluster "
- + flinkCluster.getClusterName()
- + "failed, exception:\n"
- + Utils.stringifyException(deployResponse.error()));
- } else {
- // 2) setAddress
- if (ExecutionMode.YARN_SESSION.equals(executionModeEnum)) {
- String address =
- YarnUtils.getRMWebAppURL(true) + "/proxy/" +
deployResponse.clusterId() + "/";
- flinkCluster.setAddress(address);
- } else {
- flinkCluster.setAddress(deployResponse.address());
- }
- flinkCluster.setClusterId(deployResponse.clusterId());
- flinkCluster.setClusterState(ClusterState.STARTED.getValue());
- flinkCluster.setException(null);
- updateById(flinkCluster);
-
- // k8s session mode ingress
- if (ExecutionMode.KUBERNETES_NATIVE_SESSION.equals(executionModeEnum))
{
- try {
- serviceHelper.configureIngress(
- flinkCluster.getClusterId(), flinkCluster.getK8sNamespace());
- } catch (KubernetesClientException e) {
- log.info("Failed to create ingress: {}", e.getMessage());
- }
- }
- }
+ CompletableFuture<DeployResponse> future =
+ CompletableFuture.supplyAsync(() ->
FlinkClient.deploy(deployRequest));
+ future.whenComplete(
+ ((deployResponse, throwable) -> {
+ if (throwable != null) {
+ throw new ApiDetailException(
+ "deploy cluster "
+ + flinkCluster.getClusterName()
+ + "failed, exception:\n"
+ + Utils.stringifyException(deployResponse.error()));
+ } else {
+ // 2) setAddress
+ if (ExecutionMode.YARN_SESSION.equals(executionModeEnum)) {
+ String address =
+ YarnUtils.getRMWebAppURL(true) + "/proxy/" +
deployResponse.clusterId() + "/";
+ flinkCluster.setAddress(address);
+ } else {
+ flinkCluster.setAddress(deployResponse.address());
+ }
+ flinkCluster.setClusterId(deployResponse.clusterId());
+ flinkCluster.setClusterState(ClusterState.STARTED.getValue());
+ flinkCluster.setException(null);
+ updateById(flinkCluster);
+
+ // k8s session mode ingress
+ if
(ExecutionMode.KUBERNETES_NATIVE_SESSION.equals(executionModeEnum)) {
+ try {
+ serviceHelper.configureIngress(
+ flinkCluster.getClusterId(),
flinkCluster.getK8sNamespace());
+ } catch (KubernetesClientException e) {
+ log.info("Failed to create ingress: {}", e.getMessage());
+ }
+ }
+ }
+ }));
} catch (Exception e) {
log.error(e.getMessage(), e);
flinkCluster.setClusterState(ClusterState.STOPPED.getValue());
@@ -311,18 +313,20 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
// 4) shutdown
DeployRequest deployRequest = getDeployRequest(flinkCluster);
try {
- Future<ShutDownResponse> future =
- bootstrapExecutor.submit(() -> FlinkClient.shutdown(deployRequest));
- ShutDownResponse shutDownResponse = future.get(60, TimeUnit.SECONDS);
- if (shutDownResponse.error() != null) {
- throw new ApiDetailException(
- "shutdown cluster failed, error: \n"
- + Utils.stringifyException(shutDownResponse.error()));
- } else {
- flinkCluster.setAddress(null);
- flinkCluster.setClusterState(ClusterState.STOPPED.getValue());
- updateById(flinkCluster);
- }
+ CompletableFuture<ShutDownResponse> future =
+ CompletableFuture.supplyAsync(() ->
FlinkClient.shutdown(deployRequest));
+ future.whenComplete(
+ ((shutDownResponse, throwable) -> {
+ if (shutDownResponse.error() != null) {
+ throw new ApiDetailException(
+ "shutdown cluster failed, error: \n"
+ + Utils.stringifyException(shutDownResponse.error()));
+ } else {
+ flinkCluster.setAddress(null);
+ flinkCluster.setClusterState(ClusterState.STOPPED.getValue());
+ updateById(flinkCluster);
+ }
+ }));
} catch (Exception e) {
log.error(e.getMessage(), e);
flinkCluster.setException(e.toString());