This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 26118e6  Improve error handling during localrun start (#10450)
26118e6 is described below

commit 26118e6d8feba9f70aaadc4323a509245b545431
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Mon May 3 21:26:17 2021 -0700

    Improve error handling during localrun start (#10450)
---
 .../org/apache/pulsar/functions/LocalRunner.java   | 156 +++++++++++----------
 1 file changed, 84 insertions(+), 72 deletions(-)

diff --git 
a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
 
b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index c7e0c28..fb6c626 100644
--- 
a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ 
b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -63,6 +63,7 @@ import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
 import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 import org.apache.pulsar.functions.runtime.RuntimeUtils;
 import org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory;
@@ -94,6 +95,8 @@ public class LocalRunner implements AutoCloseable {
     private final Thread shutdownHook;
     private ClassLoader userCodeClassLoader;
     private boolean userCodeClassLoaderCreated;
+    private RuntimeFactory runtimeFactory;
+    private HTTPServer metricsServer;
 
     public enum RuntimeEnv {
         THREAD,
@@ -185,7 +188,12 @@ public class LocalRunner implements AutoCloseable {
 
         // parse args by JCommander
         jcommander.parse(args);
-        localRunner.start(true);
+        try {
+            localRunner.start(true);
+        } catch (Exception e) {
+            log.error("Encountered error starting localrunner", e);
+            localRunner.close();
+        }
     }
 
     @Builder
@@ -227,11 +235,13 @@ public class LocalRunner implements AutoCloseable {
             this.connectorsDir = Paths.get(pulsarHome, 
"connectors").toString();
         }
         this.metricsPortStart = metricsPortStart;
-        shutdownHook = new Thread() {
-            public void run() {
-                LocalRunner.this.stop();
+        shutdownHook = new Thread(() -> {
+            try {
+                LocalRunner.this.close();
+            } catch (Exception exception) {
+                log.warn("Encountered exception when closing localrunner", 
exception);
             }
-        };
+        });
     }
 
     private static File createNarExtractionTempDirectory() {
@@ -260,12 +270,21 @@ public class LocalRunner implements AutoCloseable {
             } catch (IllegalStateException e) {
                 // ignore possible "Shutdown in progress"
             }
-            log.info("Shutting down the localrun runtimeSpawner ...");
+
+            if (metricsServer != null) {
+                metricsServer.stop();
+            }
+
             for (RuntimeSpawner spawner : spawners) {
                 spawner.close();
             }
             spawners.clear();
 
+            if (runtimeFactory != null) {
+                runtimeFactory.close();
+                runtimeFactory = null;
+            }
+
             if (userCodeClassLoaderCreated) {
                 if (userCodeClassLoader instanceof Closeable) {
                     try {
@@ -464,7 +483,7 @@ public class LocalRunner implements AutoCloseable {
                                            String stateStorageServiceUrl, 
AuthenticationConfig authConfig,
                                            String userCodeFile) throws 
Exception {
         SecretsProviderConfigurator secretsProviderConfigurator = 
getSecretsProviderConfigurator();
-        try (ProcessRuntimeFactory containerFactory = new 
ProcessRuntimeFactory(
+        runtimeFactory = new ProcessRuntimeFactory(
                 serviceUrl,
                 webServiceUrl,
                 stateStorageServiceUrl,
@@ -475,71 +494,66 @@ public class LocalRunner implements AutoCloseable {
                 null, /* extra dependencies dir */
                 narExtractionDirectory, /* nar extraction dir */
                 secretsProviderConfigurator,
-                false, Optional.empty(), Optional.empty())) {
-
-            for (int i = 0; i < parallelism; ++i) {
-                InstanceConfig instanceConfig = new InstanceConfig();
-                instanceConfig.setFunctionDetails(functionDetails);
-                // TODO: correctly implement function version and id
-                
instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
-                instanceConfig.setFunctionId(UUID.randomUUID().toString());
-                instanceConfig.setInstanceId(i + instanceIdOffset);
-                instanceConfig.setMaxBufferedTuples(1024);
-                instanceConfig.setPort(FunctionCommon.findAvailablePort());
-
-                if (metricsPortStart != null) {
-                    int metricsPort = metricsPortStart + i;
-                    if (metricsPortStart < 0 || metricsPortStart > 65535) {
-                        throw new IllegalArgumentException("Metrics port need 
to be within the range of 0 and 65535");
-                    }
-                    instanceConfig.setMetricsPort(metricsPort);
-                } else {
-                    
instanceConfig.setMetricsPort(FunctionCommon.findAvailablePort());
+                false, Optional.empty(), Optional.empty());
+
+        for (int i = 0; i < parallelism; ++i) {
+            InstanceConfig instanceConfig = new InstanceConfig();
+            instanceConfig.setFunctionDetails(functionDetails);
+            // TODO: correctly implement function version and id
+            instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
+            instanceConfig.setFunctionId(UUID.randomUUID().toString());
+            instanceConfig.setInstanceId(i + instanceIdOffset);
+            instanceConfig.setMaxBufferedTuples(1024);
+            instanceConfig.setPort(FunctionCommon.findAvailablePort());
+
+            if (metricsPortStart != null) {
+                int metricsPort = metricsPortStart + i;
+                if (metricsPortStart < 0 || metricsPortStart > 65535) {
+                    throw new IllegalArgumentException("Metrics port need to 
be within the range of 0 and 65535");
                 }
-                instanceConfig.setClusterName("local");
-                if (functionConfig != null) {
-                    
instanceConfig.setMaxPendingAsyncRequests(functionConfig.getMaxPendingAsyncRequests());
-                    if (functionConfig.getExposePulsarAdminClientEnabled() != 
null) {
-                        
instanceConfig.setExposePulsarAdminClientEnabled(functionConfig.getExposePulsarAdminClientEnabled());
-                    }
+                instanceConfig.setMetricsPort(metricsPort);
+            } else {
+                
instanceConfig.setMetricsPort(FunctionCommon.findAvailablePort());
+            }
+            instanceConfig.setClusterName("local");
+            if (functionConfig != null) {
+                
instanceConfig.setMaxPendingAsyncRequests(functionConfig.getMaxPendingAsyncRequests());
+                if (functionConfig.getExposePulsarAdminClientEnabled() != 
null) {
+                    
instanceConfig.setExposePulsarAdminClientEnabled(functionConfig.getExposePulsarAdminClientEnabled());
                 }
-                RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
-                        instanceConfig,
-                        userCodeFile,
-                        null,
-                        containerFactory,
-                        30000);
-                spawners.add(runtimeSpawner);
-                runtimeSpawner.start();
             }
-            Timer statusCheckTimer = new Timer();
-            statusCheckTimer.scheduleAtFixedRate(new TimerTask() {
-                @Override
-                public void run() {
-                    CompletableFuture<String>[] futures = new 
CompletableFuture[spawners.size()];
-                    int index = 0;
-                    for (RuntimeSpawner spawner : spawners) {
-                        futures[index] = 
spawner.getFunctionStatusAsJson(index);
-                        index++;
-                    }
-                    try {
-                        CompletableFuture.allOf(futures).get(5, 
TimeUnit.SECONDS);
-                        for (index = 0; index < futures.length; ++index) {
-                            String json = futures[index].get();
-                            Gson gson = new 
GsonBuilder().setPrettyPrinting().create();
-                            log.info(gson.toJson(new 
JsonParser().parse(json)));
-                        }
-                    } catch (TimeoutException | InterruptedException | 
ExecutionException e) {
-                        log.error("Could not get status from all local 
instances");
-                    }
+            RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
+                    instanceConfig,
+                    userCodeFile,
+                    null,
+                    runtimeFactory,
+                    30000);
+            spawners.add(runtimeSpawner);
+            runtimeSpawner.start();
+        }
+        Timer statusCheckTimer = new Timer();
+        statusCheckTimer.scheduleAtFixedRate(new TimerTask() {
+            @Override
+            public void run() {
+                CompletableFuture<String>[] futures = new 
CompletableFuture[spawners.size()];
+                int index = 0;
+                for (RuntimeSpawner spawner : spawners) {
+                    futures[index] = spawner.getFunctionStatusAsJson(index);
+                    index++;
                 }
-            }, 30000, 30000);
-            java.lang.Runtime.getRuntime().addShutdownHook(new Thread() {
-                public void run() {
-                    statusCheckTimer.cancel();
+                try {
+                    CompletableFuture.allOf(futures).get(5, TimeUnit.SECONDS);
+                    for (index = 0; index < futures.length; ++index) {
+                        String json = futures[index].get();
+                        Gson gson = new 
GsonBuilder().setPrettyPrinting().create();
+                        log.info(gson.toJson(new JsonParser().parse(json)));
+                    }
+                } catch (TimeoutException | InterruptedException | 
ExecutionException e) {
+                    log.error("Could not get status from all local instances");
                 }
-            });
-        }
+            }
+        }, 30000, 30000);
+        java.lang.Runtime.getRuntime().addShutdownHook(new Thread(() -> 
statusCheckTimer.cancel()));
     }
 
 
@@ -574,13 +588,12 @@ public class LocalRunner implements AutoCloseable {
         FunctionCollectorRegistry collectorRegistry = 
FunctionCollectorRegistry.getDefaultImplementation();
         RuntimeUtils.registerDefaultCollectors(collectorRegistry);
 
-        ThreadRuntimeFactory threadRuntimeFactory;
         ClassLoader originalClassLoader = 
Thread.currentThread().getContextClassLoader();
         try {
             if (userCodeClassLoader != null) {
                 
Thread.currentThread().setContextClassLoader(userCodeClassLoader);
             }
-            threadRuntimeFactory = new 
ThreadRuntimeFactory("LocalRunnerThreadGroup",
+            runtimeFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup",
                     serviceUrl,
                     stateStorageServiceUrl,
                     authConfig,
@@ -614,16 +627,15 @@ public class LocalRunner implements AutoCloseable {
                     instanceConfig,
                     userCodeFile,
                     null,
-                    threadRuntimeFactory,
+                    runtimeFactory,
                     30000);
             spawners.add(runtimeSpawner);
             runtimeSpawner.start();
         }
-
         if (metricsPortStart != null) {
             // starting metrics server
             log.info("Starting metrics server on port {}", metricsPortStart);
-            new HTTPServer(new InetSocketAddress(metricsPortStart), 
collectorRegistry, true);
+            metricsServer = new HTTPServer(new 
InetSocketAddress(metricsPortStart), collectorRegistry, true);
         }
     }
 

Reply via email to