This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ded9086 Consolidate timer threads in functions (#3109)
ded9086 is described below
commit ded9086237ef6f8cf5d17676a2056bb7ff2daad3
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Mon Dec 3 17:47:13 2018 -0800
Consolidate timer threads in functions (#3109)
* Use InstanceCache scheduledExecutorService for timer
* fixing process runtime
* addressing comments
---
.../functions/instance/FunctionStatsManager.java | 5 ++-
.../pulsar/functions/instance/InstanceCache.java | 13 +++++--
.../functions/instance/JavaInstanceRunnable.java | 2 +-
.../pulsar/functions/runtime/JavaInstanceMain.java | 28 +++++++--------
.../pulsar/functions/runtime/ProcessRuntime.java | 27 +++++++-------
.../pulsar/functions/runtime/RuntimeSpawner.java | 41 +++++++++++-----------
6 files changed, 61 insertions(+), 55 deletions(-)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
index f9a3c77..9059f79 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
@@ -393,6 +393,9 @@ public class FunctionStatsManager implements AutoCloseable {
@Override
public void close() {
- scheduledFuture.cancel(false);
+ if (scheduledFuture != null) {
+ scheduledFuture.cancel(false);
+ scheduledFuture = null;
+ }
}
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
index 937c273..fe7e049 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
@@ -18,17 +18,24 @@
*/
package org.apache.pulsar.functions.instance;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import lombok.Getter;
+
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
public class InstanceCache {
private static InstanceCache instance;
- public final ScheduledExecutorService executor;
+ @Getter
+ private final ScheduledExecutorService scheduledExecutorService;
private InstanceCache() {
- executor = Executors.newSingleThreadScheduledExecutor();
+ ThreadFactory namedThreadFactory =
+ new DefaultThreadFactory("function-timer-thread");
+ scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(namedThreadFactory);
}
public static InstanceCache getInstanceCache() {
@@ -43,7 +50,7 @@ public class InstanceCache {
public static void shutdown() {
synchronized (InstanceCache.class) {
if (instance != null) {
- instance.executor.shutdown();
+ instance.scheduledExecutorService.shutdown();
}
instance = null;
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index d302e6a..e55fabc 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -216,7 +216,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
if (this.collectorRegistry == null) {
this.collectorRegistry = new CollectorRegistry();
}
- this.stats = new FunctionStatsManager(this.collectorRegistry,
this.metricsLabels, this.instanceCache.executor);
+ this.stats = new FunctionStatsManager(this.collectorRegistry,
this.metricsLabels, this.instanceCache.getScheduledExecutorService());
ContextImpl contextImpl = setupContext();
javaInstance = setupJavaInstance(contextImpl);
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index f0be511..3a0a404 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -34,6 +34,7 @@ import io.prometheus.client.exporter.HTTPServer;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.functions.instance.InstanceCache;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.InstanceCommunication;
@@ -49,6 +50,7 @@ import java.util.TimerTask;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
@@ -122,8 +124,8 @@ public class JavaInstanceMain implements AutoCloseable {
private RuntimeSpawner runtimeSpawner;
private ThreadRuntimeFactory containerFactory;
private Long lastHealthCheckTs = null;
- private ScheduledExecutorService timer;
private HTTPServer metricsServer;
+ private ScheduledFuture healthCheckTimer;
public JavaInstanceMain() { }
@@ -215,18 +217,14 @@ public class JavaInstanceMain implements AutoCloseable {
metricsServer = new HTTPServer(new InetSocketAddress(metrics_port),
collectorRegistry, true);
if (expectedHealthCheckInterval > 0) {
- timer = Executors.newSingleThreadScheduledExecutor();
- timer.scheduleAtFixedRate(new TimerTask() {
- @Override
- public void run() {
- try {
- if (System.currentTimeMillis() - lastHealthCheckTs > 3
* expectedHealthCheckInterval * 1000) {
- log.info("Haven't received health check from
spawner in a while. Stopping instance...");
- close();
- }
- } catch (Exception e) {
- log.error("Error occurred when checking for latest
health check", e);
+ healthCheckTimer =
InstanceCache.getInstanceCache().getScheduledExecutorService().scheduleAtFixedRate(()
-> {
+ try {
+ if (System.currentTimeMillis() - lastHealthCheckTs > 3 *
expectedHealthCheckInterval * 1000) {
+ log.info("Haven't received health check from spawner
in a while. Stopping instance...");
+ close();
}
+ } catch (Exception e) {
+ log.error("Error occurred when checking for latest health
check", e);
}
}, expectedHealthCheckInterval * 1000, expectedHealthCheckInterval
* 1000, TimeUnit.MILLISECONDS);
}
@@ -260,8 +258,8 @@ public class JavaInstanceMain implements AutoCloseable {
if (runtimeSpawner != null) {
runtimeSpawner.close();
}
- if (timer != null) {
- timer.shutdown();
+ if (healthCheckTimer != null) {
+ healthCheckTimer.cancel(false);
}
if (containerFactory != null) {
containerFactory.close();
@@ -269,6 +267,8 @@ public class JavaInstanceMain implements AutoCloseable {
if (metricsServer != null) {
metricsServer.stop();
}
+
+ InstanceCache.shutdown();
} catch (Exception ex) {
System.err.println(ex);
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 4edac17..70e7a3a 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -30,6 +30,7 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.functions.instance.InstanceCache;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.InstanceCommunication;
@@ -45,6 +46,7 @@ import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
@@ -64,7 +66,7 @@ class ProcessRuntime implements Runtime {
private Throwable deathException;
private ManagedChannel channel;
private InstanceControlGrpc.InstanceControlFutureStub stub;
- private ScheduledExecutorService timer;
+ private ScheduledFuture timer;
private InstanceConfig instanceConfig;
private final Long expectedHealthCheckInterval;
private final SecretsProviderConfigurator secretsProviderConfigurator;
@@ -138,19 +140,14 @@ class ProcessRuntime implements Runtime {
.build();
stub = InstanceControlGrpc.newFutureStub(channel);
- timer = Executors.newSingleThreadScheduledExecutor();
- timer.scheduleAtFixedRate(new TimerTask() {
-
- @Override
- public void run() {
- CompletableFuture<InstanceCommunication.HealthCheckResult>
result = healthCheck();
- try {
- result.get();
- } catch (Exception e) {
- log.error("Health check failed for {}-{}",
- instanceConfig.getFunctionDetails().getName(),
- instanceConfig.getInstanceId(), e);
- }
+ timer =
InstanceCache.getInstanceCache().getScheduledExecutorService().scheduleAtFixedRate(()
-> {
+ CompletableFuture<InstanceCommunication.HealthCheckResult>
result = healthCheck();
+ try {
+ result.get();
+ } catch (Exception e) {
+ log.error("Health check failed for {}-{}",
+ instanceConfig.getFunctionDetails().getName(),
+ instanceConfig.getInstanceId(), e);
}
}, expectedHealthCheckInterval, expectedHealthCheckInterval,
TimeUnit.SECONDS);
}
@@ -164,7 +161,7 @@ class ProcessRuntime implements Runtime {
@Override
public void stop() {
if (timer != null) {
- timer.shutdown();
+ timer.cancel(false);
}
if (process != null) {
process.destroyForcibly();
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
index 6b5abce..aa1784a 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
@@ -27,12 +27,15 @@ import java.io.IOException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.functions.instance.InstanceCache;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
@@ -50,7 +53,7 @@ public class RuntimeSpawner implements AutoCloseable {
@Getter
private Runtime runtime;
- private Timer processLivenessCheckTimer;
+ private ScheduledFuture processLivenessCheckTimer;
private int numRestarts;
private long instanceLivenessCheckFreqMs;
private Throwable runtimeDeathException;
@@ -79,27 +82,23 @@ public class RuntimeSpawner implements AutoCloseable {
// monitor function runtime to make sure it is running. If not,
restart the function runtime
if (!runtimeFactory.externallyManaged() && instanceLivenessCheckFreqMs
> 0) {
- processLivenessCheckTimer = new Timer();
- processLivenessCheckTimer.scheduleAtFixedRate(new TimerTask() {
- @Override
- public void run() {
- Runtime runtime = RuntimeSpawner.this.runtime;
- if (runtime != null && !runtime.isAlive()) {
- log.error("{}/{}/{}-{} Function Container is dead with
exception.. restarting", details.getTenant(),
- details.getNamespace(), details.getName(),
runtime.getDeathException());
- // Just for the sake of sanity, just destroy the
runtime
- try {
- runtime.stop();
- runtimeDeathException =
runtime.getDeathException();
- runtime.start();
- } catch (Exception e) {
- log.error("{}/{}/{}-{} Function Restart failed",
details.getTenant(),
- details.getNamespace(), details.getName(),
e, e);
- }
- numRestarts++;
+ processLivenessCheckTimer =
InstanceCache.getInstanceCache().getScheduledExecutorService().scheduleAtFixedRate(()
-> {
+ Runtime runtime = RuntimeSpawner.this.runtime;
+ if (runtime != null && !runtime.isAlive()) {
+ log.error("{}/{}/{}-{} Function Container is dead with
exception.. restarting", details.getTenant(),
+ details.getNamespace(), details.getName(),
runtime.getDeathException());
+ // Just for the sake of sanity, just destroy the runtime
+ try {
+ runtime.stop();
+ runtimeDeathException = runtime.getDeathException();
+ runtime.start();
+ } catch (Exception e) {
+ log.error("{}/{}/{}-{} Function Restart failed",
details.getTenant(),
+ details.getNamespace(), details.getName(), e,
e);
}
+ numRestarts++;
}
- }, instanceLivenessCheckFreqMs, instanceLivenessCheckFreqMs);
+ }, instanceLivenessCheckFreqMs, instanceLivenessCheckFreqMs,
TimeUnit.MILLISECONDS);
}
}
@@ -139,7 +138,7 @@ public class RuntimeSpawner implements AutoCloseable {
public void close() {
// cancel liveness checker before stopping runtime.
if (processLivenessCheckTimer != null) {
- processLivenessCheckTimer.cancel();
+ processLivenessCheckTimer.cancel(false);
processLivenessCheckTimer = null;
}
if (null != runtime) {