This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 977463cce3ea0f88e2f184c30720bf4e8e97fd4a
Author: Dian Fu <[email protected]>
AuthorDate: Fri Nov 24 15:03:16 2023 +0800

    [FLINK-33613][python] Make sure gRPC server is shutdown gracefully if 
Python process startup failed
    
    This closes #23789.
---
 .../control/DefaultJobBundleFactory.java           | 33 +++++++++++++++++++---
 1 file changed, 29 insertions(+), 4 deletions(-)

diff --git 
a/flink-python/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 
b/flink-python/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
index c22175b1742..b55945844c8 100644
--- 
a/flink-python/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
+++ 
b/flink-python/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
@@ -238,6 +238,26 @@ public class DefaultJobBundleFactory implements 
JobBundleFactory {
             LoadingCache<Environment, WrappedSdkHarnessClient> cache =
                     cacheBuilder.build(
                             new CacheLoader<Environment, 
WrappedSdkHarnessClient>() {
+
+                                private void close(ServerInfo serverInfo) {
+                                    try (AutoCloseable provisioningServer =
+                                                    
serverInfo.getProvisioningServer();
+                                            AutoCloseable retrievalServer =
+                                                    
serverInfo.getRetrievalServer();
+                                            AutoCloseable stateServer =
+                                                    
serverInfo.getStateServer();
+                                            AutoCloseable dataServer = 
serverInfo.getDataServer();
+                                            AutoCloseable controlServer =
+                                                    
serverInfo.getControlServer();
+                                            // Close the logging server first 
to prevent spaming the
+                                            // logs with error messages
+                                            AutoCloseable loggingServer =
+                                                    
serverInfo.getLoggingServer()) {
+                                    } catch (Exception e) {
+                                        LOG.warn("Error cleaning up servers 
{}", serverInfo, e);
+                                    }
+                                }
+
                                 @Override
                                 public WrappedSdkHarnessClient 
load(Environment environment)
                                         throws Exception {
@@ -259,10 +279,15 @@ public class DefaultJobBundleFactory implements 
JobBundleFactory {
                                                     
serverInfo.getProvisioningServer(),
                                                     clientPool,
                                                     stageIdGenerator);
-                                    return WrappedSdkHarnessClient.wrapping(
-                                            
environmentFactory.createEnvironment(
-                                                    environment, workerId),
-                                            serverInfo);
+                                    try {
+                                        return 
WrappedSdkHarnessClient.wrapping(
+                                                
environmentFactory.createEnvironment(
+                                                        environment, workerId),
+                                                serverInfo);
+                                    } catch (Exception e) {
+                                        close(serverInfo);
+                                        throw e;
+                                    }
                                 }
                             });
 

Reply via email to