This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b5ef09e537d [fix][test] Fix SegmentAbortedTxnProcessorTest (#20358)
b5ef09e537d is described below

commit b5ef09e537ddb24419e9ad05b1dc940ab10ea70a
Author: Xiangying Meng <[email protected]>
AuthorDate: Mon May 29 18:01:24 2023 +0800

    [fix][test] Fix SegmentAbortedTxnProcessorTest (#20358)
---
 .../SegmentAbortedTxnProcessorTest.java            | 35 +++++++++++++++++++---
 .../broker/transaction/TransactionTestBase.java    |  6 ++--
 .../buffer/TransactionBufferCloseTest.java         |  4 ---
 3 files changed, 34 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
index cb15ab003f7..0600833b1ad 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
@@ -25,6 +25,7 @@ import static org.testng.Assert.fail;
 import java.lang.reflect.Field;
 import java.util.LinkedList;
 import java.util.NavigableMap;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -38,10 +39,12 @@ import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import 
org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
+import 
org.apache.pulsar.broker.transaction.buffer.impl.SingleSnapshotAbortedTxnProcessorImpl;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.SnapshotSegmentAbortedTxnProcessorImpl;
 import 
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes;
 import 
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment;
@@ -56,6 +59,7 @@ import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.shaded.org.awaitility.reflect.WhiteboxImpl;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -71,11 +75,13 @@ public class SegmentAbortedTxnProcessorTest extends 
TransactionTestBase {
     @Override
     @BeforeClass
     protected void setup() throws Exception {
-        setUpBase(1, 1, PROCESSOR_TOPIC, 0);
+        setUpBase(1, 1, null, 0);
         this.pulsarService = getPulsarServiceList().get(0);
         
this.pulsarService.getConfig().setTransactionBufferSegmentedSnapshotEnabled(true);
         
this.pulsarService.getConfig().setTransactionBufferSnapshotSegmentSize(8 + 
PROCESSOR_TOPIC.length() +
                 SEGMENT_SIZE * 3);
+        admin.topics().createNonPartitionedTopic(PROCESSOR_TOPIC);
+        assertTrue(getSnapshotAbortedTxnProcessor(PROCESSOR_TOPIC) instanceof 
SnapshotSegmentAbortedTxnProcessorImpl);
     }
 
     @Override
@@ -311,15 +317,18 @@ public class SegmentAbortedTxnProcessorTest extends 
TransactionTestBase {
      */
     @Test
     public void testSnapshotProcessorUpgrade() throws Exception {
+        String NAMESPACE2 = TENANT + "/ns2";
+        admin.namespaces().createNamespace(NAMESPACE2);
         this.pulsarService = getPulsarServiceList().get(0);
         
this.pulsarService.getConfig().setTransactionBufferSegmentedSnapshotEnabled(false);
 
         // Create a topic, send 10 messages without using transactions, and 
send 10 messages using transactions.
         // Abort these transactions and verify the data.
-        final String topicName = "persistent://" + NAMESPACE1 + 
"/testSnapshotProcessorUpgrade";
+        final String topicName = "persistent://" + NAMESPACE2 + 
"/testSnapshotProcessorUpgrade";
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("test-sub").subscribe();
 
+        assertTrue(getSnapshotAbortedTxnProcessor(topicName) instanceof 
SingleSnapshotAbortedTxnProcessorImpl);
         // Send 10 messages without using transactions
         for (int i = 0; i < 10; i++) {
             producer.send(("test-message-" + i).getBytes());
@@ -352,6 +361,7 @@ public class SegmentAbortedTxnProcessorTest extends 
TransactionTestBase {
 
         // Unload the topic
         admin.topics().unload(topicName);
+        assertTrue(getSnapshotAbortedTxnProcessor(topicName) instanceof 
SnapshotSegmentAbortedTxnProcessorImpl);
 
         // Sends a new message using a transaction and aborts it.
         Transaction txn = pulsarClient.newTransaction()
@@ -362,7 +372,7 @@ public class SegmentAbortedTxnProcessorTest extends 
TransactionTestBase {
 
         // Verifies that the topic has exactly one segment.
         Awaitility.await().untilAsserted(() -> {
-            String segmentTopic = "persistent://" + NAMESPACE1 + "/" +
+            String segmentTopic = "persistent://" + NAMESPACE2 + "/" +
                     SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS;
             TopicStats topicStats = admin.topics().getStats(segmentTopic);
             assertEquals(1, topicStats.getMsgInCounter());
@@ -401,7 +411,7 @@ public class SegmentAbortedTxnProcessorTest extends 
TransactionTestBase {
         String topicName = "persistent://" + namespaceName + "/newTopic";
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
         producer.close();
-
+        assertTrue(getSnapshotAbortedTxnProcessor(topicName) instanceof 
SnapshotSegmentAbortedTxnProcessorImpl);
         // Check that the __transaction_buffer_snapshot topic is not created 
in the same namespace
         String transactionBufferSnapshotTopic = "persistent://" + 
namespaceName + "/" +
                 SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT;
@@ -415,4 +425,21 @@ public class SegmentAbortedTxnProcessorTest extends 
TransactionTestBase {
         // Destroy the namespace after the test
         admin.namespaces().deleteNamespace(namespaceName, true);
     }
+
+    private AbortedTxnProcessor getSnapshotAbortedTxnProcessor(String 
topicName) {
+        PersistentTopic persistentTopic = getPersistentTopic(topicName);
+        return 
WhiteboxImpl.getInternalState(persistentTopic.getTransactionBuffer(), 
"snapshotAbortedTxnProcessor");
+    }
+
+    private PersistentTopic getPersistentTopic(String topicName) {
+        for (PulsarService pulsar : getPulsarServiceList()) {
+            CompletableFuture<Optional<Topic>> future =
+                    pulsar.getBrokerService().getTopic(topicName, false);
+            if (future == null) {
+                continue;
+            }
+            return (PersistentTopic) future.join().get();
+        }
+        throw new NullPointerException("topic[" + topicName +  "] not found");
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index f45eda8d21f..cd0c089ad41 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -114,10 +114,10 @@ public abstract class TransactionTestBase extends 
TestRetrySupport {
                 new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet(CLUSTER_NAME)));
         
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
         createTransactionCoordinatorAssign(numPartitionsOfTC);
+        admin.tenants().createTenant(TENANT,
+                new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet(CLUSTER_NAME)));
+        admin.namespaces().createNamespace(NAMESPACE1);
         if (topic != null) {
-            admin.tenants().createTenant(TENANT,
-                    new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet(CLUSTER_NAME)));
-            admin.namespaces().createNamespace(NAMESPACE1);
             if (numPartitions == 0) {
                 admin.topics().createNonPartitionedTopic(topic);
             } else {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferCloseTest.java
index e92cf29521e..d1784f6a392 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferCloseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferCloseTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.transaction.buffer;
 
-import com.google.common.collect.Sets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -30,7 +29,6 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -49,8 +47,6 @@ public class TransactionBufferCloseTest extends 
TransactionTestBase {
         setUpBase(1, 16, null, 0);
         Awaitility.await().until(() -> ((PulsarClientImpl) pulsarClient)
                 .getTcClient().getState() == 
TransactionCoordinatorClient.State.READY);
-        admin.tenants().createTenant(TENANT,
-                new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet(CLUSTER_NAME)));
     }
 
     @AfterMethod(alwaysRun = true)

Reply via email to