This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 94db24a4 [Transaction] Fix topicTransactionBuffer handle null snapshot
(#12758)
94db24a4 is described below
commit 94db24a42120786186043c1ae5dc1dca6ef58d18
Author: congbo <[email protected]>
AuthorDate: Fri Nov 12 13:11:42 2021 +0800
[Transaction] Fix topicTransactionBuffer handle null snapshot (#12758)
fix https://github.com/apache/pulsar/issues/12754
Currently, when a topic is deleted, a null value is written to the
transaction buffer snapshot system topic. When another topic recovers from
this system topic, it reads that null value and throws an NPE.
Match snapshots by message key and null-check the snapshot value before using it during recovery.
(cherry picked from commit c90c89b07ce544b92becedfaa7a0090b4b73edd2)
---
.../transaction/buffer/impl/TopicTransactionBuffer.java | 16 +++++++++-------
.../pulsar/broker/transaction/TransactionTest.java | 2 +-
.../pulsar/client/impl/TransactionEndToEndTest.java | 17 +++++++++++++++++
3 files changed, 27 insertions(+), 8 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 79f7f35..735927c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -532,14 +532,16 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
try {
boolean hasSnapshot = false;
while (reader.hasMoreEvents()) {
- hasSnapshot = true;
Message<TransactionBufferSnapshot> message =
reader.readNext();
- TransactionBufferSnapshot transactionBufferSnapshot =
message.getValue();
- if
(topic.getName().equals(transactionBufferSnapshot.getTopicName())) {
- callBack.handleSnapshot(transactionBufferSnapshot);
- this.startReadCursorPosition = PositionImpl.get(
-
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
-
transactionBufferSnapshot.getMaxReadPositionEntryId());
+ if (topic.getName().equals(message.getKey())) {
+ TransactionBufferSnapshot
transactionBufferSnapshot = message.getValue();
+ if (transactionBufferSnapshot != null) {
+ hasSnapshot = true;
+
callBack.handleSnapshot(transactionBufferSnapshot);
+ this.startReadCursorPosition =
PositionImpl.get(
+
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
+
transactionBufferSnapshot.getMaxReadPositionEntryId());
+ }
}
}
if (!hasSnapshot) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 971f8b2..e14c777 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -295,4 +295,4 @@ public class TransactionTest extends TransactionTestBase {
Assert.assertEquals(snapshot1.getMaxReadPositionEntryId(), 1);
});
}
-}
\ No newline at end of file
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index d7cb6c9..9c30f4a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -377,6 +377,23 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
}
@Test
+ public void testAfterDeleteTopicOtherTopicCanRecover() throws Exception {
+ String topicOne = "persistent://" + NAMESPACE1 + "/topic-one";
+ String topicTwo = "persistent://" + NAMESPACE1 + "/topic-two";
+ String sub = "test";
+ admin.topics().createNonPartitionedTopic(topicOne);
+ admin.topics().createSubscription(topicOne, "test",
MessageId.earliest);
+ admin.topics().delete(topicOne);
+
+ Producer<String> producer =
pulsarClient.newProducer(Schema.STRING).topic(topicTwo).create();
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicTwo).subscriptionName(sub).subscribe();
+ String content = "test";
+ producer.send(content);
+ assertEquals(consumer.receive().getValue(), content);
+ }
+
+ @Test
public void txnMessageAckTest() throws Exception {
String topic = TOPIC_MESSAGE_ACK_TEST;
final String subName = "test";