This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new 70e533a [Transaction] Fix topicTransactionBuffer handle null snapshot
(#12758) (#14510)
70e533a is described below
commit 70e533a5bd912c873cf2c9b36c6d22404580547e
Author: Enrico Olivelli <[email protected]>
AuthorDate: Wed Mar 2 08:02:22 2022 +0100
[Transaction] Fix topicTransactionBuffer handle null snapshot (#12758)
(#14510)
fix https://github.com/apache/pulsar/issues/12754
Currently, when a topic is deleted, a null value is written to the transaction
buffer snapshot system topic. When another topic later recovers from this
transaction buffer snapshot system topic, reading that null value produces an NPE.
(cherry picked from commit c90c89b07ce544b92becedfaa7a0090b4b73edd2)
---
.../transaction/buffer/impl/TopicTransactionBuffer.java | 14 ++++++++------
.../pulsar/client/impl/TransactionEndToEndTest.java | 17 +++++++++++++++++
2 files changed, 25 insertions(+), 6 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 220b432..12d5ee4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -472,12 +472,14 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
try {
while (reader.hasMoreEvents()) {
Message<TransactionBufferSnapshot> message =
reader.readNext();
- TransactionBufferSnapshot transactionBufferSnapshot =
message.getValue();
- if
(topic.getName().equals(transactionBufferSnapshot.getTopicName())) {
- callBack.handleSnapshot(transactionBufferSnapshot);
- this.startReadCursorPosition = PositionImpl.get(
-
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
-
transactionBufferSnapshot.getMaxReadPositionEntryId());
+ if (topic.getName().equals(message.getKey())) {
+ TransactionBufferSnapshot
transactionBufferSnapshot = message.getValue();
+ if (transactionBufferSnapshot != null) {
+
callBack.handleSnapshot(transactionBufferSnapshot);
+ this.startReadCursorPosition =
PositionImpl.get(
+
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
+
transactionBufferSnapshot.getMaxReadPositionEntryId());
+ }
}
}
} catch (PulsarClientException pulsarClientException) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index f971b5a..d5ae61c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -378,6 +378,23 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
}
@Test
+ public void testAfterDeleteTopicOtherTopicCanRecover() throws Exception {
+ String topicOne = "persistent://" + NAMESPACE1 + "/topic-one";
+ String topicTwo = "persistent://" + NAMESPACE1 + "/topic-two";
+ String sub = "test";
+ admin.topics().createNonPartitionedTopic(topicOne);
+ admin.topics().createSubscription(topicOne, "test",
MessageId.earliest);
+ admin.topics().delete(topicOne);
+
+ Producer<String> producer =
pulsarClient.newProducer(Schema.STRING).topic(topicTwo).create();
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicTwo).subscriptionName(sub).subscribe();
+ String content = "test";
+ producer.send(content);
+ assertEquals(consumer.receive().getValue(), content);
+ }
+
+ @Test
public void txnMessageAckTest() throws Exception {
String topic = TOPIC_MESSAGE_ACK_TEST;
final String subName = "test";