This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 6fd31d101ba [fix][txn] Topic transaction buffer recover don't close
reader when throw RuntimeException (#15361)
6fd31d101ba is described below
commit 6fd31d101bad252b512925ba733d8c636aa6307d
Author: congbo <[email protected]>
AuthorDate: Tue May 10 16:03:45 2022 +0800
[fix][txn] Topic transaction buffer recover don't close reader when throw
RuntimeException (#15361)
Fixes: https://github.com/apache/pulsar/issues/14878
### Motivation
Clear the unused reader on the topicTransactionBufferSnapshot topic.
When the reader decodes the snapshot, it may throw a RuntimeException rather than a
PulsarClientException.
We should catch Exception and then close the reader and topic.
```
"java.util.concurrent.CompletionException:
com.google.common.util.concurrent.UncheckedExecutionException:
org.apache.commons.lang3.SerializationException: Failed at fetching schema info
for 0
at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
~[?:?]
at
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
~[?:?]
at
java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:704)
~[?:?]
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
~[?:?]
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
~[?:?]
at
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer.lambda$checkIfTBRecoverCompletely$3(TopicTransactionBuffer.java:232)
~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
at
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
~[?:?]
at
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
~[?:?]
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
~[?:?]
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
~[?:?]
at
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$1.recoverExceptionally(TopicTransactionBuffer.java:196)
~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
at
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$1(TopicTransactionBuffer.java:647)
~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
at
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
[?:?]
at
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
[?:?]
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
[?:?]
at
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
[?:?]
at
java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:722)
[?:?]
at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
[?:?]
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
[?:?]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[?:?]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[?:?]
at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
[io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: com.google.common.util.concurrent.UncheckedExecutionException:
org.apache.commons.lang3.SerializationException: Failed at fetching schema info
for 0
at
com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051)
~[com.google.guava-guava-30.1-jre.jar:?]
at com.google.common.cache.LocalCache.get(LocalCache.java:3951)
~[com.google.guava-guava-30.1-jre.jar:?]
at
com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
~[com.google.guava-guava-30.1-jre.jar:?]
at
com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935)
~[com.google.guava-guava-30.1-jre.jar:?]
at
org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaReader(AbstractMultiVersionReader.java:83)
~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
at
org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:90)
~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
at
org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:67)
~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
at
org.apache.pulsar.client.impl.MessageImpl.decode(MessageImpl.java:484)
~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
at
org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:462)
~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
at
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$0(TopicTransactionBuffer.java:583)
~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
at
java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714)
~[?:?]
... 8 more
Caused by: org.apache.commons.lang3.SerializationException: Failed at
fetching schema info for 0
at
org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaInfoByVersion(AbstractMultiVersionReader.java:129)
~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
at
org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader.loadReader(MultiVersionAvroReader.java:47)
~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
at
org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:52)
~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
at
org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:49)
~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
at
com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
~[com.google.guava-guava-30.1-jre.jar:?]
at
com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
~[com.google.guava-guava-30.1-jre.jar:?]
at
com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
~[com.google.guava-guava-30.1-jre.jar:?]
at
com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
~[com.google.guava-guava-30.1-jre.jar:?]
at com.google.common.cache.LocalCache.get(LocalCache.java:3951)
~[com.google.guava-guava-30.1-jre.jar:?]
at
com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
~[com.google.guava-guava-30.1-jre.jar:?]
at
com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935)
~[com.google.guava-guava-30.1-jre.jar:?]
at
org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaReader(AbstractMultiVersionReader.java:83)
~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
at
org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:90)
~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
at
org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:67)
~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
at
org.apache.pulsar.client.impl.MessageImpl.decode(MessageImpl.java:484)
~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
at
org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:462)
~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
at
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$0(TopicTransactionBuffer.java:583)
~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
at
java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714)
~[?:?]
... 8 more
```
### Modifications
Catch Exception (instead of only PulsarClientException), then close the topic and reader.
(cherry picked from commit 0c58810d29838a161481f03c14990d0eb021a185)
---
.../transaction/buffer/impl/TopicTransactionBuffer.java | 8 ++++----
.../broker/transaction/TopicTransactionBufferRecoverTest.java | 11 +++++++++--
2 files changed, 13 insertions(+), 6 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index a348ccbb764..c889e006978 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -595,10 +595,10 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
callBack.noNeedToRecover();
return;
}
- } catch (PulsarClientException
pulsarClientException) {
- log.error("[{}]Transaction buffer recover fail
when read "
- + "transactionBufferSnapshot!",
topic.getName(), pulsarClientException);
-
callBack.recoverExceptionally(pulsarClientException);
+ } catch (Exception ex) {
+ log.error("[{}] Transaction buffer recover
fail when read "
+ + "transactionBufferSnapshot!",
topic.getName(), ex);
+ callBack.recoverExceptionally(ex);
closeReader(reader);
return;
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index fe724dd2be7..dddda0f962d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -457,7 +457,7 @@ public class TopicTransactionBufferRecoverTest extends
TransactionTestBase {
@Test(timeOut=30000)
- public void testTransactionBufferRecoverThrowPulsarClientException()
throws Exception {
+ public void testTransactionBufferRecoverThrowException() throws Exception {
String topic = NAMESPACE1 +
"/testTransactionBufferRecoverThrowPulsarClientException";
@Cleanup
Producer<byte[]> producer = pulsarClient
@@ -491,7 +491,14 @@ public class TopicTransactionBufferRecoverTest extends
TransactionTestBase {
field.setAccessible(true);
TransactionBufferSnapshotService
transactionBufferSnapshotServiceOriginal =
(TransactionBufferSnapshotService)
field.get(getPulsarServiceList().get(0));
- // mock reader can't read snapshot fail
+ // mock reader can't read snapshot fail throw RuntimeException
+ doThrow(new RuntimeException("test")).when(reader).hasMoreEvents();
+ // check reader close topic
+ checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceOriginal,
+ transactionBufferSnapshotService, originalTopic, field,
producer);
+ doReturn(true).when(reader).hasMoreEvents();
+
+ // mock reader can't read snapshot fail throw PulsarClientException
doThrow(new
PulsarClientException("test")).when(reader).hasMoreEvents();
// check reader close topic
checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceOriginal,