This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 94db24a4 [Transaction] Fix topicTransactionBuffer handle null snapshot 
(#12758)
94db24a4 is described below

commit 94db24a42120786186043c1ae5dc1dca6ef58d18
Author: congbo <[email protected]>
AuthorDate: Fri Nov 12 13:11:42 2021 +0800

    [Transaction] Fix topicTransactionBuffer handle null snapshot (#12758)
    
    fix https://github.com/apache/pulsar/issues/12754
    Now when delete topic, we will write a null value to Transaction buffer 
snapshot topic, other topic recover by this transaction buffer snapshot system 
topic, will produce NPE
    
    judge NPE logic
    
    (cherry picked from commit c90c89b07ce544b92becedfaa7a0090b4b73edd2)
---
 .../transaction/buffer/impl/TopicTransactionBuffer.java | 16 +++++++++-------
 .../pulsar/broker/transaction/TransactionTest.java      |  2 +-
 .../pulsar/client/impl/TransactionEndToEndTest.java     | 17 +++++++++++++++++
 3 files changed, 27 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 79f7f35..735927c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -532,14 +532,16 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
                 try {
                     boolean hasSnapshot = false;
                     while (reader.hasMoreEvents()) {
-                        hasSnapshot = true;
                         Message<TransactionBufferSnapshot> message = 
reader.readNext();
-                        TransactionBufferSnapshot transactionBufferSnapshot = 
message.getValue();
-                        if 
(topic.getName().equals(transactionBufferSnapshot.getTopicName())) {
-                            callBack.handleSnapshot(transactionBufferSnapshot);
-                            this.startReadCursorPosition = PositionImpl.get(
-                                    
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
-                                    
transactionBufferSnapshot.getMaxReadPositionEntryId());
+                        if (topic.getName().equals(message.getKey())) {
+                            TransactionBufferSnapshot 
transactionBufferSnapshot = message.getValue();
+                            if (transactionBufferSnapshot != null) {
+                                hasSnapshot = true;
+                                
callBack.handleSnapshot(transactionBufferSnapshot);
+                                this.startReadCursorPosition = 
PositionImpl.get(
+                                        
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
+                                        
transactionBufferSnapshot.getMaxReadPositionEntryId());
+                            }
                         }
                     }
                     if (!hasSnapshot) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 971f8b2..e14c777 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -295,4 +295,4 @@ public class TransactionTest extends TransactionTestBase {
             Assert.assertEquals(snapshot1.getMaxReadPositionEntryId(), 1);
         });
     }
-}
\ No newline at end of file
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index d7cb6c9..9c30f4a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -377,6 +377,23 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
     }
 
     @Test
+    public void testAfterDeleteTopicOtherTopicCanRecover() throws Exception {
+        String topicOne = "persistent://" + NAMESPACE1 + "/topic-one";
+        String topicTwo = "persistent://" + NAMESPACE1 + "/topic-two";
+        String sub = "test";
+        admin.topics().createNonPartitionedTopic(topicOne);
+        admin.topics().createSubscription(topicOne, "test", 
MessageId.earliest);
+        admin.topics().delete(topicOne);
+
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topicTwo).create();
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicTwo).subscriptionName(sub).subscribe();
+        String content = "test";
+        producer.send(content);
+        assertEquals(consumer.receive().getValue(), content);
+    }
+
+    @Test
     public void txnMessageAckTest() throws Exception {
         String topic = TOPIC_MESSAGE_ACK_TEST;
         final String subName = "test";

Reply via email to