This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch k8s-deploy in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit aac96404734de84de88277a04437fea08e28d3e1 Author: benjobs <[email protected]> AuthorDate: Sun Jan 14 16:39:31 2024 +0800 [Improve] k8s deploy-cluster | start-app error info improvement --- .../core/service/impl/ApplicationServiceImpl.java | 8 +++-- .../core/service/impl/FlinkClusterServiceImpl.java | 14 +++++---- .../flink/client/bean/DeployResponse.scala | 2 +- .../flink/client/bean/ShutDownResponse.scala | 2 +- .../impl/KubernetesNativeSessionClient.scala | 34 +++++++++------------- .../flink/client/impl/YarnSessionClient.scala | 16 ++-------- .../flink/client/trait/FlinkClientTrait.scala | 7 +++++ 7 files changed, 39 insertions(+), 44 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index c27e5772e..5ed782953 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -79,7 +79,7 @@ import org.apache.streampark.console.core.service.SettingService; import org.apache.streampark.console.core.service.VariableService; import org.apache.streampark.console.core.service.YarnQueueService; import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher; -import org.apache.streampark.flink.client.FlinkClient; +import org.apache.streampark.flink.client.FlinkClientHandler; import org.apache.streampark.flink.client.bean.CancelRequest; import org.apache.streampark.flink.client.bean.CancelResponse; import org.apache.streampark.flink.client.bean.SubmitRequest; @@ -1324,7 +1324,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli final Date triggerTime = new Date(); CompletableFuture<CancelResponse> cancelFuture = - CompletableFuture.supplyAsync(() -> FlinkClient.cancel(cancelRequest), executorService); + CompletableFuture.supplyAsync( + () -> FlinkClientHandler.cancel(cancelRequest), executorService); cancelFutureMap.put(application.getId(), cancelFuture); @@ -1591,7 +1592,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli TrackId trackId = isKubernetesApp(application) ? toTrackId(application) : null; CompletableFuture<SubmitResponse> future = - CompletableFuture.supplyAsync(() -> FlinkClient.submit(submitRequest), executorService); + CompletableFuture.supplyAsync( + () -> FlinkClientHandler.submit(submitRequest), executorService); startFutureMap.put(application.getId(), future); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java index ef1f4a283..84e10e45a 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java @@ -20,6 +20,7 @@ package org.apache.streampark.console.core.service.impl; import org.apache.streampark.common.enums.ClusterState; import org.apache.streampark.common.enums.ExecutionMode; import org.apache.streampark.common.util.ThreadUtils; +import org.apache.streampark.common.util.Utils; import org.apache.streampark.common.util.YarnUtils; import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.streampark.console.base.exception.ApiDetailException; @@ -157,7 +158,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli Future<DeployResponse> future = executorService.submit(() -> FlinkClient.deploy(deployRequest)); DeployResponse deployResponse = future.get(60, TimeUnit.SECONDS); - if (deployResponse != null) { + if (deployResponse.error() == null) { if (ExecutionMode.YARN_SESSION.equals(executionModeEnum)) { String address = YarnUtils.getRMWebAppURL(true) + "/proxy/" + deployResponse.clusterId() + "/"; @@ -171,7 +172,8 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli updateById(flinkCluster); } else { throw new ApiAlertException( - "deploy cluster failed, unknown reason,please check you params or StreamPark error log"); + "deploy cluster failed, exception:\n" + + Utils.stringifyException(deployResponse.error())); } } catch (Exception e) { log.error(e.getMessage(), e); @@ -279,12 +281,14 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli Future<ShutDownResponse> future = executorService.submit(() -> FlinkClient.shutdown(deployRequest)); ShutDownResponse shutDownResponse = future.get(60, TimeUnit.SECONDS); - if (shutDownResponse != null) { + if (shutDownResponse.error() != null) { + throw new ApiDetailException( + "shutdown cluster failed, error: \n" + + Utils.stringifyException(shutDownResponse.error())); + } else { flinkCluster.setAddress(null); flinkCluster.setClusterState(ClusterState.STOPPED.getValue()); updateById(flinkCluster); - } else { - throw new ApiAlertException("get shutdown response failed"); } } catch (Exception e) { log.error(e.getMessage(), e); diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala index 91b6ecad0..82c452ece 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala @@ -17,4 +17,4 @@ package org.apache.streampark.flink.client.bean -case class DeployResponse(address: String, clusterId: String) +case class DeployResponse(address: String, clusterId: String, error: Throwable = null) diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/ShutDownResponse.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/ShutDownResponse.scala index 5c9a14728..de2df366f 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/ShutDownResponse.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/ShutDownResponse.scala @@ -17,4 +17,4 @@ package org.apache.streampark.flink.client.bean -case class ShutDownResponse() +case class ShutDownResponse(error: Throwable = null) diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala index 7638be720..cc74e31a9 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala @@ -169,16 +169,9 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo client = clusterDescriptor.deploySessionCluster(kubernetesClusterDescriptor._2).getClusterClient } - - if (client.getWebInterfaceURL != null) { - DeployResponse(client.getWebInterfaceURL, client.getClusterId) - } else { - null - } + getDeployResponse(client) } catch { - case e: Exception => - logError(s"start flink session fail in ${deployRequest.executionMode} mode") - throw e + case e: Exception => DeployResponse(null, null, e) } finally { Utils.close(client, clusterDescriptor, kubeClient) } @@ -204,18 +197,17 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo val flinkConfig = this.getFlinkK8sConfig(shutDownRequest) val clusterDescriptor = getK8sClusterDescriptor(flinkConfig) - val client = clusterDescriptor - .retrieve(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID)) - .getClusterClient - try { - client.shutDownCluster() - ShutDownResponse() - } catch { - case e: Exception => - logError(s"shutdown flink session fail in ${shutDownRequest.executionMode} mode") - throw e - } finally { - Utils.close(client) + Try( + clusterDescriptor + .retrieve(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID)) + .getClusterClient + ) match { + case Failure(e) => ShutDownResponse(e) + case Success(c) => + Try(c.shutDownCluster()) match { + case Success(_) => ShutDownResponse() + case Failure(e) => ShutDownResponse(e) + } } } diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala index edcd6ed75..8fa9922ad 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala @@ -189,16 +189,9 @@ object YarnSessionClient extends YarnClientTrait { } val clientProvider = clusterDescriptor.deploySessionCluster(yarnClusterDescriptor._1) client = clientProvider.getClusterClient - if (client.getWebInterfaceURL != null) { - DeployResponse(client.getWebInterfaceURL, client.getClusterId.toString) - } else { - null - } + getDeployResponse(client) } catch { - case e: Exception => - logError(s"start flink session fail in ${deployRequest.executionMode} mode") - e.printStackTrace() - throw e + case e: Exception => DeployResponse(null, null, e) } finally { Utils.close(client, clusterDescriptor) } @@ -234,10 +227,7 @@ object YarnSessionClient extends YarnClientTrait { .getFinalApplicationStatus}") ShutDownResponse() } catch { - case e: Exception => - logError(s"shutdown flink session fail in ${shutDownRequest.executionMode} mode") - e.printStackTrace() - throw e + case e: Exception => ShutDownResponse(e) } finally { Utils.close(client, clusterDescriptor) } diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala index f1b171310..315f2afcf 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala @@ -524,4 +524,11 @@ trait FlinkClientTrait extends Logger { clientWrapper.triggerSavepoint(jobID, savepointPath).get() } + def getDeployResponse(client: ClusterClient[_]): DeployResponse = { + if (client.getWebInterfaceURL != null) { + DeployResponse(client.getWebInterfaceURL, client.getClusterId.toString) + } else { + DeployResponse(null, null, new RuntimeException("get the cluster getWebInterfaceURL failed.")) + } + } }
