This is an automated email from the ASF dual-hosted git repository.
daojun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4e132d3f291 [improve] [test] Add a test to guarantee the TNX topics
will not be replicated (#22721)
4e132d3f291 is described below
commit 4e132d3f2919767cef6fa935a59721937cb668e7
Author: fengyubiao <[email protected]>
AuthorDate: Fri May 17 01:18:49 2024 +0800
[improve] [test] Add a test to guarantee the TNX topics will not be
replicated (#22721)
---
.../broker/service/OneWayReplicatorTest.java | 9 -
.../broker/service/OneWayReplicatorTestBase.java | 44 +++-
.../pulsar/broker/service/ReplicationTxnTest.java | 262 +++++++++++++++++++++
3 files changed, 297 insertions(+), 18 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 99fd4d877c1..fae72e8eac2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -96,15 +96,6 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
super.cleanup();
}
- private void waitReplicatorStarted(String topicName) {
- Awaitility.await().untilAsserted(() -> {
- Optional<Topic> topicOptional2 =
pulsar2.getBrokerService().getTopic(topicName, false).get();
- assertTrue(topicOptional2.isPresent());
- PersistentTopic persistentTopic2 = (PersistentTopic)
topicOptional2.get();
- assertFalse(persistentTopic2.getProducers().isEmpty());
- });
- }
-
private void waitReplicatorStopped(String topicName) {
Awaitility.await().untilAsserted(() -> {
Optional<Topic> topicOptional2 =
pulsar2.getBrokerService().getTopic(topicName, false).get();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
index b4eed00c447..317e43306e3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
@@ -19,6 +19,8 @@
package org.apache.pulsar.broker.service;
import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
import com.google.common.collect.Sets;
import java.net.URL;
import java.time.Duration;
@@ -29,6 +31,7 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -55,7 +58,7 @@ public abstract class OneWayReplicatorTestBase extends
TestRetrySupport {
protected ZookeeperServerTest brokerConfigZk1;
protected LocalBookkeeperEnsemble bkEnsemble1;
protected PulsarService pulsar1;
- protected BrokerService ns1;
+ protected BrokerService broker1;
protected PulsarAdmin admin1;
protected PulsarClient client1;
@@ -66,7 +69,7 @@ public abstract class OneWayReplicatorTestBase extends
TestRetrySupport {
protected ZookeeperServerTest brokerConfigZk2;
protected LocalBookkeeperEnsemble bkEnsemble2;
protected PulsarService pulsar2;
- protected BrokerService ns2;
+ protected BrokerService broker2;
protected PulsarAdmin admin2;
protected PulsarClient client2;
@@ -89,23 +92,29 @@ public abstract class OneWayReplicatorTestBase extends
TestRetrySupport {
setConfigDefaults(config1, cluster1, bkEnsemble1, brokerConfigZk1);
pulsar1 = new PulsarService(config1);
pulsar1.start();
- ns1 = pulsar1.getBrokerService();
-
+ broker1 = pulsar1.getBrokerService();
url1 = new URL(pulsar1.getWebServiceAddress());
urlTls1 = new URL(pulsar1.getWebServiceAddressTls());
- admin1 = PulsarAdmin.builder().serviceHttpUrl(url1.toString()).build();
- client1 = PulsarClient.builder().serviceUrl(url1.toString()).build();
// Start region 2
setConfigDefaults(config2, cluster2, bkEnsemble2, brokerConfigZk2);
pulsar2 = new PulsarService(config2);
pulsar2.start();
- ns2 = pulsar2.getBrokerService();
-
+ broker2 = pulsar2.getBrokerService();
url2 = new URL(pulsar2.getWebServiceAddress());
urlTls2 = new URL(pulsar2.getWebServiceAddressTls());
+ }
+
+ protected void startAdminClient() throws Exception {
+ admin1 = PulsarAdmin.builder().serviceHttpUrl(url1.toString()).build();
admin2 = PulsarAdmin.builder().serviceHttpUrl(url2.toString()).build();
- client2 = PulsarClient.builder().serviceUrl(url2.toString()).build();
+ }
+
+ protected void startPulsarClient() throws Exception{
+ ClientBuilder clientBuilder1 =
PulsarClient.builder().serviceUrl(url1.toString());
+ client1 = initClient(clientBuilder1);
+ ClientBuilder clientBuilder2 =
PulsarClient.builder().serviceUrl(url2.toString());
+ client2 = initClient(clientBuilder2);
}
protected void createDefaultTenantsAndClustersAndNamespace() throws
Exception {
@@ -196,8 +205,12 @@ public abstract class OneWayReplicatorTestBase extends
TestRetrySupport {
startBrokers();
+ startAdminClient();
+
createDefaultTenantsAndClustersAndNamespace();
+ startPulsarClient();
+
Thread.sleep(100);
log.info("--- OneWayReplicatorTestBase::setup completed ---");
}
@@ -287,4 +300,17 @@ public abstract class OneWayReplicatorTestBase extends
TestRetrySupport {
config1 = new ServiceConfiguration();
config2 = new ServiceConfiguration();
}
+
+ protected void waitReplicatorStarted(String topicName) {
+ Awaitility.await().untilAsserted(() -> {
+ Optional<Topic> topicOptional2 =
pulsar2.getBrokerService().getTopic(topicName, false).get();
+ assertTrue(topicOptional2.isPresent());
+ PersistentTopic persistentTopic2 = (PersistentTopic)
topicOptional2.get();
+ assertFalse(persistentTopic2.getProducers().isEmpty());
+ });
+ }
+
+ protected PulsarClient initClient(ClientBuilder clientBuilder) throws
Exception {
+ return clientBuilder.build();
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTxnTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTxnTest.java
new file mode 100644
index 00000000000..3caf4a1f239
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTxnTest.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
+import static
org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import com.google.common.collect.Sets;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.apache.pulsar.zookeeper.ZookeeperServerTest;
+import org.awaitility.reflect.WhiteboxImpl;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class ReplicationTxnTest extends OneWayReplicatorTestBase {
+
+ private boolean transactionBufferSegmentedSnapshotEnabled = false;
+ private int txnLogPartitions = 4;
+
+ @Override
+ @BeforeClass(alwaysRun = true, timeOut = 300000)
+ public void setup() throws Exception {
+ super.setup();
+ }
+
+ @Override
+ @AfterClass(alwaysRun = true, timeOut = 300000)
+ public void cleanup() throws Exception {
+ super.cleanup();
+ }
+
+ @Override
+ protected PulsarClient initClient(ClientBuilder clientBuilder) throws
Exception {
+ return clientBuilder.enableTransaction(true).build();
+ }
+
+ @Override
+ protected void setConfigDefaults(ServiceConfiguration config, String
clusterName,
+ LocalBookkeeperEnsemble
bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) {
+ super.setConfigDefaults(config, clusterName, bookkeeperEnsemble,
brokerConfigZk);
+ config.setSystemTopicEnabled(true);
+ config.setTopicLevelPoliciesEnabled(true);
+ config.setTransactionCoordinatorEnabled(true);
+ config.setTransactionLogBatchedWriteEnabled(true);
+ config.setTransactionPendingAckBatchedWriteEnabled(true);
+
config.setTransactionBufferSegmentedSnapshotEnabled(transactionBufferSegmentedSnapshotEnabled);
+ }
+
+ @Override
+ protected void createDefaultTenantsAndClustersAndNamespace() throws
Exception {
+ super.createDefaultTenantsAndClustersAndNamespace();
+
+ // Create resource that transaction function relies on.
+ admin1.tenants().createTenant(SYSTEM_NAMESPACE.getTenant(), new
TenantInfoImpl(Collections.emptySet(),
+ Sets.newHashSet(cluster1, cluster2)));
+ admin1.namespaces().createNamespace(SYSTEM_NAMESPACE.toString(), 4);
+
pulsar1.getPulsarResources().getNamespaceResources().getPartitionedTopicResources().createPartitionedTopic(
+ SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, new
PartitionedTopicMetadata(txnLogPartitions));
+
//admin1.topics().createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString(),
4);
+
+ admin2.tenants().createTenant(SYSTEM_NAMESPACE.getTenant(), new
TenantInfoImpl(Collections.emptySet(),
+ Sets.newHashSet(cluster1, cluster2)));
+ admin2.namespaces().createNamespace(SYSTEM_NAMESPACE.toString(), 4);
+
pulsar2.getPulsarResources().getNamespaceResources().getPartitionedTopicResources().createPartitionedTopic(
+ SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, new
PartitionedTopicMetadata(txnLogPartitions));
+ }
+
+ private void pubAndSubOneMsg(String topic, String subscription) throws
Exception {
+ Consumer consumer1 =
client1.newConsumer(Schema.STRING).topic(topic).subscriptionName(subscription)
+ .isAckReceiptEnabled(true).subscribe();
+ Producer<String> producer1 =
client1.newProducer(Schema.STRING).topic(topic).create();
+ producer1.newMessage().value("msg1").send();
+ // start txn.
+ Transaction txn = client1.newTransaction().withTransactionTimeout(1,
TimeUnit.MINUTES).build().get();
+ // consume.
+ Message<String> c1Msg1 = consumer1.receive(5, TimeUnit.SECONDS);
+ assertNotNull(c1Msg1);
+ assertEquals(c1Msg1.getValue(), "msg1");
+ consumer1.acknowledgeAsync(c1Msg1.getMessageId(), txn).join();
+ // send.
+ producer1.newMessage(txn).value("msg2").send();
+ // commit.
+ txn.commit().get();
+
+ // Consume the msg with TXN.
+ Message<String> c1Msg2 = consumer1.receive(5, TimeUnit.SECONDS);
+ assertNotNull(c1Msg2);
+ assertEquals(c1Msg2.getValue(), "msg2");
+ consumer1.acknowledgeAsync(c1Msg2.getMessageId()).join();
+
+ // Consume messages on the remote cluster.
+ Consumer consumer2 =
client2.newConsumer(Schema.STRING).topic(topic).subscriptionName(subscription).subscribe();
+ Message<String> c2Msg1 = consumer2.receive(15, TimeUnit.SECONDS);
+ assertNotNull(c2Msg1);
+ MessageMetadata msgMetadata1 = WhiteboxImpl.getInternalState(c2Msg1,
"msgMetadata");
+ // Verify: the messages replicated has no TXN id.
+ assertFalse(msgMetadata1.hasTxnidMostBits());
+ assertFalse(msgMetadata1.hasTxnidLeastBits());
+ consumer2.acknowledge(c2Msg1);
+ Message<String> c2Msg2 = consumer2.receive(15, TimeUnit.SECONDS);
+ assertNotNull(c2Msg2);
+ MessageMetadata msgMetadata2 = WhiteboxImpl.getInternalState(c2Msg2,
"msgMetadata");
+ // Verify: the messages replicated has no TXN id.
+ assertFalse(msgMetadata2.hasTxnidMostBits());
+ assertFalse(msgMetadata2.hasTxnidLeastBits());
+ consumer2.acknowledge(c2Msg2);
+
+ // cleanup.
+ producer1.close();
+ consumer1.close();
+ consumer2.close();
+ }
+
+ private void verifyNoReplicator(BrokerService broker, TopicName topicName)
throws Exception {
+ String tpStr = topicName.toString();
+ CompletableFuture<Optional<Topic>> future = broker.getTopic(tpStr,
true);
+ if (future == null) {
+ return;
+ }
+ PersistentTopic persistentTopic = (PersistentTopic)
future.join().get();
+ assertTrue(persistentTopic.getReplicators().isEmpty());
+ }
+
+ @Test
+ public void testTxnLogNotBeReplicated() throws Exception {
+ final String topic = BrokerTestUtil.newUniqueName("persistent://" +
replicatedNamespace + "/tp");
+ final String subscription = "s1";
+ admin1.topics().createNonPartitionedTopic(topic);
+ waitReplicatorStarted(topic);
+ admin1.topics().createSubscription(topic, subscription,
MessageId.earliest);
+ admin2.topics().createSubscription(topic, subscription,
MessageId.earliest);
+ // Pub & Sub.
+ pubAndSubOneMsg(topic, subscription);
+ // To cover more cases, sleep 3s.
+ Thread.sleep(3000);
+
+ // Verify: messages on the TXN system topic did not been replicated.
+ // __transaction_log_: it only uses ML, will not create topic.
+ for (int i = 0; i < txnLogPartitions; i++) {
+ TopicName txnLog = TopicName.get(TopicDomain.persistent.value(),
+ NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX +
i);
+ assertNotNull(pulsar1.getManagedLedgerFactory()
+
.getManagedLedgerInfo(txnLog.getPersistenceNamingEncoding()));
+ assertFalse(broker1.getTopics().containsKey(txnLog.toString()));
+ }
+ // __transaction_pending_ack: it only uses ML, will not create topic.
+ TopicName pendingAck = TopicName.get(
+ MLPendingAckStore.getTransactionPendingAckStoreSuffix(topic,
subscription));
+ assertNotNull(pulsar1.getManagedLedgerFactory()
+
.getManagedLedgerInfo(pendingAck.getPersistenceNamingEncoding()));
+ assertFalse(broker1.getTopics().containsKey(pendingAck.toString()));
+ // __transaction_buffer_snapshot.
+ verifyNoReplicator(broker1,
TopicName.get(TopicDomain.persistent.value(),
+ TopicName.get(topic).getNamespaceObject(),
+ SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT));
+ verifyNoReplicator(broker1,
TopicName.get(TopicDomain.persistent.value(),
+ TopicName.get(topic).getNamespaceObject(),
+ SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS));
+ verifyNoReplicator(broker1,
TopicName.get(TopicDomain.persistent.value(),
+ TopicName.get(topic).getNamespaceObject(),
+ SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES));
+
+ // cleanup.
+ cleanupTopics(() -> {
+ admin1.topics().delete(topic);
+ admin2.topics().delete(topic);
+ try {
+ admin1.topics().delete(pendingAck.toString());
+ } catch (Exception ex) {}
+ try {
+ admin2.topics().delete(pendingAck.toString());
+ } catch (Exception ex) {}
+ });
+ }
+
+ @Test
+ public void testOngoingMessagesWillNotBeReplicated() throws Exception {
+ final String topic = BrokerTestUtil.newUniqueName("persistent://" +
replicatedNamespace + "/tp");
+ final String subscription = "s1";
+ admin1.topics().createNonPartitionedTopic(topic);
+ waitReplicatorStarted(topic);
+ admin1.topics().createSubscription(topic, subscription,
MessageId.earliest);
+ admin2.topics().createSubscription(topic, subscription,
MessageId.earliest);
+ // Pub without commit.
+ Producer<String> producer1 =
client1.newProducer(Schema.STRING).topic(topic).create();
+ Transaction txn = client1.newTransaction().withTransactionTimeout(1,
TimeUnit.HOURS).build().get();
+ producer1.newMessage(txn).value("msg1").send();
+ // Verify: receive nothing on the remote cluster.
+ Consumer consumer2 =
client2.newConsumer(Schema.STRING).topic(topic).subscriptionName(subscription).subscribe();
+ Message<String> msg = consumer2.receive(15, TimeUnit.SECONDS);
+ assertNull(msg);
+ // Verify: the repl cursor is not end of the topic.
+ PersistentTopic persistentTopic = (PersistentTopic)
broker1.getTopic(topic, false).join().get();
+ GeoPersistentReplicator replicator =
+ (GeoPersistentReplicator)
persistentTopic.getReplicators().values().iterator().next();
+ assertTrue(replicator.getCursor().hasMoreEntries());
+
+ // cleanup.
+ producer1.close();
+ consumer2.close();
+ cleanupTopics(() -> {
+ admin1.topics().delete(topic);
+ admin2.topics().delete(topic);
+ TopicName pendingAck = TopicName.get(
+
MLPendingAckStore.getTransactionPendingAckStoreSuffix(topic, subscription));
+ try {
+ admin1.topics().delete(pendingAck.toString());
+ } catch (Exception ex) {}
+ try {
+ admin2.topics().delete(pendingAck.toString());
+ } catch (Exception ex) {}
+ });
+ }
+}