This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 810d707d8fcbcce8ce8e8e8b3a342114dff75ef5
Author: congbo <39078850+congbobo...@users.noreply.github.com>
AuthorDate: Tue May 10 16:03:45 2022 +0800

    [fix][txn] Topic transaction buffer recover don't close reader when throw 
RuntimeException (#15361)
    
    Fixes: https://github.com/apache/pulsar/issues/14878
    
    ### Motivation
    clear unuse reader in topicTransactionBufferSnapshot topic
    
    When reader decode the Snapshot will throw RuntimeException not 
PulsarClientException
    
    We should catch the Exception then close the reader and topic
    
    ```
    "java.util.concurrent.CompletionException: 
com.google.common.util.concurrent.UncheckedExecutionException: 
org.apache.commons.lang3.SerializationException: Failed at fetching schema info 
for 0
            at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
 ~[?:?]
            at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
 ~[?:?]
            at 
java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:704)
 ~[?:?]
            at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) 
~[?:?]
            at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
 ~[?:?]
            at 
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer.lambda$checkIfTBRecoverCompletely$3(TopicTransactionBuffer.java:232)
 ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
            at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
 ~[?:?]
            at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
 ~[?:?]
            at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) 
~[?:?]
            at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
 ~[?:?]
            at 
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$1.recoverExceptionally(TopicTransactionBuffer.java:196)
 ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
            at 
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$1(TopicTransactionBuffer.java:647)
 ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
            at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
 [?:?]
            at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
 [?:?]
            at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) 
[?:?]
            at 
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) 
[?:?]
            at 
java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:722)
 [?:?]
            at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
 [?:?]
            at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
            at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
            at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
 [?:?]
            at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
[?:?]
            at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
[?:?]
            at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 [io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
            at java.lang.Thread.run(Thread.java:829) [?:?]
    Caused by: com.google.common.util.concurrent.UncheckedExecutionException: 
org.apache.commons.lang3.SerializationException: Failed at fetching schema info 
for 0
            at 
com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051) 
~[com.google.guava-guava-30.1-jre.jar:?]
            at com.google.common.cache.LocalCache.get(LocalCache.java:3951) 
~[com.google.guava-guava-30.1-jre.jar:?]
            at 
com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) 
~[com.google.guava-guava-30.1-jre.jar:?]
            at 
com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935) 
~[com.google.guava-guava-30.1-jre.jar:?]
            at 
org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaReader(AbstractMultiVersionReader.java:83)
 ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
            at 
org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:90)
 ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
            at 
org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:67)
 ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
            at 
org.apache.pulsar.client.impl.MessageImpl.decode(MessageImpl.java:484) 
~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
            at 
org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:462) 
~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
            at 
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$0(TopicTransactionBuffer.java:583)
 ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
            at 
java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714)
 ~[?:?]
            ... 8 more
    Caused by: org.apache.commons.lang3.SerializationException: Failed at 
fetching schema info for 0
            at 
org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaInfoByVersion(AbstractMultiVersionReader.java:129)
 ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
            at 
org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader.loadReader(MultiVersionAvroReader.java:47)
 ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
            at 
org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:52)
 ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
            at 
org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:49)
 ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
            at 
com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
 ~[com.google.guava-guava-30.1-jre.jar:?]
            at 
com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278) 
~[com.google.guava-guava-30.1-jre.jar:?]
            at 
com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
 ~[com.google.guava-guava-30.1-jre.jar:?]
            at 
com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045) 
~[com.google.guava-guava-30.1-jre.jar:?]
            at com.google.common.cache.LocalCache.get(LocalCache.java:3951) 
~[com.google.guava-guava-30.1-jre.jar:?]
            at 
com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) 
~[com.google.guava-guava-30.1-jre.jar:?]
            at 
com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935) 
~[com.google.guava-guava-30.1-jre.jar:?]
            at 
org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaReader(AbstractMultiVersionReader.java:83)
 ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
            at 
org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:90)
 ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
            at 
org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:67)
 ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
            at 
org.apache.pulsar.client.impl.MessageImpl.decode(MessageImpl.java:484) 
~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
            at 
org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:462) 
~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
            at 
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$0(TopicTransactionBuffer.java:583)
 ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
            at 
java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714)
 ~[?:?]
            ... 8 more
    ```
    
    ### Modifications
    catch Exception then close the topic and reader
    
    (cherry picked from commit 0c58810d29838a161481f03c14990d0eb021a185)
---
 .../transaction/buffer/impl/TopicTransactionBuffer.java       |  8 ++++----
 .../broker/transaction/TopicTransactionBufferRecoverTest.java | 11 +++++++++--
 2 files changed, 13 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 3cbc3f14ea5..a432b76f5fa 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -595,10 +595,10 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
                                     callBack.noNeedToRecover();
                                     return;
                                 }
-                            } catch (PulsarClientException 
pulsarClientException) {
-                                log.error("[{}]Transaction buffer recover fail 
when read "
-                                        + "transactionBufferSnapshot!", 
topic.getName(), pulsarClientException);
-                                
callBack.recoverExceptionally(pulsarClientException);
+                            } catch (Exception ex) {
+                                log.error("[{}] Transaction buffer recover 
fail when read "
+                                        + "transactionBufferSnapshot!", 
topic.getName(), ex);
+                                callBack.recoverExceptionally(ex);
                                 closeReader(reader);
                                 return;
                             }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index c3491738985..351fe124852 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -455,7 +455,7 @@ public class TopicTransactionBufferRecoverTest extends 
TransactionTestBase {
 
 
     @Test(timeOut=30000)
-    public void testTransactionBufferRecoverThrowPulsarClientException() 
throws Exception {
+    public void testTransactionBufferRecoverThrowException() throws Exception {
         String topic = NAMESPACE1 + 
"/testTransactionBufferRecoverThrowPulsarClientException";
         @Cleanup
         Producer<byte[]> producer = pulsarClient
@@ -489,7 +489,14 @@ public class TopicTransactionBufferRecoverTest extends 
TransactionTestBase {
         field.setAccessible(true);
         TransactionBufferSnapshotService 
transactionBufferSnapshotServiceOriginal =
                 (TransactionBufferSnapshotService) 
field.get(getPulsarServiceList().get(0));
-        // mock reader can't read snapshot fail
+        // mock reader can't read snapshot fail throw RuntimeException
+        doThrow(new RuntimeException("test")).when(reader).hasMoreEvents();
+        // check reader close topic
+        checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceOriginal,
+                transactionBufferSnapshotService, originalTopic, field, 
producer);
+        doReturn(true).when(reader).hasMoreEvents();
+
+        // mock reader can't read snapshot fail throw PulsarClientException
         doThrow(new 
PulsarClientException("test")).when(reader).hasMoreEvents();
         // check reader close topic
         checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceOriginal,

Reply via email to