This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 26118e6 Improve error handling during localrun start (#10450)
26118e6 is described below
commit 26118e6d8feba9f70aaadc4323a509245b545431
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Mon May 3 21:26:17 2021 -0700
Improve error handling during localrun start (#10450)
---
.../org/apache/pulsar/functions/LocalRunner.java | 156 +++++++++++----------
1 file changed, 84 insertions(+), 72 deletions(-)
diff --git
a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index c7e0c28..fb6c626 100644
---
a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++
b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -63,6 +63,7 @@ import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.runtime.RuntimeUtils;
import org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory;
@@ -94,6 +95,8 @@ public class LocalRunner implements AutoCloseable {
private final Thread shutdownHook;
private ClassLoader userCodeClassLoader;
private boolean userCodeClassLoaderCreated;
+ private RuntimeFactory runtimeFactory;
+ private HTTPServer metricsServer;
public enum RuntimeEnv {
THREAD,
@@ -185,7 +188,12 @@ public class LocalRunner implements AutoCloseable {
// parse args by JCommander
jcommander.parse(args);
- localRunner.start(true);
+ try {
+ localRunner.start(true);
+ } catch (Exception e) {
+ log.error("Encountered error starting localrunner", e);
+ localRunner.close();
+ }
}
@Builder
@@ -227,11 +235,13 @@ public class LocalRunner implements AutoCloseable {
this.connectorsDir = Paths.get(pulsarHome,
"connectors").toString();
}
this.metricsPortStart = metricsPortStart;
- shutdownHook = new Thread() {
- public void run() {
- LocalRunner.this.stop();
+ shutdownHook = new Thread(() -> {
+ try {
+ LocalRunner.this.close();
+ } catch (Exception exception) {
+ log.warn("Encountered exception when closing localrunner",
exception);
}
- };
+ });
}
private static File createNarExtractionTempDirectory() {
@@ -260,12 +270,21 @@ public class LocalRunner implements AutoCloseable {
} catch (IllegalStateException e) {
// ignore possible "Shutdown in progress"
}
- log.info("Shutting down the localrun runtimeSpawner ...");
+
+ if (metricsServer != null) {
+ metricsServer.stop();
+ }
+
for (RuntimeSpawner spawner : spawners) {
spawner.close();
}
spawners.clear();
+ if (runtimeFactory != null) {
+ runtimeFactory.close();
+ runtimeFactory = null;
+ }
+
if (userCodeClassLoaderCreated) {
if (userCodeClassLoader instanceof Closeable) {
try {
@@ -464,7 +483,7 @@ public class LocalRunner implements AutoCloseable {
String stateStorageServiceUrl,
AuthenticationConfig authConfig,
String userCodeFile) throws
Exception {
SecretsProviderConfigurator secretsProviderConfigurator =
getSecretsProviderConfigurator();
- try (ProcessRuntimeFactory containerFactory = new
ProcessRuntimeFactory(
+ runtimeFactory = new ProcessRuntimeFactory(
serviceUrl,
webServiceUrl,
stateStorageServiceUrl,
@@ -475,71 +494,66 @@ public class LocalRunner implements AutoCloseable {
null, /* extra dependencies dir */
narExtractionDirectory, /* nar extraction dir */
secretsProviderConfigurator,
- false, Optional.empty(), Optional.empty())) {
-
- for (int i = 0; i < parallelism; ++i) {
- InstanceConfig instanceConfig = new InstanceConfig();
- instanceConfig.setFunctionDetails(functionDetails);
- // TODO: correctly implement function version and id
-
instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
- instanceConfig.setFunctionId(UUID.randomUUID().toString());
- instanceConfig.setInstanceId(i + instanceIdOffset);
- instanceConfig.setMaxBufferedTuples(1024);
- instanceConfig.setPort(FunctionCommon.findAvailablePort());
-
- if (metricsPortStart != null) {
- int metricsPort = metricsPortStart + i;
- if (metricsPortStart < 0 || metricsPortStart > 65535) {
- throw new IllegalArgumentException("Metrics port need
to be within the range of 0 and 65535");
- }
- instanceConfig.setMetricsPort(metricsPort);
- } else {
-
instanceConfig.setMetricsPort(FunctionCommon.findAvailablePort());
+ false, Optional.empty(), Optional.empty());
+
+ for (int i = 0; i < parallelism; ++i) {
+ InstanceConfig instanceConfig = new InstanceConfig();
+ instanceConfig.setFunctionDetails(functionDetails);
+ // TODO: correctly implement function version and id
+ instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
+ instanceConfig.setFunctionId(UUID.randomUUID().toString());
+ instanceConfig.setInstanceId(i + instanceIdOffset);
+ instanceConfig.setMaxBufferedTuples(1024);
+ instanceConfig.setPort(FunctionCommon.findAvailablePort());
+
+ if (metricsPortStart != null) {
+ int metricsPort = metricsPortStart + i;
+ if (metricsPortStart < 0 || metricsPortStart > 65535) {
+ throw new IllegalArgumentException("Metrics port need to
be within the range of 0 and 65535");
}
- instanceConfig.setClusterName("local");
- if (functionConfig != null) {
-
instanceConfig.setMaxPendingAsyncRequests(functionConfig.getMaxPendingAsyncRequests());
- if (functionConfig.getExposePulsarAdminClientEnabled() !=
null) {
-
instanceConfig.setExposePulsarAdminClientEnabled(functionConfig.getExposePulsarAdminClientEnabled());
- }
+ instanceConfig.setMetricsPort(metricsPort);
+ } else {
+
instanceConfig.setMetricsPort(FunctionCommon.findAvailablePort());
+ }
+ instanceConfig.setClusterName("local");
+ if (functionConfig != null) {
+
instanceConfig.setMaxPendingAsyncRequests(functionConfig.getMaxPendingAsyncRequests());
+ if (functionConfig.getExposePulsarAdminClientEnabled() !=
null) {
+
instanceConfig.setExposePulsarAdminClientEnabled(functionConfig.getExposePulsarAdminClientEnabled());
}
- RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
- instanceConfig,
- userCodeFile,
- null,
- containerFactory,
- 30000);
- spawners.add(runtimeSpawner);
- runtimeSpawner.start();
}
- Timer statusCheckTimer = new Timer();
- statusCheckTimer.scheduleAtFixedRate(new TimerTask() {
- @Override
- public void run() {
- CompletableFuture<String>[] futures = new
CompletableFuture[spawners.size()];
- int index = 0;
- for (RuntimeSpawner spawner : spawners) {
- futures[index] =
spawner.getFunctionStatusAsJson(index);
- index++;
- }
- try {
- CompletableFuture.allOf(futures).get(5,
TimeUnit.SECONDS);
- for (index = 0; index < futures.length; ++index) {
- String json = futures[index].get();
- Gson gson = new
GsonBuilder().setPrettyPrinting().create();
- log.info(gson.toJson(new
JsonParser().parse(json)));
- }
- } catch (TimeoutException | InterruptedException |
ExecutionException e) {
- log.error("Could not get status from all local
instances");
- }
+ RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
+ instanceConfig,
+ userCodeFile,
+ null,
+ runtimeFactory,
+ 30000);
+ spawners.add(runtimeSpawner);
+ runtimeSpawner.start();
+ }
+ Timer statusCheckTimer = new Timer();
+ statusCheckTimer.scheduleAtFixedRate(new TimerTask() {
+ @Override
+ public void run() {
+ CompletableFuture<String>[] futures = new
CompletableFuture[spawners.size()];
+ int index = 0;
+ for (RuntimeSpawner spawner : spawners) {
+ futures[index] = spawner.getFunctionStatusAsJson(index);
+ index++;
}
- }, 30000, 30000);
- java.lang.Runtime.getRuntime().addShutdownHook(new Thread() {
- public void run() {
- statusCheckTimer.cancel();
+ try {
+ CompletableFuture.allOf(futures).get(5, TimeUnit.SECONDS);
+ for (index = 0; index < futures.length; ++index) {
+ String json = futures[index].get();
+ Gson gson = new
GsonBuilder().setPrettyPrinting().create();
+ log.info(gson.toJson(new JsonParser().parse(json)));
+ }
+ } catch (TimeoutException | InterruptedException |
ExecutionException e) {
+ log.error("Could not get status from all local instances");
}
- });
- }
+ }
+ }, 30000, 30000);
+ java.lang.Runtime.getRuntime().addShutdownHook(new Thread(() ->
statusCheckTimer.cancel()));
}
@@ -574,13 +588,12 @@ public class LocalRunner implements AutoCloseable {
FunctionCollectorRegistry collectorRegistry =
FunctionCollectorRegistry.getDefaultImplementation();
RuntimeUtils.registerDefaultCollectors(collectorRegistry);
- ThreadRuntimeFactory threadRuntimeFactory;
ClassLoader originalClassLoader =
Thread.currentThread().getContextClassLoader();
try {
if (userCodeClassLoader != null) {
Thread.currentThread().setContextClassLoader(userCodeClassLoader);
}
- threadRuntimeFactory = new
ThreadRuntimeFactory("LocalRunnerThreadGroup",
+ runtimeFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup",
serviceUrl,
stateStorageServiceUrl,
authConfig,
@@ -614,16 +627,15 @@ public class LocalRunner implements AutoCloseable {
instanceConfig,
userCodeFile,
null,
- threadRuntimeFactory,
+ runtimeFactory,
30000);
spawners.add(runtimeSpawner);
runtimeSpawner.start();
}
-
if (metricsPortStart != null) {
// starting metrics server
log.info("Starting metrics server on port {}", metricsPortStart);
- new HTTPServer(new InetSocketAddress(metricsPortStart),
collectorRegistry, true);
+ metricsServer = new HTTPServer(new
InetSocketAddress(metricsPortStart), collectorRegistry, true);
}
}