This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 73595c9  Closing FunctionAssignmentTailer should handle 
ConsumerAlreadyClosedException (#3747)
73595c9 is described below

commit 73595c944328b16760afd2182503862d0655f58c
Author: Sijie Guo <[email protected]>
AuthorDate: Tue Apr 9 23:07:53 2019 +0800

    Closing FunctionAssignmentTailer should handle 
ConsumerAlreadyClosedException (#3747)
    
    *Motivation*
    
    Currently when FunctionAssignmentTailer throws RuntimeException when 
received
    exception when tailing the assignment topic. Exception can be thrown when 
closing
    the reader.
    
    The runtime exception is the root cause why PulsarWorkerAssignmentTest 
timeouts
    during shutting down.
    
    *Modifications*
    
    Handle AlreadyClosedException differently
---
 .../org/apache/pulsar/common/util/FutureUtil.java  |  9 ++++++
 .../functions/worker/FunctionAssignmentTailer.java | 37 ++++++++++++++++------
 2 files changed, 37 insertions(+), 9 deletions(-)

diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index 0aa6a96..e83167a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.common.util;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 
 public class FutureUtil {
 
@@ -38,4 +39,12 @@ public class FutureUtil {
         future.completeExceptionally(t);
         return future;
     }
+
+    public static Throwable unwrapCompletionException(Throwable t) {
+        if (t instanceof CompletionException) {
+            return unwrapCompletionException(t.getCause());
+        } else {
+            return t;
+        }
+    }
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
index b40c6cd..bac3128 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
@@ -22,8 +22,9 @@ import java.io.IOException;
 import java.util.function.Function;
 
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClientException;
+import 
org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
 import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 
 import lombok.extern.slf4j.Slf4j;
@@ -32,13 +33,12 @@ import lombok.extern.slf4j.Slf4j;
 public class FunctionAssignmentTailer
     implements java.util.function.Consumer<Message<byte[]>>, 
Function<Throwable, Void>, AutoCloseable {
 
-        private final FunctionRuntimeManager functionRuntimeManager;
-        private final Reader<byte[]> reader;
+    private final FunctionRuntimeManager functionRuntimeManager;
+    private final Reader<byte[]> reader;
+    private boolean closed = false;
 
-    public FunctionAssignmentTailer(FunctionRuntimeManager 
functionRuntimeManager, Reader<byte[]> reader)
-            throws PulsarClientException {
+    public FunctionAssignmentTailer(FunctionRuntimeManager 
functionRuntimeManager, Reader<byte[]> reader) {
         this.functionRuntimeManager = functionRuntimeManager;
-
         this.reader = reader;
     }
 
@@ -54,8 +54,12 @@ public class FunctionAssignmentTailer
 
     @Override
     public void close() {
+        if (closed) {
+            return;
+        }
         log.info("Stopping function state consumer");
         try {
+            closed = true;
             reader.close();
         } catch (IOException e) {
             log.error("Failed to stop function state consumer", e);
@@ -91,8 +95,23 @@ public class FunctionAssignmentTailer
 
     @Override
     public Void apply(Throwable cause) {
-        log.error("Failed to retrieve messages from assignment update topic", 
cause);
-        // TODO: find a better way to handle consumer functions
-        throw new RuntimeException(cause);
+        Throwable realCause = FutureUtil.unwrapCompletionException(cause);
+        if (realCause instanceof AlreadyClosedException) {
+            // if reader is closed because tailer is closed, ignore the 
exception
+            if (closed) {
+                // ignore
+                return null;
+            } else {
+                log.error("Reader of assignment update topic is closed 
unexpectedly", cause);
+                throw new RuntimeException(
+                    "Reader of assignment update topic is closed unexpectedly",
+                    cause
+                );
+            }
+        } else {
+            log.error("Failed to retrieve messages from assignment update 
topic", cause);
+            // TODO: find a better way to handle consumer functions
+            throw new RuntimeException(cause);
+        }
     }
 }

Reply via email to