This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new cd9356a Allow Pulsar Functions localrun to exit on error (#12278)
cd9356a is described below
commit cd9356a03063653e0097f4b85d6b3546698cc325
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Tue Oct 5 22:20:55 2021 -0700
Allow Pulsar Functions localrun to exit on error (#12278)
Co-authored-by: Jerry Peng <[email protected]>
---
.../worker/PulsarFunctionLocalRunTest.java | 80 +++++++++++++++++++++-
.../pulsar/functions/instance/JavaInstance.java | 5 +-
.../functions/instance/JavaInstanceRunnable.java | 24 ++++---
.../org/apache/pulsar/functions/LocalRunner.java | 32 +++++++--
.../pulsar/functions/runtime/RuntimeSpawner.java | 2 +-
5 files changed, 123 insertions(+), 20 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index e9a303d..ed6c4aa 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -29,6 +29,7 @@ import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.File;
@@ -48,6 +49,8 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+
+import lombok.Builder;
import lombok.Cleanup;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -394,6 +397,7 @@ public class PulsarFunctionLocalRunTest {
sinkConfig.setInputSpecs(Collections.singletonMap(sourceTopic,
ConsumerConfig.builder().build()));
sinkConfig.setSourceSubscriptionName(subName);
sinkConfig.setCleanupSubscription(true);
+ sinkConfig.setConfigs(new HashMap<>());
return sinkConfig;
}
/**
@@ -1039,9 +1043,83 @@ public class PulsarFunctionLocalRunTest {
}
@Test
- public void testPulsarSinkStatsByteBufferType() throws Throwable{
+ public void testPulsarSinkStatsByteBufferType() throws Throwable {
runWithNarClassLoader(() -> testPulsarSinkLocalRun(null, 1,
StatsNullSink.class.getName()));
}
+
+ /**
+  * Test sink that fails on demand: the presence of the key "throwErrorOpen",
+  * "throwErrorWrite" or "throwErrorClose" in the sink configuration makes the
+  * corresponding lifecycle method throw.
+  */
+ public static class TestErrorSink implements Sink<byte[]> {
+     // Configuration handed to open(); consulted again by write() and close().
+     private Map<String, Object> config;
+
+     @Override
+     public void open(Map<String, Object> map, final SinkContext sinkContext) throws Exception {
+         // Remember the configuration before the failure check so later calls can read it.
+         config = map;
+         if (map.containsKey("throwErrorOpen")) {
+             throw new Exception("error on open");
+         }
+     }
+
+     @Override
+     public void write(Record<byte[]> record) throws Exception {
+         if (config.containsKey("throwErrorWrite")) {
+             throw new Exception("error on write");
+         }
+         // Only acknowledge records that were not rejected above.
+         record.ack();
+     }
+
+     @Override
+     public void close() throws Exception {
+         if (config.containsKey("throwErrorClose")) {
+             throw new Exception("error on close");
+         }
+     }
+ }
+
+ /**
+  * Starts a LocalRunner in blocking mode with exitOnError enabled against a sink that
+  * is configured to fail, and relies on the test timeout to detect a runner that
+  * keeps running instead of returning from start().
+  */
+ @Test(timeOut = 20000)
+ public void testExitOnError() throws Throwable {
+
+     final String namespacePortion = "io";
+     final String replNamespace = tenant + "/" + namespacePortion;
+     final String sourceTopic = "persistent://" + replNamespace + "/input";
+     final String sinkName = "PulsarSink-test";
+     final String subscriptionName = "test-sub";
+     admin.namespaces().createNamespace(replNamespace);
+     Set<String> clusters = Sets.newHashSet(Lists.newArrayList("local"));
+     admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
+
+     // create a producer that creates a topic at broker; @Cleanup closes it when the test ends
+     @Cleanup
+     Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
+
+     SinkConfig sinkConfig = createSinkConfig(tenant, namespacePortion, sinkName, sourceTopic, subscriptionName);
+
+     sinkConfig.setInputSpecs(Collections.singletonMap(sourceTopic,
+             ConsumerConfig.builder().receiverQueueSize(1000).build()));
+
+     sinkConfig.setClassName(TestErrorSink.class.getName());
+
+     int metricsPort = FunctionCommon.findAvailablePort();
+
+     LocalRunner.LocalRunnerBuilder localRunnerBuilder = LocalRunner.builder()
+             .clientAuthPlugin(AuthenticationTls.class.getName())
+             .clientAuthParams(String.format("tlsCertFile:%s,tlsKeyFile:%s", TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH))
+             .useTls(true)
+             .tlsTrustCertFilePath(TLS_TRUST_CERT_FILE_PATH)
+             .tlsAllowInsecureConnection(true)
+             .tlsHostNameVerificationEnabled(false)
+             .brokerServiceUrl(pulsar.getBrokerServiceUrlTls())
+             .connectorsDirectory(workerConfig.getConnectorsDirectory())
+             .metricsPortStart(metricsPort)
+             .exitOnError(true);
+
+     // First run: the sink throws from open().
+     sinkConfig.getConfigs().put("throwErrorOpen", true);
+     localRunnerBuilder.sinkConfig(sinkConfig);
+     LocalRunner localRunner = localRunnerBuilder.build();
+     localRunner.start(true);
+
+     // NOTE(review): "throwErrorOpen" is still present in the same config map, so this
+     // second run also fails in open(); write() would only be exercised if that key were
+     // removed and a message were published to the input topic -- confirm intent.
+     sinkConfig.getConfigs().put("throwErrorWrite", true);
+     localRunnerBuilder.sinkConfig(sinkConfig);
+     localRunner = localRunnerBuilder.build();
+     localRunner.start(true);
+ }
private void runWithNarClassLoader(Assert.ThrowingRunnable
throwingRunnable) throws Throwable {
ClassLoader originalClassLoader =
Thread.currentThread().getContextClassLoader();
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
index 83b64dc..33feb25 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
@@ -80,7 +80,7 @@ public class JavaInstance implements AutoCloseable {
}
public JavaExecutionResult handleMessage(Record<?> record, Object input,
- BiConsumer<Record,
JavaExecutionResult> asyncResultConsumer,
+
JavaInstanceRunnable.AsyncResultConsumer asyncResultConsumer,
Consumer<Throwable>
asyncFailureHandler) {
if (context != null) {
context.setCurrentMessageContext(record);
@@ -131,8 +131,7 @@ public class JavaInstance implements AutoCloseable {
}
}
- private void processAsyncResults(BiConsumer<Record, JavaExecutionResult>
resultConsumer)
- throws InterruptedException {
+ private void processAsyncResults(JavaInstanceRunnable.AsyncResultConsumer
resultConsumer) throws Exception {
AsyncFuncRequest asyncResult = pendingAsyncRequests.peek();
while (asyncResult != null && asyncResult.getProcessResult().isDone())
{
pendingAsyncRequests.remove(asyncResult);
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 6aff6f4..cfdcb08 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -28,9 +28,12 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
+
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import net.jodah.typetools.TypeResolver;
@@ -240,6 +243,10 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
pulsarAdmin, clientBuilder);
}
+ /**
+  * Callback that receives a record together with the execution result produced for it.
+  * Unlike {@link java.util.function.BiConsumer}, {@code accept} may throw a checked
+  * exception, which lets result handling propagate failures to the caller.
+  */
+ @FunctionalInterface
+ public interface AsyncResultConsumer {
+     /**
+      * @param record the record the result belongs to
+      * @param javaExecutionResult the outcome of executing the function for that record
+      * @throws Exception if handling the result fails
+      */
+     void accept(Record record, JavaExecutionResult javaExecutionResult) throws Exception;
+ }
+
/**
* The core logic that initialize the instance thread and executes the
function.
*/
@@ -249,6 +256,8 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
setup();
Thread currentThread = Thread.currentThread();
+ Consumer<Throwable> asyncErrorHandler = throwable ->
currentThread.interrupt();
+ AsyncResultConsumer asyncResultConsumer = (record,
javaExecutionResult) -> handleResult(record, javaExecutionResult);
while (true) {
currentRecord = readInput();
@@ -275,8 +284,10 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
// process the message
Thread.currentThread().setContextClassLoader(functionClassLoader);
result = javaInstance.handleMessage(
- currentRecord, currentRecord.getValue(),
this::handleResult,
- cause -> currentThread.interrupt());
+ currentRecord,
+ currentRecord.getValue(),
+ asyncResultConsumer,
+ asyncErrorHandler);
Thread.currentThread().setContextClassLoader(instanceClassLoader);
// register end time
@@ -328,11 +339,7 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
}
}
- private void processAsyncResults() throws InterruptedException {
-
- }
-
- private void handleResult(Record srcRecord, JavaExecutionResult result) {
+ private void handleResult(Record srcRecord, JavaExecutionResult result)
throws Exception {
if (result.getUserException() != null) {
Exception t = result.getUserException();
log.warn("Encountered exception when processing message {}",
@@ -353,7 +360,7 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
}
}
- private void sendOutputMessage(Record srcRecord, Object output) {
+ private void sendOutputMessage(Record srcRecord, Object output) throws
Exception {
if (componentType ==
org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK) {
Thread.currentThread().setContextClassLoader(functionClassLoader);
}
@@ -364,6 +371,7 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
stats.incrSinkExceptions(e);
// fail the source record
srcRecord.fail();
+ throw e;
} finally {
Thread.currentThread().setContextClassLoader(instanceClassLoader);
}
diff --git
a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index 651523e..0bedb74 100644
---
a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++
b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -94,6 +94,7 @@ public class LocalRunner implements AutoCloseable {
private final File narExtractionDirectoryCreated;
private final String connectorsDir;
private final Thread shutdownHook;
+ private final int instanceLivenessCheck;
private ClassLoader userCodeClassLoader;
private boolean userCodeClassLoaderCreated;
private RuntimeFactory runtimeFactory;
@@ -178,6 +179,8 @@ public class LocalRunner implements AutoCloseable {
protected String secretsProviderConfig;
@Parameter(names = "--metricsPortStart", description = "The starting port
range for metrics server. When running instances as threads, one metrics server
is used to host the stats for all instances.", hidden = true)
protected Integer metricsPortStart;
+ // When set, a blocking start() waits for the instances to terminate and then closes the runner.
+ @Parameter(names = "--exitOnError", description = "Exit the local runner when a function instance dies with an error, instead of keeping it running.", hidden = true)
+ protected boolean exitOnError;
private static final String DEFAULT_SERVICE_URL =
"pulsar://localhost:6650";
private static final String DEFAULT_WEB_SERVICE_URL =
"http://localhost:8080";
@@ -203,7 +206,7 @@ public class LocalRunner implements AutoCloseable {
boolean useTls, boolean tlsAllowInsecureConnection,
boolean tlsHostNameVerificationEnabled,
String tlsTrustCertFilePath, int instanceIdOffset,
RuntimeEnv runtimeEnv,
String secretsProviderClassName, String
secretsProviderConfig, String narExtractionDirectory,
- String connectorsDirectory, Integer metricsPortStart) {
+ String connectorsDirectory, Integer metricsPortStart,
boolean exitOnError) {
this.functionConfig = functionConfig;
this.sourceConfig = sourceConfig;
this.sinkConfig = sinkConfig;
@@ -236,6 +239,8 @@ public class LocalRunner implements AutoCloseable {
this.connectorsDir = Paths.get(pulsarHome,
"connectors").toString();
}
this.metricsPortStart = metricsPortStart;
+ this.exitOnError = exitOnError;
+ this.instanceLivenessCheck = exitOnError ? 0 : 30000;
shutdownHook = new Thread(() -> {
try {
LocalRunner.this.close();
@@ -259,13 +264,16 @@ public class LocalRunner implements AutoCloseable {
stop();
} finally {
if (narExtractionDirectoryCreated != null) {
- FileUtils.deleteFile(narExtractionDirectoryCreated, true);
+ if (narExtractionDirectoryCreated.exists()) {
+ FileUtils.deleteFile(narExtractionDirectoryCreated, true);
+ }
}
}
}
public synchronized void stop() {
if (running.compareAndSet(true, false)) {
+ this.notify();
try {
Runtime.getRuntime().removeShutdownHook(shutdownHook);
} catch (IllegalStateException e) {
@@ -472,9 +480,18 @@ public class LocalRunner implements AutoCloseable {
}
if (blocking) {
- for (RuntimeSpawner spawner : local) {
- spawner.join();
- log.info("RuntimeSpawner quit because of",
spawner.getRuntime().getDeathException());
+ if (exitOnError) {
+ for (RuntimeSpawner spawner : local) {
+ spawner.join();
+ log.info("RuntimeSpawner quit because of",
spawner.getRuntime().getDeathException());
+ }
+ close();
+ } else {
+ synchronized (this) {
+ while (running.get()) {
+ this.wait();
+ }
+ }
}
}
}
@@ -523,12 +540,13 @@ public class LocalRunner implements AutoCloseable {
instanceConfig.setExposePulsarAdminClientEnabled(functionConfig.getExposePulsarAdminClientEnabled());
}
}
+
RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
instanceConfig,
userCodeFile,
null,
runtimeFactory,
- 30000);
+ instanceLivenessCheck);
spawners.add(runtimeSpawner);
runtimeSpawner.start();
}
@@ -629,7 +647,7 @@ public class LocalRunner implements AutoCloseable {
userCodeFile,
null,
runtimeFactory,
- 30000);
+ instanceLivenessCheck);
spawners.add(runtimeSpawner);
runtimeSpawner.start();
}
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
index 2a4f95f..471af68 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
@@ -84,7 +84,7 @@ public class RuntimeSpawner implements AutoCloseable {
processLivenessCheckTimer =
InstanceCache.getInstanceCache().getScheduledExecutorService().scheduleAtFixedRate(()
-> {
Runtime runtime = RuntimeSpawner.this.runtime;
if (runtime != null && !runtime.isAlive()) {
- log.error("{}/{}/{}-{} Function Container is dead with
exception.. restarting", details.getTenant(),
+ log.error("{}/{}/{} Function Container is dead with
following exception. Restarting.", details.getTenant(),
details.getNamespace(), details.getName(),
runtime.getDeathException());
// Just for the sake of sanity, just destroy the runtime
try {