This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cd9356a  Allow Pulsar Functions localrun to exit on error (#12278)
cd9356a is described below

commit cd9356a03063653e0097f4b85d6b3546698cc325
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Tue Oct 5 22:20:55 2021 -0700

    Allow Pulsar Functions localrun to exit on error (#12278)
    
    Co-authored-by: Jerry Peng <[email protected]>
---
 .../worker/PulsarFunctionLocalRunTest.java         | 80 +++++++++++++++++++++-
 .../pulsar/functions/instance/JavaInstance.java    |  5 +-
 .../functions/instance/JavaInstanceRunnable.java   | 24 ++++---
 .../org/apache/pulsar/functions/LocalRunner.java   | 32 +++++++--
 .../pulsar/functions/runtime/RuntimeSpawner.java   |  2 +-
 5 files changed, 123 insertions(+), 20 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index e9a303d..ed6c4aa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -29,6 +29,7 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import java.io.File;
@@ -48,6 +49,8 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+
+import lombok.Builder;
 import lombok.Cleanup;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -394,6 +397,7 @@ public class PulsarFunctionLocalRunTest {
         sinkConfig.setInputSpecs(Collections.singletonMap(sourceTopic, 
ConsumerConfig.builder().build()));
         sinkConfig.setSourceSubscriptionName(subName);
         sinkConfig.setCleanupSubscription(true);
+        sinkConfig.setConfigs(new HashMap<>());
         return sinkConfig;
     }
     /**
@@ -1039,9 +1043,83 @@ public class PulsarFunctionLocalRunTest {
     }
 
     @Test
-    public void testPulsarSinkStatsByteBufferType() throws Throwable{
+    public void testPulsarSinkStatsByteBufferType() throws Throwable {
         runWithNarClassLoader(() -> testPulsarSinkLocalRun(null, 1, 
StatsNullSink.class.getName()));
     }
+    
+    public static class TestErrorSink implements Sink<byte[]> {
+        private Map config;
+        @Override
+        public void open(Map map, final SinkContext sinkContext) throws 
Exception {
+            config = map;
+            if (map.containsKey("throwErrorOpen")) {
+                throw new Exception("error on open");
+            }
+        }
+
+        @Override
+        public void write(Record<byte[]> record) throws Exception {
+            if (config.containsKey("throwErrorWrite")) {
+                throw new Exception("error on write");
+            }
+            record.ack();
+        }
+
+        @Override
+        public void close() throws Exception {
+            if (config.containsKey("throwErrorClose")) {
+                throw new Exception("error on close");
+            }
+        }
+    }
+
+    @Test(timeOut = 20000)
+    public void testExitOnError() throws Throwable{
+
+        final String namespacePortion = "io";
+        final String replNamespace = tenant + "/" + namespacePortion;
+        final String sourceTopic = "persistent://" + replNamespace + "/input";
+        final String sinkName = "PulsarSink-test";
+        final String propertyKey = "key";
+        final String propertyValue = "value";
+        final String subscriptionName = "test-sub";
+        admin.namespaces().createNamespace(replNamespace);
+        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("local"));
+        admin.namespaces().setNamespaceReplicationClusters(replNamespace, 
clusters);
+
+        // create a producer that creates a topic at broker
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
+
+        SinkConfig sinkConfig = createSinkConfig(tenant, namespacePortion, 
sinkName, sourceTopic, subscriptionName);
+
+        sinkConfig.setInputSpecs(Collections.singletonMap(sourceTopic, 
ConsumerConfig.builder().receiverQueueSize(1000).build()));
+
+        sinkConfig.setClassName(TestErrorSink.class.getName());
+
+        int metricsPort = FunctionCommon.findAvailablePort();
+
+        LocalRunner.LocalRunnerBuilder localRunnerBuilder = 
LocalRunner.builder()
+                .clientAuthPlugin(AuthenticationTls.class.getName())
+                
.clientAuthParams(String.format("tlsCertFile:%s,tlsKeyFile:%s", 
TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH))
+                .useTls(true)
+                .tlsTrustCertFilePath(TLS_TRUST_CERT_FILE_PATH)
+                .tlsAllowInsecureConnection(true)
+                .tlsHostNameVerificationEnabled(false)
+                .brokerServiceUrl(pulsar.getBrokerServiceUrlTls())
+                .connectorsDirectory(workerConfig.getConnectorsDirectory())
+                .metricsPortStart(metricsPort)
+                .exitOnError(true);
+
+        sinkConfig.getConfigs().put("throwErrorOpen", true);
+        localRunnerBuilder.sinkConfig(sinkConfig);
+        LocalRunner localRunner = localRunnerBuilder.build();
+        localRunner.start(true);
+
+        sinkConfig.getConfigs().put("throwErrorWrite", true);
+        localRunnerBuilder.sinkConfig(sinkConfig);
+        localRunner = localRunnerBuilder.build();
+        localRunner.start(true);
+    }
 
     private void runWithNarClassLoader(Assert.ThrowingRunnable 
throwingRunnable) throws Throwable {
         ClassLoader originalClassLoader = 
Thread.currentThread().getContextClassLoader();
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
index 83b64dc..33feb25 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
@@ -80,7 +80,7 @@ public class JavaInstance implements AutoCloseable {
     }
 
     public JavaExecutionResult handleMessage(Record<?> record, Object input,
-                                             BiConsumer<Record, 
JavaExecutionResult> asyncResultConsumer,
+                                             
JavaInstanceRunnable.AsyncResultConsumer asyncResultConsumer,
                                              Consumer<Throwable> 
asyncFailureHandler) {
         if (context != null) {
             context.setCurrentMessageContext(record);
@@ -131,8 +131,7 @@ public class JavaInstance implements AutoCloseable {
         }
     }
 
-    private void processAsyncResults(BiConsumer<Record, JavaExecutionResult> 
resultConsumer)
-        throws InterruptedException {
+    private void processAsyncResults(JavaInstanceRunnable.AsyncResultConsumer 
resultConsumer) throws Exception {
         AsyncFuncRequest asyncResult = pendingAsyncRequests.peek();
         while (asyncResult != null && asyncResult.getProcessResult().isDone()) 
{
             pendingAsyncRequests.remove(asyncResult);
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 6aff6f4..cfdcb08 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -28,9 +28,12 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
+
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import net.jodah.typetools.TypeResolver;
@@ -240,6 +243,10 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
                 pulsarAdmin, clientBuilder);
     }
 
+    public interface AsyncResultConsumer  {
+        void accept(Record record, JavaExecutionResult javaExecutionResult) 
throws Exception;
+    }
+
     /**
      * The core logic that initialize the instance thread and executes the 
function.
      */
@@ -249,6 +256,8 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
             setup();
 
             Thread currentThread = Thread.currentThread();
+            Consumer<Throwable> asyncErrorHandler = throwable -> 
currentThread.interrupt();
+            AsyncResultConsumer asyncResultConsumer = (record, 
javaExecutionResult) -> handleResult(record, javaExecutionResult);
 
             while (true) {
                 currentRecord = readInput();
@@ -275,8 +284,10 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
                 // process the message
                 
Thread.currentThread().setContextClassLoader(functionClassLoader);
                 result = javaInstance.handleMessage(
-                    currentRecord, currentRecord.getValue(), 
this::handleResult,
-                    cause -> currentThread.interrupt());
+                        currentRecord,
+                        currentRecord.getValue(),
+                        asyncResultConsumer,
+                        asyncErrorHandler);
                 
Thread.currentThread().setContextClassLoader(instanceClassLoader);
 
                 // register end time
@@ -328,11 +339,7 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
         }
     }
 
-    private void processAsyncResults() throws InterruptedException {
-
-    }
-
-    private void handleResult(Record srcRecord, JavaExecutionResult result) {
+    private void handleResult(Record srcRecord, JavaExecutionResult result) 
throws Exception {
         if (result.getUserException() != null) {
             Exception t = result.getUserException();
             log.warn("Encountered exception when processing message {}",
@@ -353,7 +360,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
         }
     }
 
-    private void sendOutputMessage(Record srcRecord, Object output) {
+    private void sendOutputMessage(Record srcRecord, Object output) throws 
Exception {
         if (componentType == 
org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK) {
             Thread.currentThread().setContextClassLoader(functionClassLoader);
         }
@@ -364,6 +371,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
             stats.incrSinkExceptions(e);
             // fail the source record
             srcRecord.fail();
+            throw e;
         } finally {
             Thread.currentThread().setContextClassLoader(instanceClassLoader);
         }
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index 651523e..0bedb74 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -94,6 +94,7 @@ public class LocalRunner implements AutoCloseable {
     private final File narExtractionDirectoryCreated;
     private final String connectorsDir;
     private final Thread shutdownHook;
+    private final int instanceLivenessCheck;
     private ClassLoader userCodeClassLoader;
     private boolean userCodeClassLoaderCreated;
     private RuntimeFactory runtimeFactory;
@@ -178,6 +179,8 @@ public class LocalRunner implements AutoCloseable {
     protected String secretsProviderConfig;
     @Parameter(names = "--metricsPortStart", description = "The starting port 
range for metrics server. When running instances as threads, one metrics server 
is used to host the stats for all instances.", hidden = true)
     protected Integer metricsPortStart;
+    @Parameter(names = "--exitOnError", description = "The starting port range 
for metrics server. When running instances as threads, one metrics server is 
used to host the stats for all instances.", hidden = true)
+    protected boolean exitOnError;
 
     private static final String DEFAULT_SERVICE_URL = "pulsar://localhost:6650";
     private static final String DEFAULT_WEB_SERVICE_URL = "http://localhost:8080";
@@ -203,7 +206,7 @@ public class LocalRunner implements AutoCloseable {
                        boolean useTls, boolean tlsAllowInsecureConnection, 
boolean tlsHostNameVerificationEnabled,
                        String tlsTrustCertFilePath, int instanceIdOffset, 
RuntimeEnv runtimeEnv,
                        String secretsProviderClassName, String 
secretsProviderConfig, String narExtractionDirectory,
-                       String connectorsDirectory, Integer metricsPortStart) {
+                       String connectorsDirectory, Integer metricsPortStart, 
boolean exitOnError) {
         this.functionConfig = functionConfig;
         this.sourceConfig = sourceConfig;
         this.sinkConfig = sinkConfig;
@@ -236,6 +239,8 @@ public class LocalRunner implements AutoCloseable {
             this.connectorsDir = Paths.get(pulsarHome, 
"connectors").toString();
         }
         this.metricsPortStart = metricsPortStart;
+        this.exitOnError = exitOnError;
+        this.instanceLivenessCheck = exitOnError ? 0 : 30000;
         shutdownHook = new Thread(() -> {
             try {
                 LocalRunner.this.close();
@@ -259,13 +264,16 @@ public class LocalRunner implements AutoCloseable {
             stop();
         } finally {
             if (narExtractionDirectoryCreated != null) {
-                FileUtils.deleteFile(narExtractionDirectoryCreated, true);
+                if (narExtractionDirectoryCreated.exists()) {
+                    FileUtils.deleteFile(narExtractionDirectoryCreated, true);
+                }
             }
         }
     }
 
     public synchronized void stop() {
         if (running.compareAndSet(true, false)) {
+            this.notify();
             try {
                 Runtime.getRuntime().removeShutdownHook(shutdownHook);
             } catch (IllegalStateException e) {
@@ -472,9 +480,18 @@ public class LocalRunner implements AutoCloseable {
         }
 
         if (blocking) {
-            for (RuntimeSpawner spawner : local) {
-                spawner.join();
-                log.info("RuntimeSpawner quit because of", 
spawner.getRuntime().getDeathException());
+            if (exitOnError) {
+                for (RuntimeSpawner spawner : local) {
+                    spawner.join();
+                    log.info("RuntimeSpawner quit because of", 
spawner.getRuntime().getDeathException());
+                }
+                close();
+            } else  {
+                synchronized (this) {
+                    while (running.get()) {
+                        this.wait();
+                    }
+                }
             }
         }
     }
@@ -523,12 +540,13 @@ public class LocalRunner implements AutoCloseable {
                     
instanceConfig.setExposePulsarAdminClientEnabled(functionConfig.getExposePulsarAdminClientEnabled());
                 }
             }
+
             RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
                     instanceConfig,
                     userCodeFile,
                     null,
                     runtimeFactory,
-                    30000);
+                    instanceLivenessCheck);
             spawners.add(runtimeSpawner);
             runtimeSpawner.start();
         }
@@ -629,7 +647,7 @@ public class LocalRunner implements AutoCloseable {
                     userCodeFile,
                     null,
                     runtimeFactory,
-                    30000);
+                    instanceLivenessCheck);
             spawners.add(runtimeSpawner);
             runtimeSpawner.start();
         }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
index 2a4f95f..471af68 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
@@ -84,7 +84,7 @@ public class RuntimeSpawner implements AutoCloseable {
             processLivenessCheckTimer = 
InstanceCache.getInstanceCache().getScheduledExecutorService().scheduleAtFixedRate(()
 -> {
                 Runtime runtime = RuntimeSpawner.this.runtime;
                 if (runtime != null && !runtime.isAlive()) {
-                    log.error("{}/{}/{}-{} Function Container is dead with 
exception.. restarting", details.getTenant(),
+                    log.error("{}/{}/{} Function Container is dead with 
following exception. Restarting.", details.getTenant(),
                             details.getNamespace(), details.getName(), 
runtime.getDeathException());
                     // Just for the sake of sanity, just destroy the runtime
                     try {

Reply via email to