This is an automated email from the ASF dual-hosted git repository. dianfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 977463cce3ea0f88e2f184c30720bf4e8e97fd4a Author: Dian Fu <[email protected]> AuthorDate: Fri Nov 24 15:03:16 2023 +0800 [FLINK-33613][python] Make sure gRPC server is shutdown gracefully if Python process startup failed This closes #23789. --- .../control/DefaultJobBundleFactory.java | 33 +++++++++++++++++++--- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/flink-python/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java b/flink-python/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java index c22175b1742..b55945844c8 100644 --- a/flink-python/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java +++ b/flink-python/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java @@ -238,6 +238,26 @@ public class DefaultJobBundleFactory implements JobBundleFactory { LoadingCache<Environment, WrappedSdkHarnessClient> cache = cacheBuilder.build( new CacheLoader<Environment, WrappedSdkHarnessClient>() { + + private void close(ServerInfo serverInfo) { + try (AutoCloseable provisioningServer = + serverInfo.getProvisioningServer(); + AutoCloseable retrievalServer = + serverInfo.getRetrievalServer(); + AutoCloseable stateServer = + serverInfo.getStateServer(); + AutoCloseable dataServer = serverInfo.getDataServer(); + AutoCloseable controlServer = + serverInfo.getControlServer(); + // Close the logging server first to prevent spaming the + // logs with error messages + AutoCloseable loggingServer = + serverInfo.getLoggingServer()) { + } catch (Exception e) { + LOG.warn("Error cleaning up servers {}", serverInfo, e); + } + } + @Override public WrappedSdkHarnessClient load(Environment environment) throws Exception { @@ -259,10 +279,15 @@ public class DefaultJobBundleFactory implements JobBundleFactory { serverInfo.getProvisioningServer(), clientPool, stageIdGenerator); - return WrappedSdkHarnessClient.wrapping( - environmentFactory.createEnvironment( - environment, workerId), - serverInfo); + try { + return WrappedSdkHarnessClient.wrapping( + environmentFactory.createEnvironment( + environment, workerId), + serverInfo); + } catch (Exception e) { + close(serverInfo); + throw e; + } } });
