This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 272c239cd4b [fix][test] Reduce admin client churn in ExtensibleLoadManagerTest.startBroker (#25676)
272c239cd4b is described below
commit 272c239cd4bafc4f2dba9d4bb5c273cf61c3a2cb
Author: Matteo Merli <[email protected]>
AuthorDate: Tue May 5 14:44:59 2026 -0700
[fix][test] Reduce admin client churn in ExtensibleLoadManagerTest.startBroker (#25676)
---
.../loadbalance/ExtensibleLoadManagerTest.java | 64 +++++++++++++---------
1 file changed, 39 insertions(+), 25 deletions(-)
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
index 38cda507fac..fa0ba2ef9b1 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
@@ -134,7 +134,7 @@ public class ExtensibleLoadManagerTest extends TestRetrySupport {
}
@BeforeMethod(alwaysRun = true)
- public void startBroker() {
+ public void startBroker() throws Exception {
if (pulsarCluster == null) {
return;
}
@@ -144,35 +144,49 @@ public class ExtensibleLoadManagerTest extends TestRetrySupport {
}
});
String topicName = "persistent://" + DEFAULT_NAMESPACE +
"/startBrokerCheck";
- Awaitility.await().atMost(180, TimeUnit.SECONDS).until(
- () -> {
- for (BrokerContainer brokerContainer :
pulsarCluster.getBrokers()) {
- try (PulsarAdmin brokerAdmin =
PulsarAdmin.builder().serviceHttpUrl(
- brokerContainer.getHttpServiceUrl()).build()) {
- if
(brokerAdmin.brokers().getActiveBrokers(clusterName).size() != NUM_BROKERS) {
- log.info()
+ // Build admin clients once and reuse across poll iterations to avoid the per-tick
+ // connection churn (3 brokers x N polls of admin builder/close). Connection setup
+ // contends with brokers that are still warming up after a stop/restart.
+ List<BrokerContainer> brokers = new
ArrayList<>(pulsarCluster.getBrokers());
+ List<PulsarAdmin> brokerAdmins = new ArrayList<>(brokers.size());
+ try {
+ for (BrokerContainer brokerContainer : brokers) {
+ brokerAdmins.add(PulsarAdmin.builder()
+
.serviceHttpUrl(brokerContainer.getHttpServiceUrl()).build());
+ }
+ Awaitility.await().atMost(180, TimeUnit.SECONDS).until(
+ () -> {
+ for (int i = 0; i < brokers.size(); i++) {
+ BrokerContainer brokerContainer = brokers.get(i);
+ PulsarAdmin brokerAdmin = brokerAdmins.get(i);
+ try {
+ if
(brokerAdmin.brokers().getActiveBrokers(clusterName).size() != NUM_BROKERS) {
+ log.info()
+ .attr("broker",
brokerContainer.getHostName())
+ .attr("see", NUM_BROKERS)
+ .log("Broker does not see active
brokers yet");
+ return false;
+ }
+ try {
+
brokerAdmin.topics().createPartitionedTopic(topicName, 10);
+ } catch
(PulsarAdminException.ConflictException e) {
+ // expected - topic already exists
+ }
+
brokerAdmin.lookups().lookupPartitionedTopic(topicName);
+ } catch (Exception e) {
+ log.warn()
.attr("broker",
brokerContainer.getHostName())
- .attr("see", NUM_BROKERS)
- .log("Broker does not see active
brokers yet");
+ .attr("yet", e.getMessage())
+ .log("Broker is not ready yet");
return false;
}
- try {
-
brokerAdmin.topics().createPartitionedTopic(topicName, 10);
- } catch (PulsarAdminException.ConflictException e)
{
- // expected - topic already exists
- }
-
brokerAdmin.lookups().lookupPartitionedTopic(topicName);
- } catch (Exception e) {
- log.warn()
- .attr("broker",
brokerContainer.getHostName())
- .attr("yet", e.getMessage())
- .log("Broker is not ready yet");
- return false;
}
+ return true;
}
- return true;
- }
- );
+ );
+ } finally {
+ brokerAdmins.forEach(PulsarAdmin::close);
+ }
}
@Test(timeOut = 40 * 1000)