This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b5ef09e537d [fix][test] Fix SegmentAbortedTxnProcessorTest (#20358)
b5ef09e537d is described below
commit b5ef09e537ddb24419e9ad05b1dc940ab10ea70a
Author: Xiangying Meng <[email protected]>
AuthorDate: Mon May 29 18:01:24 2023 +0800
[fix][test] Fix SegmentAbortedTxnProcessorTest (#20358)
---
.../SegmentAbortedTxnProcessorTest.java | 35 +++++++++++++++++++---
.../broker/transaction/TransactionTestBase.java | 6 ++--
.../buffer/TransactionBufferCloseTest.java | 4 ---
3 files changed, 34 insertions(+), 11 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
index cb15ab003f7..0600833b1ad 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
@@ -25,6 +25,7 @@ import static org.testng.Assert.fail;
import java.lang.reflect.Field;
import java.util.LinkedList;
import java.util.NavigableMap;
+import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -38,10 +39,12 @@ import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import
org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter;
+import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
+import
org.apache.pulsar.broker.transaction.buffer.impl.SingleSnapshotAbortedTxnProcessorImpl;
import
org.apache.pulsar.broker.transaction.buffer.impl.SnapshotSegmentAbortedTxnProcessorImpl;
import
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes;
import
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment;
@@ -56,6 +59,7 @@ import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.shaded.org.awaitility.reflect.WhiteboxImpl;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -71,11 +75,13 @@ public class SegmentAbortedTxnProcessorTest extends
TransactionTestBase {
@Override
@BeforeClass
protected void setup() throws Exception {
- setUpBase(1, 1, PROCESSOR_TOPIC, 0);
+ setUpBase(1, 1, null, 0);
this.pulsarService = getPulsarServiceList().get(0);
this.pulsarService.getConfig().setTransactionBufferSegmentedSnapshotEnabled(true);
this.pulsarService.getConfig().setTransactionBufferSnapshotSegmentSize(8 +
PROCESSOR_TOPIC.length() +
SEGMENT_SIZE * 3);
+ admin.topics().createNonPartitionedTopic(PROCESSOR_TOPIC);
+ assertTrue(getSnapshotAbortedTxnProcessor(PROCESSOR_TOPIC) instanceof
SnapshotSegmentAbortedTxnProcessorImpl);
}
@Override
@@ -311,15 +317,18 @@ public class SegmentAbortedTxnProcessorTest extends
TransactionTestBase {
*/
@Test
public void testSnapshotProcessorUpgrade() throws Exception {
+ String NAMESPACE2 = TENANT + "/ns2";
+ admin.namespaces().createNamespace(NAMESPACE2);
this.pulsarService = getPulsarServiceList().get(0);
this.pulsarService.getConfig().setTransactionBufferSegmentedSnapshotEnabled(false);
// Create a topic, send 10 messages without using transactions, and send 10 messages using transactions.
// Abort these transactions and verify the data.
- final String topicName = "persistent://" + NAMESPACE1 +
"/testSnapshotProcessorUpgrade";
+ final String topicName = "persistent://" + NAMESPACE2 +
"/testSnapshotProcessorUpgrade";
Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName).create();
Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName("test-sub").subscribe();
+ assertTrue(getSnapshotAbortedTxnProcessor(topicName) instanceof
SingleSnapshotAbortedTxnProcessorImpl);
// Send 10 messages without using transactions
for (int i = 0; i < 10; i++) {
producer.send(("test-message-" + i).getBytes());
@@ -352,6 +361,7 @@ public class SegmentAbortedTxnProcessorTest extends
TransactionTestBase {
// Unload the topic
admin.topics().unload(topicName);
+ assertTrue(getSnapshotAbortedTxnProcessor(topicName) instanceof
SnapshotSegmentAbortedTxnProcessorImpl);
// Sends a new message using a transaction and aborts it.
Transaction txn = pulsarClient.newTransaction()
@@ -362,7 +372,7 @@ public class SegmentAbortedTxnProcessorTest extends
TransactionTestBase {
// Verifies that the topic has exactly one segment.
Awaitility.await().untilAsserted(() -> {
- String segmentTopic = "persistent://" + NAMESPACE1 + "/" +
+ String segmentTopic = "persistent://" + NAMESPACE2 + "/" +
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS;
TopicStats topicStats = admin.topics().getStats(segmentTopic);
assertEquals(1, topicStats.getMsgInCounter());
@@ -401,7 +411,7 @@ public class SegmentAbortedTxnProcessorTest extends
TransactionTestBase {
String topicName = "persistent://" + namespaceName + "/newTopic";
Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName).create();
producer.close();
-
+ assertTrue(getSnapshotAbortedTxnProcessor(topicName) instanceof
SnapshotSegmentAbortedTxnProcessorImpl);
// Check that the __transaction_buffer_snapshot topic is not created in the same namespace
String transactionBufferSnapshotTopic = "persistent://" +
namespaceName + "/" +
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT;
@@ -415,4 +425,21 @@ public class SegmentAbortedTxnProcessorTest extends
TransactionTestBase {
// Destroy the namespace after the test
admin.namespaces().deleteNamespace(namespaceName, true);
}
+
+ private AbortedTxnProcessor getSnapshotAbortedTxnProcessor(String
topicName) {
+ PersistentTopic persistentTopic = getPersistentTopic(topicName);
+ return
WhiteboxImpl.getInternalState(persistentTopic.getTransactionBuffer(),
"snapshotAbortedTxnProcessor");
+ }
+
+ private PersistentTopic getPersistentTopic(String topicName) {
+ for (PulsarService pulsar : getPulsarServiceList()) {
+ CompletableFuture<Optional<Topic>> future =
+ pulsar.getBrokerService().getTopic(topicName, false);
+ if (future == null) {
+ continue;
+ }
+ return (PersistentTopic) future.join().get();
+ }
+ throw new NullPointerException("topic[" + topicName + "] not found");
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index f45eda8d21f..cd0c089ad41 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -114,10 +114,10 @@ public abstract class TransactionTestBase extends
TestRetrySupport {
new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
createTransactionCoordinatorAssign(numPartitionsOfTC);
+ admin.tenants().createTenant(TENANT,
+ new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet(CLUSTER_NAME)));
+ admin.namespaces().createNamespace(NAMESPACE1);
if (topic != null) {
- admin.tenants().createTenant(TENANT,
- new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet(CLUSTER_NAME)));
- admin.namespaces().createNamespace(NAMESPACE1);
if (numPartitions == 0) {
admin.topics().createNonPartitionedTopic(topic);
} else {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferCloseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferCloseTest.java
index e92cf29521e..d1784f6a392 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferCloseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferCloseTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.transaction.buffer;
-import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -30,7 +29,6 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -49,8 +47,6 @@ public class TransactionBufferCloseTest extends
TransactionTestBase {
setUpBase(1, 16, null, 0);
Awaitility.await().until(() -> ((PulsarClientImpl) pulsarClient)
.getTcClient().getState() ==
TransactionCoordinatorClient.State.READY);
- admin.tenants().createTenant(TENANT,
- new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet(CLUSTER_NAME)));
}
@AfterMethod(alwaysRun = true)