This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new 70e533a  [Transaction] Fix topicTransactionBuffer handle null snapshot 
(#12758) (#14510)
70e533a is described below

commit 70e533a5bd912c873cf2c9b36c6d22404580547e
Author: Enrico Olivelli <[email protected]>
AuthorDate: Wed Mar 2 08:02:22 2022 +0100

    [Transaction] Fix topicTransactionBuffer handle null snapshot (#12758) 
(#14510)
    
    fix https://github.com/apache/pulsar/issues/12754
    Currently, when a topic is deleted, a null value is written to the
    transaction buffer snapshot system topic. Other topics that recover from
    this snapshot system topic then hit an NPE when they read that null value.
    
    (cherry picked from commit c90c89b07ce544b92becedfaa7a0090b4b73edd2)
---
 .../transaction/buffer/impl/TopicTransactionBuffer.java | 14 ++++++++------
 .../pulsar/client/impl/TransactionEndToEndTest.java     | 17 +++++++++++++++++
 2 files changed, 25 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 220b432..12d5ee4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -472,12 +472,14 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
                 try {
                     while (reader.hasMoreEvents()) {
                         Message<TransactionBufferSnapshot> message = 
reader.readNext();
-                        TransactionBufferSnapshot transactionBufferSnapshot = 
message.getValue();
-                        if 
(topic.getName().equals(transactionBufferSnapshot.getTopicName())) {
-                            callBack.handleSnapshot(transactionBufferSnapshot);
-                            this.startReadCursorPosition = PositionImpl.get(
-                                    
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
-                                    
transactionBufferSnapshot.getMaxReadPositionEntryId());
+                        if (topic.getName().equals(message.getKey())) {
+                            TransactionBufferSnapshot 
transactionBufferSnapshot = message.getValue();
+                            if (transactionBufferSnapshot != null) {
+                                
callBack.handleSnapshot(transactionBufferSnapshot);
+                                this.startReadCursorPosition = 
PositionImpl.get(
+                                        
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
+                                        
transactionBufferSnapshot.getMaxReadPositionEntryId());
+                            }
                         }
                     }
                 } catch (PulsarClientException pulsarClientException) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index f971b5a..d5ae61c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -378,6 +378,23 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
     }
 
     @Test
+    public void testAfterDeleteTopicOtherTopicCanRecover() throws Exception {
+        String topicOne = "persistent://" + NAMESPACE1 + "/topic-one";
+        String topicTwo = "persistent://" + NAMESPACE1 + "/topic-two";
+        String sub = "test";
+        admin.topics().createNonPartitionedTopic(topicOne);
+        admin.topics().createSubscription(topicOne, "test", 
MessageId.earliest);
+        admin.topics().delete(topicOne);
+
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topicTwo).create();
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicTwo).subscriptionName(sub).subscribe();
+        String content = "test";
+        producer.send(content);
+        assertEquals(consumer.receive().getValue(), content);
+    }
+
+    @Test
     public void txnMessageAckTest() throws Exception {
         String topic = TOPIC_MESSAGE_ACK_TEST;
         final String subName = "test";

Reply via email to