wangyang0918 commented on a change in pull request #17554:
URL: https://github.com/apache/flink/pull/17554#discussion_r752813610
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
##########
@@ -205,17 +211,23 @@ private String getWebMonitorAddress(Configuration
configuration) throws Exceptio
Preconditions.checkArgument(pipelineJars.size() == 1, "Should only
have one jar");
}
- final ClusterClientProvider<String> clusterClientProvider =
- deployClusterInternal(
- KubernetesApplicationClusterEntrypoint.class.getName(),
- clusterSpecification,
- false);
+ ClusterClientProvider<String> clusterClientProvider;
+ try {
+ clusterClientProvider =
+ deployClusterInternal(
+
KubernetesApplicationClusterEntrypoint.class.getName(),
+ clusterSpecification,
+ false);
- try (ClusterClient<String> clusterClient =
clusterClientProvider.getClusterClient()) {
- LOG.info(
- "Create flink application cluster {} successfully,
JobManager Web Interface: {}",
- clusterId,
- clusterClient.getWebInterfaceURL());
+ try (ClusterClient<String> clusterClient =
clusterClientProvider.getClusterClient()) {
+ LOG.info(
+ "Create flink application cluster {} successfully,
JobManager Web Interface: {}",
+ clusterId,
+ clusterClient.getWebInterfaceURL());
+ }
+ } catch (Exception e) {
Review comment:
I am curious whether we could wrap the `try...catch {// clean up
resources}` in a separate method, like the following. WDYT?
```
private <T> ClusterClientProvider<T> safelyDeployCluster(
        SupplierWithException<ClusterClientProvider<T>, Exception> supplier)
        throws ClusterDeploymentException {
    try {
        return supplier.get();
    } catch (Exception e) {
        try {
            LOG.warn(
                    "Failed to create the Kubernetes cluster \"{}\", try to clean up the residual resources.",
                    clusterId);
            client.stopAndCleanupCluster(clusterId);
        } catch (Exception ex) {
            // Log the cleanup failure (ex) here; the original failure (e) is rethrown below.
            LOG.warn(
                    "Failed to stop and clean up the Kubernetes cluster \"{}\".", clusterId, ex);
        }
        throw new ClusterDeploymentException(e);
    }
}
```
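For illustration only, here is a self-contained sketch of the same cleanup-on-failure wrapper that can be run outside Flink. `ThrowingSupplier`, `DeploymentException`, and the `cleanedUp` list are hypothetical stand-ins (assumptions, not Flink classes) for `SupplierWithException`, `ClusterDeploymentException`, and `client.stopAndCleanupCluster(clusterId)`.

```java
import java.util.ArrayList;
import java.util.List;

public class SafeDeploySketch {

    /** Hypothetical stand-in for Flink's SupplierWithException. */
    @FunctionalInterface
    interface ThrowingSupplier<T> {
        T get() throws Exception;
    }

    /** Hypothetical stand-in for ClusterDeploymentException. */
    static class DeploymentException extends Exception {
        DeploymentException(Throwable cause) {
            super(cause);
        }
    }

    /** Records which cluster ids were cleaned up, so the behavior is observable. */
    static final List<String> cleanedUp = new ArrayList<>();

    static <T> T safelyDeploy(String clusterId, ThrowingSupplier<T> supplier)
            throws DeploymentException {
        try {
            return supplier.get();
        } catch (Exception e) {
            try {
                // Assumption: stands in for client.stopAndCleanupCluster(clusterId).
                cleanedUp.add(clusterId);
            } catch (Exception ex) {
                // A cleanup failure is reported but must not mask the original failure.
                System.err.println("Failed to clean up " + clusterId + ": " + ex);
            }
            // The original failure is kept as the cause of the wrapper exception.
            throw new DeploymentException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        // Successful deployment: the result is returned and nothing is cleaned up.
        String provider = safelyDeploy("cluster-a", () -> "provider");
        System.out.println(provider + ", cleaned up: " + cleanedUp);

        // Failed deployment: cleanup runs once, then the wrapped failure is thrown.
        try {
            safelyDeploy("cluster-b", () -> {
                throw new IllegalStateException("boom");
            });
        } catch (DeploymentException e) {
            System.out.println(e.getCause().getMessage() + ", cleaned up: " + cleanedUp);
        }
    }
}
```

With a helper of this shape, both the application-mode and session-mode deploy paths could pass their deploy call as a lambda and share one cleanup path, which seems to be the point of the suggestion.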
##########
File path:
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java
##########
@@ -131,16 +131,14 @@ public void testKillCluster() throws Exception {
}
@Test
- public void testDeployApplicationCluster() {
+ public void testDeployApplicationCluster() throws
ClusterDeploymentException {
flinkConfig.set(
PipelineOptions.JARS,
Collections.singletonList("local:///path/of/user.jar"));
flinkConfig.set(DeploymentOptions.TARGET,
KubernetesDeploymentTarget.APPLICATION.getName());
- try {
- descriptor.deployApplicationCluster(clusterSpecification,
appConfig);
- } catch (Exception ignored) {
- }
- mockExpectedServiceFromServerSide(loadBalancerSvc);
+ mockFirstEmptyFollowByExpectedServiceFromServerSide(new Service(),
loadBalancerSvc);
Review comment:
I like this change. Great.
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
##########
@@ -256,39 +268,35 @@ private String getWebMonitorAddress(Configuration
configuration) throws Exceptio
flinkConfig.get(JobManagerOptions.PORT));
}
+ final KubernetesJobManagerParameters kubernetesJobManagerParameters =
+ new KubernetesJobManagerParameters(flinkConfig,
clusterSpecification);
+
+ final FlinkPod podTemplate =
+ kubernetesJobManagerParameters
+ .getPodTemplateFilePath()
+ .map(
+ file ->
+
KubernetesUtils.loadPodFromTemplateFile(
+ client, file,
Constants.MAIN_CONTAINER_NAME))
+ .orElse(new FlinkPod.Builder().build());
+ final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
+
KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
+ podTemplate, kubernetesJobManagerParameters);
+
+ client.createJobManagerComponent(kubernetesJobManagerSpec);
+
+ return createClusterClientProvider(clusterId);
+ }
+
+ private void killClusterSilently(Throwable throwable) {
try {
- final KubernetesJobManagerParameters
kubernetesJobManagerParameters =
- new KubernetesJobManagerParameters(flinkConfig,
clusterSpecification);
-
- final FlinkPod podTemplate =
- kubernetesJobManagerParameters
- .getPodTemplateFilePath()
- .map(
- file ->
-
KubernetesUtils.loadPodFromTemplateFile(
- client, file,
Constants.MAIN_CONTAINER_NAME))
- .orElse(new FlinkPod.Builder().build());
- final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
-
KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
- podTemplate, kubernetesJobManagerParameters);
-
- client.createJobManagerComponent(kubernetesJobManagerSpec);
-
- return createClusterClientProvider(clusterId);
+ LOG.warn(
+ "Failed to create the Kubernetes cluster \"{}\", try to
clean up the residual resources.",
+ clusterId,
+ throwable);
Review comment:
Do we need to log the throwable here? We might end up with a duplicated
exception stack trace, since we also throw a new `ClusterDeploymentException`
that wraps it.
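A small sketch of the concern, using plain JDK classes only: when an exception is wrapped as the cause of a new one, the wrapper's stack trace already contains the original under `Caused by:`. If the wrapper is logged by a caller (an assumption about the calling code, not something shown in this diff), logging the original throwable here as well would print it twice. The class and method names below are hypothetical.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

public class CauseTraceSketch {

    /** Renders a throwable's full stack trace, including any "Caused by:" sections. */
    static String stackTraceOf(Throwable t) {
        StringWriter buffer = new StringWriter();
        t.printStackTrace(new PrintWriter(buffer, true));
        return buffer.toString();
    }

    /** Wraps the original failure, as a deployment exception would. */
    static Exception wrap(Throwable original) {
        return new Exception("Could not create Kubernetes cluster", original);
    }

    public static void main(String[] args) {
        Exception wrapped = wrap(new IllegalStateException("boom"));
        // The wrapper's trace already carries the original failure as its cause.
        System.out.println(stackTraceOf(wrapped));
    }
}
```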