This is an automated email from the ASF dual-hosted git repository.
heesung pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 4366a13ae2f [fix][broker][branch-3.1] Fix broker not starting when
both transactions and the Extensible Load Manager are enabled (#22194)
4366a13ae2f is described below
commit 4366a13ae2fa96db14dccc0767a81eb3f01af51f
Author: Dragos Misca <[email protected]>
AuthorDate: Tue Mar 5 15:10:32 2024 -0800
[fix][broker][branch-3.1] Fix broker not starting when both transactions
and the Extensible Load Manager are enabled (#22194)
---
.../service/persistent/PersistentSubscription.java | 4 +-
.../broker/service/persistent/PersistentTopic.java | 4 +-
.../ExtensibleLoadManagerImplBaseTest.java | 160 +++++++++++++++++++++
.../extensions/ExtensibleLoadManagerImplTest.java | 123 +---------------
...dManagerImplWithTransactionCoordinatorTest.java | 55 +++++++
.../integration/messaging/MessagingSmokeTest.java | 107 ++++++++++++++
.../integration/topologies/PulsarCluster.java | 2 +-
.../topologies/PulsarClusterTestBase.java | 15 ++
.../src/test/resources/pulsar-messaging.xml | 1 +
9 files changed, 350 insertions(+), 121 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index b5852610c20..f00c95f7e68 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -58,6 +58,7 @@ import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.service.AbstractSubscription;
import org.apache.pulsar.broker.service.AnalyzeBacklogResult;
import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -155,7 +156,8 @@ public class PersistentSubscription extends
AbstractSubscription implements Subs
this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties)
? Collections.emptyMap() :
Collections.unmodifiableMap(subscriptionProperties);
if
(topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
- && !isEventSystemTopic(TopicName.get(topicName))) {
+ && !isEventSystemTopic(TopicName.get(topicName))
+ && !ExtensibleLoadManagerImpl.isInternalTopic(topicName)) {
this.pendingAckHandle = new PendingAckHandleImpl(this);
} else {
this.pendingAckHandle = new PendingAckHandleDisabled();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 2752f247853..b434d8a2dbc 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -83,6 +83,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
+import
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateCompactionStrategy;
import org.apache.pulsar.broker.namespace.NamespaceService;
@@ -306,7 +307,8 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
TopicName topicName = TopicName.get(topic);
if
(brokerService.getPulsar().getConfiguration().isTransactionCoordinatorEnabled()
&& !isEventSystemTopic(topicName)
- &&
!NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
+ &&
!NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())
+ && !ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
this.transactionBuffer = brokerService.getPulsar()
.getTransactionBufferProvider().newTransactionBuffer(this);
} else {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
new file mode 100644
index 00000000000..d9c6f78b8d0
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import com.google.common.collect.Sets;
+import java.util.concurrent.CompletableFuture;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
+import
org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.LookupService;
+import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicType;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+
+public abstract class ExtensibleLoadManagerImplBaseTest extends
MockedPulsarServiceBaseTest {
+
+ protected PulsarService pulsar1;
+ protected PulsarService pulsar2;
+
+ protected PulsarTestContext additionalPulsarTestContext;
+
+ protected ExtensibleLoadManagerImpl primaryLoadManager;
+
+ protected ExtensibleLoadManagerImpl secondaryLoadManager;
+
+ protected ServiceUnitStateChannelImpl channel1;
+ protected ServiceUnitStateChannelImpl channel2;
+
+ protected final String defaultTestNamespace;
+
+ protected LookupService lookupService;
+
+ protected ExtensibleLoadManagerImplBaseTest(String defaultTestNamespace) {
+ this.defaultTestNamespace = defaultTestNamespace;
+ }
+
+ protected ServiceConfiguration initConfig(ServiceConfiguration conf) {
+ conf.setForceDeleteNamespaceAllowed(true);
+ conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
+ conf.setAllowAutoTopicCreation(true);
+
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
+ conf.setLoadBalancerSheddingEnabled(false);
+ conf.setLoadBalancerDebugModeEnabled(true);
+ conf.setTopicLevelPoliciesEnabled(true);
+ return conf;
+ }
+
+ @Override
+ @BeforeClass(alwaysRun = true)
+ protected void setup() throws Exception {
+ initConfig(conf);
+ super.internalSetup(conf);
+ pulsar1 = pulsar;
+ var conf2 = initConfig(getDefaultConf());
+ additionalPulsarTestContext = createAdditionalPulsarTestContext(conf2);
+ pulsar2 = additionalPulsarTestContext.getPulsarService();
+
+ setPrimaryLoadManager();
+ setSecondaryLoadManager();
+
+ admin.clusters().createCluster(this.conf.getClusterName(),
+
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+ admin.tenants().createTenant("public",
+ new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
+ Sets.newHashSet(this.conf.getClusterName())));
+ admin.namespaces().createNamespace("public/default");
+ admin.namespaces().setNamespaceReplicationClusters("public/default",
+ Sets.newHashSet(this.conf.getClusterName()));
+
+ admin.namespaces().createNamespace(defaultTestNamespace, 128);
+
admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
+ Sets.newHashSet(this.conf.getClusterName()));
+ lookupService = (LookupService)
FieldUtils.readDeclaredField(pulsarClient, "lookup", true);
+ }
+
+ @Override
+ @AfterClass(alwaysRun = true)
+ protected void cleanup() throws Exception {
+ this.additionalPulsarTestContext.close();
+ super.internalCleanup();
+ }
+
+ @BeforeMethod(alwaysRun = true)
+ protected void initializeState() throws PulsarAdminException,
IllegalAccessException {
+ admin.namespaces().unload(defaultTestNamespace);
+ reset(primaryLoadManager, secondaryLoadManager);
+ FieldUtils.writeDeclaredField(pulsarClient, "lookup", lookupService,
true);
+ }
+
+ protected void setPrimaryLoadManager() throws IllegalAccessException {
+ ExtensibleLoadManagerWrapper wrapper =
+ (ExtensibleLoadManagerWrapper) pulsar1.getLoadManager().get();
+ primaryLoadManager = spy((ExtensibleLoadManagerImpl)
+ FieldUtils.readField(wrapper, "loadManager", true));
+ FieldUtils.writeField(wrapper, "loadManager", primaryLoadManager,
true);
+ channel1 = (ServiceUnitStateChannelImpl)
+ FieldUtils.readField(primaryLoadManager,
"serviceUnitStateChannel", true);
+ }
+
+ private void setSecondaryLoadManager() throws IllegalAccessException {
+ ExtensibleLoadManagerWrapper wrapper =
+ (ExtensibleLoadManagerWrapper) pulsar2.getLoadManager().get();
+ secondaryLoadManager = spy((ExtensibleLoadManagerImpl)
+ FieldUtils.readField(wrapper, "loadManager", true));
+ FieldUtils.writeField(wrapper, "loadManager", secondaryLoadManager,
true);
+ channel2 = (ServiceUnitStateChannelImpl)
+ FieldUtils.readField(secondaryLoadManager,
"serviceUnitStateChannel", true);
+ }
+
+ protected CompletableFuture<NamespaceBundle> getBundleAsync(PulsarService
pulsar, TopicName topic) {
+ return pulsar.getNamespaceService().getBundleAsync(topic);
+ }
+
+ protected Pair<TopicName, NamespaceBundle>
getBundleIsNotOwnByChangeEventTopic(String topicNamePrefix)
+ throws Exception {
+ TopicName changeEventsTopicName =
+ TopicName.get(defaultTestNamespace + "/" +
SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+ NamespaceBundle changeEventsBundle = getBundleAsync(pulsar1,
changeEventsTopicName).get();
+ int i = 0;
+ while(true) {
+ TopicName topicName = TopicName.get(defaultTestNamespace + "/" +
topicNamePrefix + "-" + i);
+ NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+ if (!bundle.equals(changeEventsBundle)) {
+ return Pair.of(topicName, bundle);
+ }
+ i++;
+ }
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 850bf9a96c8..61621d14de6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -75,7 +75,6 @@ import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
@@ -101,21 +100,16 @@ import
org.apache.pulsar.broker.namespace.NamespaceBundleSplitListener;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException;
-import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.TableViewImpl;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
-import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.policies.data.BrokerAssignment;
import org.apache.pulsar.common.policies.data.BundlesData;
-import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
-import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
@@ -123,9 +117,6 @@ import
org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.awaitility.Awaitility;
import org.mockito.MockedStatic;
import org.testng.AssertJUnit;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
/**
@@ -133,75 +124,11 @@ import org.testng.annotations.Test;
*/
@Slf4j
@Test(groups = "broker")
-public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest
{
+@SuppressWarnings("unchecked")
+public class ExtensibleLoadManagerImplTest extends
ExtensibleLoadManagerImplBaseTest {
- private PulsarService pulsar1;
- private PulsarService pulsar2;
-
- private PulsarTestContext additionalPulsarTestContext;
-
- private ExtensibleLoadManagerImpl primaryLoadManager;
-
- private ExtensibleLoadManagerImpl secondaryLoadManager;
-
- private ServiceUnitStateChannelImpl channel1;
- private ServiceUnitStateChannelImpl channel2;
-
- private final String defaultTestNamespace = "public/test";
-
- private static void initConfig(ServiceConfiguration conf){
- conf.setForceDeleteNamespaceAllowed(true);
- conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
- conf.setAllowAutoTopicCreation(true);
-
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
-
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
- conf.setLoadBalancerSheddingEnabled(false);
- conf.setLoadBalancerDebugModeEnabled(true);
- conf.setTopicLevelPoliciesEnabled(true);
- }
-
- @BeforeClass
- @Override
- public void setup() throws Exception {
- // Set the inflight state waiting time and ownership monitor delay
time to 5 seconds to avoid
- // stuck when doing unload.
- initConfig(conf);
- super.internalSetup(conf);
- pulsar1 = pulsar;
- ServiceConfiguration defaultConf = getDefaultConf();
- initConfig(defaultConf);
- additionalPulsarTestContext =
createAdditionalPulsarTestContext(defaultConf);
- pulsar2 = additionalPulsarTestContext.getPulsarService();
-
- setPrimaryLoadManager();
-
- setSecondaryLoadManager();
-
- admin.clusters().createCluster(this.conf.getClusterName(),
-
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
- admin.tenants().createTenant("public",
- new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
- Sets.newHashSet(this.conf.getClusterName())));
- admin.namespaces().createNamespace("public/default");
- admin.namespaces().setNamespaceReplicationClusters("public/default",
- Sets.newHashSet(this.conf.getClusterName()));
-
- admin.namespaces().createNamespace(defaultTestNamespace);
-
admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
- Sets.newHashSet(this.conf.getClusterName()));
- }
-
- @Override
- @AfterClass(alwaysRun = true)
- protected void cleanup() throws Exception {
- this.additionalPulsarTestContext.close();
- super.internalCleanup();
- }
-
- @BeforeMethod(alwaysRun = true)
- protected void initializeState() throws PulsarAdminException {
- admin.namespaces().unload(defaultTestNamespace);
- reset(primaryLoadManager, secondaryLoadManager);
+ public ExtensibleLoadManagerImplTest() {
+ super("public/test");
}
@Test
@@ -459,7 +386,7 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
public void testSplitBundleWithSpecificPositionAdminAPI() throws Exception
{
String namespace = defaultTestNamespace;
String topic = "persistent://" + namespace +
"/test-split-with-specific-position";
- admin.topics().createPartitionedTopic(topic, 10);
+ admin.topics().createPartitionedTopic(topic, 1024);
BundlesData bundles = admin.namespaces().getBundles(namespace);
int numBundles = bundles.getNumBundles();
@@ -1320,44 +1247,4 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
}
}
-
- private void setPrimaryLoadManager() throws IllegalAccessException {
- ExtensibleLoadManagerWrapper wrapper =
- (ExtensibleLoadManagerWrapper) pulsar1.getLoadManager().get();
- primaryLoadManager = spy((ExtensibleLoadManagerImpl)
- FieldUtils.readField(wrapper, "loadManager", true));
- FieldUtils.writeField(wrapper, "loadManager", primaryLoadManager,
true);
- channel1 = (ServiceUnitStateChannelImpl)
- FieldUtils.readField(primaryLoadManager,
"serviceUnitStateChannel", true);
- }
-
- private void setSecondaryLoadManager() throws IllegalAccessException {
- ExtensibleLoadManagerWrapper wrapper =
- (ExtensibleLoadManagerWrapper) pulsar2.getLoadManager().get();
- secondaryLoadManager = spy((ExtensibleLoadManagerImpl)
- FieldUtils.readField(wrapper, "loadManager", true));
- FieldUtils.writeField(wrapper, "loadManager", secondaryLoadManager,
true);
- channel2 = (ServiceUnitStateChannelImpl)
- FieldUtils.readField(secondaryLoadManager,
"serviceUnitStateChannel", true);
- }
-
- private CompletableFuture<NamespaceBundle> getBundleAsync(PulsarService
pulsar, TopicName topic) {
- return pulsar.getNamespaceService().getBundleAsync(topic);
- }
-
- private Pair<TopicName, NamespaceBundle>
getBundleIsNotOwnByChangeEventTopic(String topicNamePrefix)
- throws Exception {
- TopicName changeEventsTopicName =
- TopicName.get(defaultTestNamespace + "/" +
SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
- NamespaceBundle changeEventsBundle = getBundleAsync(pulsar1,
changeEventsTopicName).get();
- int i = 0;
- while (true) {
- TopicName topicName = TopicName.get(defaultTestNamespace + "/" +
topicNamePrefix + "-" + i);
- NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
- if (!bundle.equals(changeEventsBundle)) {
- return Pair.of(topicName, bundle);
- }
- i++;
- }
- }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java
new file mode 100644
index 00000000000..0c95dd85f28
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import static org.testng.Assert.assertEquals;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class ExtensibleLoadManagerImplWithTransactionCoordinatorTest extends
ExtensibleLoadManagerImplBaseTest {
+
+ public ExtensibleLoadManagerImplWithTransactionCoordinatorTest() {
+ super("public/test-elb-with-tx");
+ }
+
+ @Override
+ protected ServiceConfiguration initConfig(ServiceConfiguration conf) {
+ conf = super.initConfig(conf);
+ conf.setTransactionCoordinatorEnabled(true);
+ return conf;
+ }
+
+ @Test(timeOut = 30 * 1000)
+ public void testUnloadAdminAPI() throws Exception {
+ var topicAndBundle =
getBundleIsNotOwnByChangeEventTopic("test-unload");
+ var topicName = topicAndBundle.getLeft();
+ var bundle = topicAndBundle.getRight();
+
+ var srcBroker = admin.lookups().lookupTopic(topicName.toString());
+ var dstBroker = srcBroker.equals(pulsar1.getBrokerServiceUrl()) ?
pulsar2 : pulsar1;
+ var dstBrokerUrl = dstBroker.getBrokerId();
+ var dstBrokerServiceUrl = dstBroker.getBrokerServiceUrl();
+
+ admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(),
bundle.getBundleRange(), dstBrokerUrl);
+ Awaitility.await().untilAsserted(
+ () ->
assertEquals(admin.lookups().lookupTopic(topicName.toString()),
dstBrokerServiceUrl));
+ }
+}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingSmokeTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingSmokeTest.java
new file mode 100644
index 00000000000..618053ac000
--- /dev/null
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingSmokeTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.messaging;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import
org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.testng.ITest;
+import org.testng.annotations.Factory;
+import org.testng.annotations.Test;
+
+public class MessagingSmokeTest extends TopicMessagingBase implements ITest {
+
+ @Factory
+ public static Object[] messagingTests() {
+ List<?> tests = List.of(
+ new MessagingSmokeTest("Extensible Load Manager",
+ Map.of("loadManagerClassName",
ExtensibleLoadManagerImpl.class.getName(),
+ "loadBalancerLoadSheddingStrategy",
TransferShedder.class.getName())),
+ new MessagingSmokeTest("Extensible Load Manager with TX
Coordinator",
+ Map.of("loadManagerClassName",
ExtensibleLoadManagerImpl.class.getName(),
+ "loadBalancerLoadSheddingStrategy",
TransferShedder.class.getName(),
+ "transactionCoordinatorEnabled", "true"))
+ );
+ return tests.toArray();
+ }
+
+ private final String name;
+
+ public MessagingSmokeTest(String name, Map<String, String> brokerEnvs) {
+ super();
+ this.brokerEnvs.putAll(brokerEnvs);
+ this.name = name;
+ }
+
+ @Override
+ public String getTestName() {
+ return name;
+ }
+
+ @Test(dataProvider = "serviceUrlAndTopicDomain")
+ public void testNonPartitionedTopicMessagingWithExclusive(Supplier<String>
serviceUrl, TopicDomain topicDomain)
+ throws Exception {
+ nonPartitionedTopicSendAndReceiveWithExclusive(serviceUrl.get(),
TopicDomain.persistent.equals(topicDomain));
+ }
+
+ @Test(dataProvider = "serviceUrlAndTopicDomain")
+ public void testPartitionedTopicMessagingWithExclusive(Supplier<String>
serviceUrl, TopicDomain topicDomain)
+ throws Exception {
+ partitionedTopicSendAndReceiveWithExclusive(serviceUrl.get(),
TopicDomain.persistent.equals(topicDomain));
+ }
+
+ @Test(dataProvider = "serviceUrlAndTopicDomain")
+ public void testNonPartitionedTopicMessagingWithFailover(Supplier<String>
serviceUrl, TopicDomain topicDomain)
+ throws Exception {
+ nonPartitionedTopicSendAndReceiveWithFailover(serviceUrl.get(),
TopicDomain.persistent.equals(topicDomain));
+ }
+
+ @Test(dataProvider = "serviceUrlAndTopicDomain")
+ public void testPartitionedTopicMessagingWithFailover(Supplier<String>
serviceUrl, TopicDomain topicDomain)
+ throws Exception {
+ partitionedTopicSendAndReceiveWithFailover(serviceUrl.get(),
TopicDomain.persistent.equals(topicDomain));
+ }
+
+ @Test(dataProvider = "serviceUrlAndTopicDomain")
+ public void testNonPartitionedTopicMessagingWithShared(Supplier<String>
serviceUrl, TopicDomain topicDomain)
+ throws Exception {
+ nonPartitionedTopicSendAndReceiveWithShared(serviceUrl.get(),
TopicDomain.persistent.equals(topicDomain));
+ }
+
+ @Test(dataProvider = "serviceUrlAndTopicDomain")
+ public void testPartitionedTopicMessagingWithShared(Supplier<String>
serviceUrl, TopicDomain topicDomain)
+ throws Exception {
+ partitionedTopicSendAndReceiveWithShared(serviceUrl.get(),
TopicDomain.persistent.equals(topicDomain));
+ }
+
+ @Test(dataProvider = "serviceUrlAndTopicDomain")
+ public void testNonPartitionedTopicMessagingWithKeyShared(Supplier<String>
serviceUrl, TopicDomain topicDomain)
+ throws Exception {
+ nonPartitionedTopicSendAndReceiveWithKeyShared(serviceUrl.get(),
TopicDomain.persistent.equals(topicDomain));
+ }
+
+ @Test(dataProvider = "serviceUrlAndTopicDomain")
+ public void testPartitionedTopicMessagingWithKeyShared(Supplier<String>
serviceUrl, TopicDomain topicDomain)
+ throws Exception {
+ partitionedTopicSendAndReceiveWithKeyShared(serviceUrl.get(),
TopicDomain.persistent.equals(topicDomain));
+ }
+}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index e6a425956cf..8a759faf76b 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -133,7 +133,7 @@ public class PulsarCluster {
this.brokerContainers = Maps.newTreeMap();
this.workerContainers = Maps.newTreeMap();
- this.proxyContainer = new
ProxyContainer(appendClusterName("pulsar-proxy"), ProxyContainer.NAME,
spec.enableTls)
+ this.proxyContainer = new ProxyContainer(clusterName,
appendClusterName(ProxyContainer.NAME), spec.enableTls)
.withNetwork(network)
.withNetworkAliases(appendClusterName("pulsar-proxy"))
.withEnv("zkServers", appendClusterName(ZKContainer.NAME))
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
index ae9e44fa982..93e2221ab24 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.common.naming.TopicDomain;
import org.testng.annotations.DataProvider;
import java.util.stream.Stream;
@@ -86,6 +87,20 @@ public abstract class PulsarClusterTestBase extends
PulsarTestBase {
};
}
+ @DataProvider
+ public Object[][] serviceUrlAndTopicDomain() {
+ return new Object[][] {
+ {
+ stringSupplier(() ->
getPulsarCluster().getPlainTextServiceUrl()),
+ TopicDomain.persistent
+ },
+ {
+ stringSupplier(() ->
getPulsarCluster().getPlainTextServiceUrl()),
+ TopicDomain.non_persistent
+ },
+ };
+ }
+
protected PulsarAdmin pulsarAdmin;
protected PulsarCluster pulsarCluster;
diff --git a/tests/integration/src/test/resources/pulsar-messaging.xml
b/tests/integration/src/test/resources/pulsar-messaging.xml
index cfbdb225870..c6cd900d791 100644
--- a/tests/integration/src/test/resources/pulsar-messaging.xml
+++ b/tests/integration/src/test/resources/pulsar-messaging.xml
@@ -28,6 +28,7 @@
<class
name="org.apache.pulsar.tests.integration.messaging.DelayMessagingTest" />
<class
name="org.apache.pulsar.tests.integration.messaging.ReaderMessagingTest" />
<class
name="org.apache.pulsar.tests.integration.messaging.NonDurableConsumerMessagingTest"
/>
+ <class
name="org.apache.pulsar.tests.integration.messaging.MessagingSmokeTest" />
<class name="org.apache.pulsar.tests.integration.admin.AdminTest"
/>
</classes>
</test>