This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new cb703ca Optimize Tests (#12560)
cb703ca is described below
commit cb703cab9b58bc7fe2da5694e2404538c44dab7a
Author: Xiangying Meng <[email protected]>
AuthorDate: Mon Nov 1 23:36:38 2021 +0800
Optimize Tests (#12560)
---
.../TopicTransactionBufferRecoverTest.java | 36 +----------
.../TransactionClientReconnectTest.java | 31 +---------
.../broker/transaction/TransactionProduceTest.java | 42 ++-----------
.../pulsar/broker/transaction/TransactionTest.java | 26 +-------
.../broker/transaction/TransactionTestBase.java | 69 ++++++++++++++++------
.../buffer/TransactionLowWaterMarkTest.java | 39 +-----------
.../buffer/TransactionStablePositionTest.java | 26 +-------
.../TransactionMetaStoreAssignmentTest.java | 14 +----
.../pendingack/PendingAckInMemoryDeleteTest.java | 43 +-------------
.../pendingack/PendingAckPersistentTest.java | 35 ++---------
.../client/impl/TransactionEndToEndTest.java | 38 +-----------
11 files changed, 72 insertions(+), 327 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index 3607b45..335cecc 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -18,8 +18,6 @@
*/
package org.apache.pulsar.broker.transaction;
-import com.google.common.collect.Sets;
-
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
@@ -55,10 +53,7 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.events.EventsTopicNames;
-import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
@@ -75,8 +70,6 @@ import static org.testng.Assert.assertTrue;
@Slf4j
public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
- private static final String TENANT = "tnx";
- private static final String NAMESPACE1 = TENANT + "/ns1";
private static final String RECOVER_COMMIT = NAMESPACE1 +
"/recover-commit";
private static final String RECOVER_ABORT = NAMESPACE1 + "/recover-abort";
private static final String SUBSCRIPTION_NAME = "test-recover";
@@ -85,36 +78,9 @@ public class TopicTransactionBufferRecoverTest extends
TransactionTestBase {
private static final int NUM_PARTITIONS = 16;
@BeforeMethod
protected void setup() throws Exception {
- setBrokerCount(1);
- internalSetup();
-
- String[] brokerServiceUrlArr =
getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
- String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length
-1];
- admin.clusters().createCluster(CLUSTER_NAME,
ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build());
- admin.tenants().createTenant(TENANT,
- new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet(CLUSTER_NAME)));
- admin.namespaces().createNamespace(NAMESPACE1);
- admin.topics().createNonPartitionedTopic(RECOVER_COMMIT);
+ setUpBase(1, NUM_PARTITIONS, RECOVER_COMMIT, 0);
admin.topics().createNonPartitionedTopic(RECOVER_ABORT);
admin.topics().createNonPartitionedTopic(TAKE_SNAPSHOT);
-
-
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
- new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet(CLUSTER_NAME)));
-
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
NUM_PARTITIONS);
-
- if (pulsarClient != null) {
- pulsarClient.shutdown();
- }
- pulsarClient = PulsarClient.builder()
-
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
- .statsInterval(0, TimeUnit.SECONDS)
- .enableTransaction(true)
- .build();
-
-
- // wait tc init success to ready state
- waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
}
@AfterMethod(alwaysRun = true)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java
index 1f5ab15..41f98c0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java
@@ -18,19 +18,13 @@
*/
package org.apache.pulsar.broker.transaction;
-import com.google.common.collect.Sets;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClient;
import
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import
org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
-import org.apache.pulsar.common.naming.NamespaceName;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import
org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.awaitility.Awaitility;
@@ -49,33 +43,12 @@ import static org.testng.FileAssert.fail;
public class TransactionClientReconnectTest extends TransactionTestBase {
- private static final String RECONNECT_TOPIC =
"persistent://public/txn/txn-client-reconnect-test";
+ private static final String RECONNECT_TOPIC = NAMESPACE1 +
"/txn-client-reconnect-test";
private static final int NUM_PARTITIONS = 1;
@BeforeMethod(alwaysRun = true)
public void setup() throws Exception {
- setBrokerCount(1);
- super.internalSetup();
-
- String[] brokerServiceUrlArr =
getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
- String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length
-1];
- admin.clusters().createCluster(CLUSTER_NAME,
ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build());
- admin.tenants().createTenant("public",
- new TenantInfoImpl(Sets.newHashSet(),
Sets.newHashSet(CLUSTER_NAME)));
- admin.namespaces().createNamespace("public/txn", 10);
-
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
- new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet(CLUSTER_NAME)));
-
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
- admin.topics().createNonPartitionedTopic(RECONNECT_TOPIC);
+ setUpBase(1, NUM_PARTITIONS, RECONNECT_TOPIC, 0);
admin.topics().createSubscription(RECONNECT_TOPIC, "test",
MessageId.latest);
-
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
NUM_PARTITIONS);
-
- pulsarClient = PulsarClient.builder()
-
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
- .statsInterval(0, TimeUnit.SECONDS)
- .enableTransaction(true)
- .build();
- // wait tc init success to ready state
- waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
}
@AfterMethod(alwaysRun = true)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index 0e63f53..cbae03b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.broker.transaction;
import static java.nio.charset.StandardCharsets.UTF_8;
-import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashSet;
@@ -50,10 +49,7 @@ import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.api.proto.MarkerType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
-import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.awaitility.Awaitility;
import org.testng.Assert;
@@ -69,9 +65,6 @@ import org.testng.annotations.Test;
public class TransactionProduceTest extends TransactionTestBase {
private static final int TOPIC_PARTITION = 3;
-
- private static final String TENANT = "tnx";
- private static final String NAMESPACE1 = TENANT + "/ns1";
private static final String PRODUCE_COMMIT_TOPIC = NAMESPACE1 +
"/produce-commit";
private static final String PRODUCE_ABORT_TOPIC = NAMESPACE1 +
"/produce-abort";
private static final String ACK_COMMIT_TOPIC = NAMESPACE1 + "/ack-commit";
@@ -79,37 +72,10 @@ public class TransactionProduceTest extends
TransactionTestBase {
private static final int NUM_PARTITIONS = 16;
@BeforeMethod
protected void setup() throws Exception {
- setBrokerCount(1);
- internalSetup();
-
- String[] brokerServiceUrlArr =
getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
- String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length
-1];
- admin.clusters().createCluster(CLUSTER_NAME,
ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build());
- admin.tenants().createTenant(TENANT,
- new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet(CLUSTER_NAME)));
- admin.namespaces().createNamespace(NAMESPACE1);
- admin.topics().createPartitionedTopic(PRODUCE_COMMIT_TOPIC, 3);
- admin.topics().createPartitionedTopic(PRODUCE_ABORT_TOPIC, 3);
- admin.topics().createPartitionedTopic(ACK_COMMIT_TOPIC, 3);
- admin.topics().createPartitionedTopic(ACK_ABORT_TOPIC, 3);
-
-
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
- new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet(CLUSTER_NAME)));
-
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
NUM_PARTITIONS);
-
- if (pulsarClient != null) {
- pulsarClient.shutdown();
- }
- pulsarClient = PulsarClient.builder()
-
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
- .statsInterval(0, TimeUnit.SECONDS)
- .enableTransaction(true)
- .build();
-
-
- // wait tc init success to ready state
- waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
+ setUpBase(1, NUM_PARTITIONS, PRODUCE_COMMIT_TOPIC, TOPIC_PARTITION);
+ admin.topics().createPartitionedTopic(PRODUCE_ABORT_TOPIC,
TOPIC_PARTITION);
+ admin.topics().createPartitionedTopic(ACK_COMMIT_TOPIC,
TOPIC_PARTITION);
+ admin.topics().createPartitionedTopic(ACK_ABORT_TOPIC,
TOPIC_PARTITION);
}
@AfterMethod(alwaysRun = true)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index bb172b5..a862405 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -77,36 +77,12 @@ import org.testng.annotations.Test;
@Test(groups = "broker")
public class TransactionTest extends TransactionTestBase {
- private static final String TENANT = "tnx";
- private static final String NAMESPACE1 = TENANT + "/ns1";
private static final int NUM_BROKERS = 1;
private static final int NUM_PARTITIONS = 1;
@BeforeMethod
protected void setup() throws Exception {
- this.setBrokerCount(NUM_BROKERS);
- this.internalSetup();
-
- String[] brokerServiceUrlArr =
getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
- String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length
- 1];
- admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder()
- .serviceUrl("http://localhost:" + webServicePort).build());
- admin.tenants().createTenant(TENANT,
- new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet(CLUSTER_NAME)));
- admin.namespaces().createNamespace(NAMESPACE1);
-
-
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
- new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet(CLUSTER_NAME)));
-
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
NUM_PARTITIONS);
- pulsarClient.close();
- pulsarClient = PulsarClient.builder()
-
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
- .statsInterval(0, TimeUnit.SECONDS)
- .enableTransaction(true)
- .build();
- // wait tc init success to ready state
- waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
+ setUpBase(NUM_BROKERS, NUM_PARTITIONS, NAMESPACE1 + "/test", 0);
}
@Test
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index 622421b..936e43e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.transaction;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.channel.EventLoopGroup;
import java.util.ArrayList;
@@ -45,10 +46,10 @@ import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.SameThreadOrderedSafeExecutor;
import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
-import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
-import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
-import
org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
@@ -61,6 +62,7 @@ import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.MockZooKeeperSession;
import org.apache.zookeeper.ZooKeeper;
import org.awaitility.Awaitility;
+import org.testng.Assert;
@Slf4j
public abstract class TransactionTestBase extends TestRetrySupport {
@@ -83,6 +85,9 @@ public abstract class TransactionTestBase extends
TestRetrySupport {
private OrderedExecutor bkExecutor;
private NonClosableMockBookKeeper mockBookKeeper;
+ public static final String TENANT = "tnx";
+ protected static final String NAMESPACE1 = TENANT + "/ns1";
+
public void internalSetup() throws Exception {
incrementSetupNumber();
init();
@@ -108,6 +113,40 @@ public abstract class TransactionTestBase extends
TestRetrySupport {
mockBookKeeper = createMockBookKeeper(bkExecutor);
startBroker();
}
+ protected void setUpBase(int numBroker,int numPartitionsOfTC, String
topic, int numPartitions) throws Exception{
+ setBrokerCount(numBroker);
+ internalSetup();
+
+ String[] brokerServiceUrlArr =
getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
+ String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length
-1];
+ admin.clusters().createCluster(CLUSTER_NAME,
ClusterData.builder().serviceUrl("http://localhost:"
+ + webServicePort).build());
+
+
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
+ new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet(CLUSTER_NAME)));
+
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
+
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
numPartitionsOfTC);
+ if (topic != null) {
+ admin.tenants().createTenant(TENANT,
+ new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet(CLUSTER_NAME)));
+ admin.namespaces().createNamespace(NAMESPACE1);
+ if (numPartitions == 0) {
+ admin.topics().createNonPartitionedTopic(topic);
+ } else {
+ admin.topics().createPartitionedTopic(topic, numPartitions);
+ }
+ }
+ if (pulsarClient != null) {
+ pulsarClient.shutdown();
+ }
+ pulsarClient = PulsarClient.builder()
+
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
+ .statsInterval(0, TimeUnit.SECONDS)
+ .enableTransaction(true)
+ .build();
+ // wait tc init success to ready state
+ waitForCoordinatorToBeAvailable(numPartitionsOfTC);
+ }
protected void startBroker() throws Exception {
for (int i = 0; i < brokerCount; i++) {
@@ -295,20 +334,12 @@ public abstract class TransactionTestBase extends
TestRetrySupport {
}
public void waitForCoordinatorToBeAvailable(int numOfTCPerBroker){
// wait tc init success to ready state
- Awaitility.await().until(() -> {
- Map<TransactionCoordinatorID, TransactionMetadataStore> stores =
-
getPulsarServiceList().get(brokerCount-1).getTransactionMetadataStoreService().getStores();
- if (stores.size() == numOfTCPerBroker) {
- for (TransactionCoordinatorID transactionCoordinatorID :
stores.keySet()) {
- if (((MLTransactionMetadataStore)
stores.get(transactionCoordinatorID)).getState()
- != TransactionMetadataStoreState.State.Ready) {
- return false;
- }
- }
- return true;
- } else {
- return false;
- }
- });
+ Awaitility.await()
+ .untilAsserted(() -> {
+ int transactionMetaStoreCount = pulsarServiceList.stream()
+ .mapToInt(pulsarService ->
pulsarService.getTransactionMetadataStoreService().getStores().size())
+ .sum();
+ Assert.assertEquals(transactionMetaStoreCount,
numOfTCPerBroker);
+ });
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
index db9d407..873509f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
@@ -23,17 +23,12 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-
-import com.google.common.collect.Sets;
-
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-
-import javax.validation.constraints.AssertTrue;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
@@ -47,7 +42,6 @@ import
org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
@@ -55,17 +49,13 @@ import
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientExce
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
-import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
-import
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
import
org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.awaitility.Awaitility;
import org.testng.Assert;
@@ -80,35 +70,12 @@ import org.testng.annotations.Test;
@Test(groups = "broker")
public class TransactionLowWaterMarkTest extends TransactionTestBase {
- private static final String TENANT = "tnx";
- private static final String NAMESPACE1 = TENANT + "/ns1";
private static final String TOPIC = NAMESPACE1 + "/test-topic";
@BeforeMethod(alwaysRun = true)
protected void setup() throws Exception {
- setBrokerCount(1);
- internalSetup();
-
- String[] brokerServiceUrlArr =
getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
- String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length
-1];
- admin.clusters().createCluster(CLUSTER_NAME,
ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build());
- admin.tenants().createTenant(TENANT,
- new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet(CLUSTER_NAME)));
- admin.namespaces().createNamespace(NAMESPACE1);
- admin.topics().createNonPartitionedTopic(TOPIC);
-
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
- new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet(CLUSTER_NAME)));
-
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
16);
-
- if (pulsarClient != null) {
- pulsarClient.shutdown();
- }
- pulsarClient = PulsarClient.builder()
-
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
- .statsInterval(0, TimeUnit.SECONDS)
- .enableTransaction(true)
- .build();
+ setUpBase(1, 16, TOPIC, 0);
+
Map<TransactionCoordinatorID, TransactionMetadataStore> stores =
getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores();
Awaitility.await().until(() -> {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
index e43f262..ef1c761 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
@@ -54,35 +54,11 @@ import org.testng.annotations.Test;
@Test(groups = "broker")
public class TransactionStablePositionTest extends TransactionTestBase {
- private static final String TENANT = "tnx";
- private static final String NAMESPACE1 = TENANT + "/ns1";
private static final String TOPIC = NAMESPACE1 + "/test-topic";
@BeforeMethod
protected void setup() throws Exception {
- internalSetup();
-
- String[] brokerServiceUrlArr =
getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
- String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length
-1];
- admin.clusters().createCluster(CLUSTER_NAME,
ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build());
- admin.tenants().createTenant(TENANT,
- new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet(CLUSTER_NAME)));
- admin.namespaces().createNamespace(NAMESPACE1);
- admin.topics().createNonPartitionedTopic(TOPIC);
-
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
- new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet(CLUSTER_NAME)));
-
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
16);
-
- if (pulsarClient != null) {
- pulsarClient.shutdown();
- }
- pulsarClient = PulsarClient.builder()
-
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
- .statsInterval(0, TimeUnit.SECONDS)
- .enableTransaction(true)
- .build();
-
+ setUpBase(1, 16, TOPIC, 0);
Awaitility.await().until(() -> ((PulsarClientImpl) pulsarClient)
.getTcClient().getState() ==
TransactionCoordinatorClient.State.READY);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
index 1725305..0102786 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
@@ -20,15 +20,11 @@ package org.apache.pulsar.broker.transaction.coordinator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import com.google.common.collect.Sets;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.ServiceUrlProvider;
-import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.awaitility.Awaitility;
import org.testng.Assert;
@@ -41,15 +37,7 @@ public class TransactionMetaStoreAssignmentTest extends
TransactionTestBase {
@Override
@BeforeMethod(alwaysRun = true)
protected void setup() throws Exception {
- setBrokerCount(3);
- super.internalSetup();
- String[] brokerServiceUrlArr =
getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
- String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length
-1];
- admin.clusters().createCluster(CLUSTER_NAME,
ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build());
-
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
- new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet(CLUSTER_NAME)));
-
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
16);
+ setUpBase(3, 16, null, 0);
pulsarClient.close();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java
index fc952c4..bc22473 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.transaction.pendingack;
-import com.google.common.collect.Sets;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
@@ -35,21 +34,11 @@ import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
-import org.apache.pulsar.common.naming.NamespaceName;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
-import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
-import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
-import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
-import
org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -58,7 +47,6 @@ import org.testng.annotations.Test;
import java.lang.reflect.Field;
import java.util.HashMap;
-import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -71,39 +59,10 @@ import static org.testng.Assert.assertTrue;
@Test(groups = "broker")
public class PendingAckInMemoryDeleteTest extends TransactionTestBase {
- private static final String TENANT = "tnx";
- private static final String NAMESPACE1 = TENANT + "/ns1";
private static final int NUM_PARTITIONS = 16;
@BeforeMethod
protected void setup() throws Exception {
- setBrokerCount(1);
- internalSetup();
-
- String[] brokerServiceUrlArr =
getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
- String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length
-1];
- admin.clusters().createCluster(CLUSTER_NAME,
ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build());
- admin.tenants().createTenant(TENANT,
- new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet(CLUSTER_NAME)));
-
-
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
- new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet(CLUSTER_NAME)));
- admin.namespaces().createNamespace(NAMESPACE1);
-
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
NUM_PARTITIONS);
-
- if (pulsarClient != null) {
- pulsarClient.shutdown();
- }
- pulsarClient = PulsarClient.builder()
-
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
- .statsInterval(0, TimeUnit.SECONDS)
- .enableTransaction(true)
- .build();
-
- Map<TransactionCoordinatorID, TransactionMetadataStore> stores =
-
getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores();
- // wait tc init success to ready state
- waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
+ setUpBase(1, NUM_PARTITIONS, NAMESPACE1 +"/test", 0);
}
@AfterMethod(alwaysRun = true)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index 3820ebc..97f8f51d3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -62,36 +62,13 @@ import org.testng.annotations.Test;
@Slf4j
public class PendingAckPersistentTest extends TransactionTestBase {
- private static final String PENDING_ACK_REPLAY_TOPIC =
"persistent://public/txn/pending-ack-replay";
-
- private static final String NAMESPACE = "public/txn";
+ private static final String PENDING_ACK_REPLAY_TOPIC = NAMESPACE1 +
"/pending-ack-replay";
private static final int NUM_PARTITIONS = 16;
@BeforeMethod
public void setup() throws Exception {
- setBrokerCount(1);
- super.internalSetup();
-
- String[] brokerServiceUrlArr =
getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
- String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length
-1];
- admin.clusters().createCluster(CLUSTER_NAME,
ClusterDataImpl.builder().serviceUrl("http://localhost:" +
webServicePort).build());
-
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
- new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet(CLUSTER_NAME)));
-
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
16);
- admin.tenants().createTenant("public",
- new TenantInfoImpl(Sets.newHashSet(),
Sets.newHashSet(CLUSTER_NAME)));
- admin.namespaces().createNamespace(NAMESPACE, 10);
- admin.topics().createNonPartitionedTopic(PENDING_ACK_REPLAY_TOPIC);
-
- pulsarClient = PulsarClient.builder()
-
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
- .statsInterval(0, TimeUnit.SECONDS)
- .enableTransaction(true)
- .build();
- // wait tc init success to ready state
- waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
+ setUpBase(1, NUM_PARTITIONS, PENDING_ACK_REPLAY_TOPIC, 0);
}
@AfterMethod(alwaysRun = true)
@@ -312,7 +289,7 @@ public class PendingAckPersistentTest extends
TransactionTestBase {
String subName = "test-delete";
String topic = TopicName.get(TopicDomain.persistent.toString(),
- NamespaceName.get(NAMESPACE), "test-delete").toString();
+ NamespaceName.get(NAMESPACE1), "test-delete").toString();
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
@@ -325,7 +302,7 @@ public class PendingAckPersistentTest extends
TransactionTestBase {
admin.topics().deleteSubscription(topic, subName);
- List<String> topics = admin.namespaces().getTopics(NAMESPACE);
+ List<String> topics = admin.namespaces().getTopics(NAMESPACE1);
TopicStats topicStats = admin.topics().getStats(topic, false);
@@ -341,7 +318,7 @@ public class PendingAckPersistentTest extends
TransactionTestBase {
String subName2 = "test-delete";
String topic = TopicName.get(TopicDomain.persistent.toString(),
- NamespaceName.get(NAMESPACE), "test-delete").toString();
+ NamespaceName.get(NAMESPACE1), "test-delete").toString();
@Cleanup
Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.topic(topic)
@@ -364,7 +341,7 @@ public class PendingAckPersistentTest extends
TransactionTestBase {
admin.topics().delete(topic);
- List<String> topics = admin.namespaces().getTopics(NAMESPACE);
+ List<String> topics = admin.namespaces().getTopics(NAMESPACE1);
assertFalse(topics.contains(MLPendingAckStore.getTransactionPendingAckStoreSuffix(topic,
subName1)));
assertFalse(topics.contains(MLPendingAckStore.getTransactionPendingAckStoreSuffix(topic,
subName2)));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index d7cb6c9..f343742 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -23,9 +23,6 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-
-import com.google.common.collect.Sets;
-
import java.lang.reflect.Field;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -63,11 +60,8 @@ import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.api.proto.CommandAck;
-import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
@@ -87,42 +81,14 @@ import org.testng.annotations.Test;
public class TransactionEndToEndTest extends TransactionTestBase {
private static final int TOPIC_PARTITION = 3;
-
- private static final String TENANT = "tnx";
- private static final String NAMESPACE1 = TENANT + "/ns1";
private static final String TOPIC_OUTPUT = NAMESPACE1 + "/output";
private static final String TOPIC_MESSAGE_ACK_TEST = NAMESPACE1 +
"/message-ack-test";
private static final int NUM_PARTITIONS = 16;
@BeforeMethod
protected void setup() throws Exception {
- setBrokerCount(1);
- internalSetup();
-
- String[] brokerServiceUrlArr =
getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
- String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length
-1];
- admin.clusters().createCluster(CLUSTER_NAME,
ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build());
- admin.tenants().createTenant(TENANT,
- new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet(CLUSTER_NAME)));
- admin.namespaces().createNamespace(NAMESPACE1);
- admin.topics().createPartitionedTopic(TOPIC_OUTPUT, TOPIC_PARTITION);
+ setUpBase(1, NUM_PARTITIONS, TOPIC_OUTPUT, TOPIC_PARTITION);
admin.topics().createPartitionedTopic(TOPIC_MESSAGE_ACK_TEST, 1);
-
-
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
- new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet(CLUSTER_NAME)));
-
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
NUM_PARTITIONS);
-
- if (pulsarClient != null) {
- pulsarClient.close();
- }
- pulsarClient = PulsarClient.builder()
-
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
- .statsInterval(0, TimeUnit.SECONDS)
- .enableTransaction(true)
- .build();
-
- // wait tc init success to ready state
- waitForCoordinatorToBeAvailable(NUM_PARTITIONS); }
+ }
@AfterMethod(alwaysRun = true)
protected void cleanup() {