This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8cd2f38309e [cleanup][test] Remove PortManager and use kernel-assigned ports everywhere (#25694)
8cd2f38309e is described below
commit 8cd2f38309ea46f932a40868f730360976b0b96c
Author: Matteo Merli <[email protected]>
AuthorDate: Wed May 6 17:44:10 2026 -0700
[cleanup][test] Remove PortManager and use kernel-assigned ports everywhere (#25694)
---
.../mledger/impl/ManagedLedgerBkTest.java | 4 +-
.../bookkeeper/test/BookKeeperClusterTestCase.java | 13 +--
.../java/org/apache/pulsar/PulsarStandalone.java | 16 +--
.../org/apache/pulsar/PulsarStandaloneBuilder.java | 5 -
.../pulsar/zookeeper/LocalBookkeeperEnsemble.java | 48 ++------
.../org/apache/pulsar/PulsarStandaloneTest.java | 7 +-
.../apache/pulsar/broker/MultiBrokerBaseTest.java | 40 +------
.../apache/pulsar/broker/SLAMonitoringTest.java | 2 +-
.../broker/admin/GetPartitionMetadataTest.java | 2 +-
.../AbstractBrokerEntryCacheMultiBrokerTest.java | 5 -
.../loadbalance/AdvertisedListenersTest.java | 2 +
.../loadbalance/LeaderElectionServiceTest.java | 2 +-
.../broker/loadbalance/LoadBalancerTest.java | 2 +-
.../broker/loadbalance/SimpleBrokerStartTest.java | 4 +-
.../loadbalance/SimpleLoadManagerImplTest.java | 2 +-
.../extensions/BrokerRegistryIntegrationTest.java | 2 +-
.../loadbalance/extensions/BrokerRegistryTest.java | 2 +-
.../extensions/ExtensibleLoadManagerCloseTest.java | 2 +-
...LoadManagerImplWithAdvertisedListenersTest.java | 8 +-
.../extensions/LoadManagerFailFastTest.java | 6 +-
.../loadbalance/impl/BundleSplitterTaskTest.java | 2 +-
.../impl/ModularLoadManagerImplTest.java | 43 ++++---
.../protocol/PulsarClientBasedHandlerTest.java | 6 +-
.../protocol/SimpleProtocolHandlerTestsBase.java | 6 +-
.../broker/service/AdvertisedAddressTest.java | 2 +-
.../broker/service/BacklogQuotaManagerTest.java | 2 +-
.../pulsar/broker/service/BkEnsemblesTestBase.java | 2 +-
.../broker/service/BrokerBookieIsolationTest.java | 2 +-
.../service/BrokerEventLoopShutdownTest.java | 2 +-
.../CanReconnectZKClientPulsarServiceBaseTest.java | 2 +-
...eoReplicationWithConfigurationSyncTestBase.java | 4 +-
.../pulsar/broker/service/MaxMessageSizeTest.java | 2 +-
.../broker/service/NetworkErrorTestBase.java | 2 +-
.../broker/service/OneWayReplicatorTestBase.java | 4 +-
.../pulsar/broker/service/ReplicatorTestBase.java | 8 +-
.../pulsar/broker/service/TopicOwnerTest.java | 2 +-
.../service/persistent/ShadowTopicRealBkTest.java | 7 +-
.../broker/testcontext/PulsarTestContext.java | 5 +
.../coordinator/TransactionMetaStoreTestBase.java | 2 +-
.../client/api/ClientDeduplicationFailureTest.java | 2 +-
.../client/api/HybridTypesAcknowledgeTest.java | 2 +-
.../pulsar/client/api/NonPersistentTopicTest.java | 6 +-
.../client/impl/ServiceUrlQuarantineTest.java | 18 +--
.../worker/PulsarFunctionE2ESecurityTest.java | 2 +-
.../worker/PulsarFunctionLocalRunTest.java | 2 +-
.../worker/PulsarFunctionPublishTest.java | 2 +-
.../functions/worker/PulsarFunctionTlsTest.java | 20 ++--
.../worker/PulsarWorkerAssignmentTest.java | 2 +-
.../apache/pulsar/io/AbstractPulsarE2ETest.java | 2 +-
.../apache/pulsar/io/PulsarFunctionAdminTest.java | 2 +-
.../apache/pulsar/io/PulsarFunctionTlsTest.java | 2 +-
.../zookeeper/LocalBookkeeperEnsembleTest.java | 5 +-
.../org/apache/pulsar/common/util/PortManager.java | 18 +--
.../apache/pulsar/common/util/PortManagerTest.java | 38 +++++--
.../pulsar/metadata/bookkeeper/BKCluster.java | 50 ++------
.../replication/BookKeeperClusterTestCase.java | 13 +--
.../bookkeeper/test/BookKeeperClusterTestCase.java | 13 +--
.../bookkeeper/bookkeeper/test/PortManager.java | 126 ---------------------
.../extensions/SimpleProxyExtensionTestBase.java | 6 +-
59 files changed, 194 insertions(+), 416 deletions(-)
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
index 236f596c83c..23ad9af3ad0 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.bookkeeper.mledger.impl;
-import static org.apache.pulsar.common.util.PortManager.releaseLockedPort;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
@@ -134,7 +133,7 @@ public class ManagedLedgerBkTest extends
BookKeeperClusterTestCase {
metadataStore.unsetAlwaysFail();
bkc = new PulsarBookKeeperTestClient(baseClientConf);
- int port = startNewBookie();
+ startNewBookie();
// Reconnect a new bk client
factory.shutdown();
@@ -164,7 +163,6 @@ public class ManagedLedgerBkTest extends
BookKeeperClusterTestCase {
assertEquals("entry-2", new String(entries.get(0).getData()));
entries.forEach(Entry::release);
factory.shutdown();
- releaseLockedPort(port);
}
@Test
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index 78891cdd7a4..7e5486eac29 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -25,7 +25,6 @@
package org.apache.bookkeeper.test;
import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
-import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
import static org.testng.Assert.assertFalse;
import com.google.common.base.Stopwatch;
import java.io.File;
@@ -296,14 +295,10 @@ public abstract class BookKeeperClusterTestCase {
protected ServerConfiguration newServerConfiguration() throws Exception {
File f = tmpDirs.createNew("bookie", "test");
-
- int port;
- if (baseConf.isEnableLocalTransport() ||
!baseConf.getAllowEphemeralPorts()) {
- port = nextLockedFreePort();
- } else {
- port = 0;
- }
- return newServerConfiguration(port, f, new File[] { f });
+ // Bookies need a pre-allocated port: BK identifies them by host:port in metadata
+ // and the test client resolves that back to a TCP address. Port 0 would leave
+ // the cookie + registration with port=0, which fails DNS-style resolution.
+ return newServerConfiguration(PortManager.nextLockedFreePort(), f, new File[] { f });
}
protected ClientConfiguration newClientConfiguration() {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
index 0f4166be922..b6731263079 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
@@ -84,10 +84,6 @@ public class PulsarStandalone implements AutoCloseable {
this.bkEnsemble = bkEnsemble;
}
- public void setBkPort(int bkPort) {
- this.bkPort = bkPort;
- }
-
public void setBkDir(String bkDir) {
this.bkDir = bkDir;
}
@@ -172,10 +168,6 @@ public class PulsarStandalone implements AutoCloseable {
return zkPort;
}
- public int getBkPort() {
- return bkPort;
- }
-
public String getZkDir() {
return zkDir;
}
@@ -237,9 +229,6 @@ public class PulsarStandalone implements AutoCloseable {
hidden = true)
private int zkPort = 2181;
- @Option(names = { "--bookkeeper-port" }, description = "Local bookies base
port")
- private int bkPort = 3181;
-
@Option(names = { "--zookeeper-dir" },
description = "Local zooKeeper's data directory",
hidden = true)
@@ -470,7 +459,6 @@ public class PulsarStandalone implements AutoCloseable {
bkCluster = BKCluster.builder()
.baseServerConfiguration(bkServerConf)
.metadataServiceUri(metadataStoreUrl)
- .bkPort(bkPort)
.numBookies(numOfBk)
.dataDir(bkDir)
.clearOldData(wipeData)
@@ -484,9 +472,9 @@ public class PulsarStandalone implements AutoCloseable {
ServerConfiguration bkServerConf = new ServerConfiguration();
bkServerConf.loadConf(new File(configFile).toURI().toURL());
calculateCacheSize(bkServerConf);
- // Start LocalBookKeeper
+ // Start LocalBookKeeper. Bookies bind to kernel-assigned ports.
bkEnsemble = new LocalBookkeeperEnsemble(
- this.getNumOfBk(), this.getZkPort(), this.getBkPort(),
this.getStreamStoragePort(), this.getZkDir(),
+ this.getNumOfBk(), this.getZkPort(),
this.getStreamStoragePort(), this.getZkDir(),
this.getBkDir(), this.isWipeData(), "127.0.0.1");
bkEnsemble.startStandalone(bkServerConf, !this.isNoStreamStorage());
config.setMetadataStoreUrl("zk:127.0.0.1:" + zkPort);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneBuilder.java
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneBuilder.java
index 5469d247829..2cf91c564d5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneBuilder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneBuilder.java
@@ -56,11 +56,6 @@ public final class PulsarStandaloneBuilder {
return this;
}
- public PulsarStandaloneBuilder withBkPort(int bkPort) {
- pulsarStandalone.setBkPort(bkPort);
- return this;
- }
-
public PulsarStandaloneBuilder withZkDir(String zkDir) {
pulsarStandalone.setZkDir(zkDir);
return this;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
b/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
index d8c67568304..2f68928d6d6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
@@ -42,7 +42,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import lombok.CustomLog;
@@ -91,46 +90,18 @@ public class LocalBookkeeperEnsemble {
int numberOfBookies;
private final boolean clearOldData;
- private static class BasePortManager implements Supplier<Integer> {
-
- private int port;
-
- public BasePortManager(int basePort) {
- this.port = basePort;
- }
-
- @Override
- public synchronized Integer get() {
- return port++;
- }
+ public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort) {
+ this(numberOfBookies, zkPort, 4181, null, null, true, null);
}
- private final Supplier<Integer> portManager;
-
- public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort,
Supplier<Integer> portManager) {
- this(numberOfBookies, zkPort, 4181, null, null, true, null,
portManager);
- }
-
- public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int
bkBasePort, String zkDataDirName,
+ public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, String
zkDataDirName,
String bkDataDirName, boolean clearOldData) {
- this(numberOfBookies, zkPort, bkBasePort, 4181, zkDataDirName,
bkDataDirName, clearOldData, null);
+ this(numberOfBookies, zkPort, 4181, zkDataDirName, bkDataDirName,
clearOldData, null);
}
- public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int
bkBasePort, String zkDataDirName,
+ public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, String
zkDataDirName,
String bkDataDirName, boolean clearOldData, String
advertisedAddress) {
- this(numberOfBookies, zkPort, bkBasePort, 4181, zkDataDirName,
bkDataDirName, clearOldData, advertisedAddress);
- }
-
- public LocalBookkeeperEnsemble(int numberOfBookies,
- int zkPort,
- int bkBasePort,
- int streamStoragePort,
- String zkDataDirName,
- String bkDataDirName,
- boolean clearOldData,
- String advertisedAddress) {
- this(numberOfBookies, zkPort, streamStoragePort, zkDataDirName,
bkDataDirName, clearOldData, advertisedAddress,
- bkBasePort != 0 ? new BasePortManager(bkBasePort) : () -> 0);
+ this(numberOfBookies, zkPort, 4181, zkDataDirName, bkDataDirName,
clearOldData, advertisedAddress);
}
public LocalBookkeeperEnsemble(int numberOfBookies,
@@ -139,10 +110,8 @@ public class LocalBookkeeperEnsemble {
String zkDataDirName,
String bkDataDirName,
boolean clearOldData,
- String advertisedAddress,
- Supplier<Integer> portManager) {
+ String advertisedAddress) {
this.numberOfBookies = numberOfBookies;
- this.portManager = portManager;
this.streamStoragePort = streamStoragePort;
this.zkDataDirName = zkDataDirName;
this.bkDataDirName = bkDataDirName;
@@ -301,7 +270,8 @@ public class LocalBookkeeperEnsemble {
cleanDirectory(bkDataDir);
}
- int bookiePort = portManager.get();
+ // Bookies bind to a kernel-assigned port; identity is established via bookieId.
+ int bookiePort = 0;
String bookieId = "bk" + i + "test";
// Ensure registration Z-nodes are cleared when standalone service
is restarted ungracefully
deleteBookieRegistrationZnode(
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java
index 77c1aced14d..21b4fe43de2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java
@@ -54,7 +54,6 @@ public class PulsarStandaloneTest {
PulsarStandaloneStarter standalone = new PulsarStandaloneStarter(args);
standalone.setBkDir(tempDir.getAbsolutePath());
- standalone.setBkPort(0);
standalone.setNumOfBk(bookieNum);
standalone.startBookieWithMetadataStore();
@@ -67,10 +66,12 @@ public class PulsarStandaloneTest {
List<ServerConfiguration> secondBsConfs =
standalone.bkCluster.getBsConfs();
Assert.assertEquals(secondBsConfs.size(), bookieNum);
+ // Cookies must be preserved across restart (otherwise bookie startup would have failed
+ // with InvalidCookieException). The bookieId is the persistent identity.
for (int i = 0; i < bookieNum; i++) {
ServerConfiguration conf1 = firstBsConfs.get(i);
ServerConfiguration conf2 = secondBsConfs.get(i);
- Assert.assertEquals(conf1.getBookiePort(), conf2.getBookiePort());
+ Assert.assertEquals(conf1.getBookieId(), conf2.getBookieId());
}
standalone.close();
cleanDirectory(tempDir);
@@ -93,7 +94,6 @@ public class PulsarStandaloneTest {
}
final File bkDir = IOUtils.createTempDir("standalone", "bk");
standalone.setNumOfBk(1);
- standalone.setBkPort(0);
standalone.setBkDir(bkDir.getAbsolutePath());
standalone.start();
@@ -148,7 +148,6 @@ public class PulsarStandaloneTest {
bkDir.getAbsolutePath()
});
standalone.setTestMode(true);
- standalone.setBkPort(0);
standalone.start();
BKCluster bkCluster = standalone.bkCluster;
standalone.runShutdownHook();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
index 076d649d5d9..203b8668288 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
@@ -22,7 +22,6 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Optional;
import lombok.CustomLog;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.common.util.OrderedExecutor;
@@ -35,7 +34,6 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
-import org.apache.pulsar.common.util.PortManager;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -46,6 +44,7 @@ public abstract class MultiBrokerBaseTest extends
MockedPulsarServiceBaseTest {
protected List<PulsarAdmin> additionalBrokerAdmins;
protected List<PulsarClient> additionalBrokerClients;
protected PulsarMockBookKeeper mockBookKeeper;
+ // Populated after broker startup with kernel-assigned ports.
protected int mainBrokerPort;
protected List<Integer> additionalBrokerPorts = new ArrayList<>();
@@ -53,17 +52,10 @@ public abstract class MultiBrokerBaseTest extends
MockedPulsarServiceBaseTest {
return 2;
}
- protected boolean useDynamicBrokerPorts() {
- return true;
- }
-
@BeforeClass(alwaysRun = true)
@Override
public final void setup() throws Exception {
beforeSetup();
- if (!useDynamicBrokerPorts()) {
- mainBrokerPort = PortManager.nextLockedFreePort();
- }
OrderedExecutor mockBookKeeperExecutor =
OrderedExecutor.newBuilder().numThreads(1)
.name(MultiBrokerBaseTest.class.getSimpleName() +
"-bk-executor").build();
registerCloseable(() -> GracefulExecutorServicesShutdown.initiate()
@@ -75,6 +67,7 @@ public abstract class MultiBrokerBaseTest extends
MockedPulsarServiceBaseTest {
((NonClosableMockBookKeeper) mockBookKeeper).reallyShutdown();
});
super.internalSetup();
+ mainBrokerPort = pulsar.getBrokerListenPort().orElse(0);
additionalBrokersSetup();
pulsarResourcesSetup();
additionalSetup();
@@ -89,14 +82,6 @@ public abstract class MultiBrokerBaseTest extends
MockedPulsarServiceBaseTest {
pulsarTestContextBuilder.bookKeeperClient(mockBookKeeper);
}
- @Override
- protected void doInitConf() throws Exception {
- super.doInitConf();
- if (!useDynamicBrokerPorts()) {
- conf.setBrokerServicePort(Optional.of(mainBrokerPort));
- }
- }
-
protected void additionalSetup() throws Exception {
// override this method to add any additional setup logic
@@ -115,17 +100,12 @@ public abstract class MultiBrokerBaseTest extends
MockedPulsarServiceBaseTest {
additionalBrokerClients = new ArrayList<>(numberOfAdditionalBrokers);
additionalPulsarTestContexts = new
ArrayList<>(numberOfAdditionalBrokers);
additionalBrokerPorts = new ArrayList<>(numberOfAdditionalBrokers);
- if (!useDynamicBrokerPorts()) {
- for (int i = 0; i < numberOfAdditionalBrokers; i++) {
- int port = PortManager.nextLockedFreePort();
- additionalBrokerPorts.add(port);
- }
- }
for (int i = 0; i < numberOfAdditionalBrokers; i++) {
PulsarTestContext pulsarTestContext = createAdditionalBroker(i);
additionalPulsarTestContexts.add(i, pulsarTestContext);
PulsarService pulsarService = pulsarTestContext.getPulsarService();
additionalBrokers.add(i, pulsarService);
+
additionalBrokerPorts.add(pulsarService.getBrokerListenPort().orElse(0));
PulsarAdminBuilder pulsarAdminBuilder =
PulsarAdmin.builder().serviceHttpUrl(pulsarService.getWebServiceAddress() !=
null
? pulsarService.getWebServiceAddress()
@@ -159,9 +139,6 @@ public abstract class MultiBrokerBaseTest extends
MockedPulsarServiceBaseTest {
protected PulsarTestContext createAdditionalBroker(int
additionalBrokerIndex) throws Exception {
ServiceConfiguration conf =
createConfForAdditionalBroker(additionalBrokerIndex);
- if (!useDynamicBrokerPorts()) {
-
conf.setBrokerServicePort(Optional.of(additionalBrokerPorts.get(additionalBrokerIndex)));
- }
return createAdditionalPulsarTestContext(conf);
}
@@ -175,14 +152,6 @@ public abstract class MultiBrokerBaseTest extends
MockedPulsarServiceBaseTest {
log.warn().exception(e).log("Exception during additional cleanup");
}
super.internalCleanup();
- if (!useDynamicBrokerPorts()) {
- if (mainBrokerPort > 0) {
- PortManager.releaseLockedPort(mainBrokerPort);
- }
- for (Integer port : additionalBrokerPorts) {
- PortManager.releaseLockedPort(port);
- }
- }
}
protected void additionalCleanup() throws Exception {
@@ -212,9 +181,6 @@ public abstract class MultiBrokerBaseTest extends
MockedPulsarServiceBaseTest {
try {
pulsarService.getConfiguration().setBrokerShutdownTimeoutMs(0L);
pulsarTestContext.close();
-
pulsarService.getConfiguration().getBrokerServicePort().ifPresent(PortManager::releaseLockedPort);
-
pulsarService.getConfiguration().getWebServicePort().ifPresent(PortManager::releaseLockedPort);
-
pulsarService.getConfiguration().getWebServicePortTls().ifPresent(PortManager::releaseLockedPort);
} catch (Exception e) {
log.warn().exception(e).log("Failed to stop additional
broker");
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
index 523006e9d74..3426753fbc6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
@@ -71,7 +71,7 @@ public class SLAMonitoringTest {
new LinkedBlockingQueue<>());
log.info("---- Initializing SLAMonitoringTest -----");
// Start local bookkeeper ensemble
- bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble.start();
// start brokers
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
index 49e4c6b2103..b4dec430547 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
@@ -78,7 +78,7 @@ public class GetPartitionMetadataTest extends
TestRetrySupport {
@BeforeClass(alwaysRun = true)
protected void setup() throws Exception {
incrementSetupNumber();
- bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble.start();
// Start broker.
setupBrokers();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/AbstractBrokerEntryCacheMultiBrokerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/AbstractBrokerEntryCacheMultiBrokerTest.java
index 48cc5c67993..3e2b0a4655c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/AbstractBrokerEntryCacheMultiBrokerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/AbstractBrokerEntryCacheMultiBrokerTest.java
@@ -197,11 +197,6 @@ public abstract class
AbstractBrokerEntryCacheMultiBrokerTest extends MultiBroke
return 1;
}
- @Override
- protected boolean useDynamicBrokerPorts() {
- return false;
- }
-
@BeforeMethod(alwaysRun = true)
public final void doBeforeMethod() {
beforeMethod();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java
index d019af35a69..64c0ee3f47a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java
@@ -65,6 +65,8 @@ public class AdvertisedListenersTest extends
MultiBrokerBaseTest {
}
private void updateConfig(ServiceConfiguration conf, String
advertisedAddress) {
+ // Pre-allocate ports because the advertised listener URLs are baked into config
+ // before the broker starts. The broker then binds to the same ports.
int pulsarPort = nextLockedFreePort();
int httpPort = nextLockedFreePort();
int httpsPort = nextLockedFreePort();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
index 62571cc3ff4..0bdeb4341d9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
@@ -52,7 +52,7 @@ public class LeaderElectionServiceTest {
@BeforeMethod(alwaysRun = true)
public void setup() throws Exception {
- bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble.start();
log.info("---- bk started ----");
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
index c6617bbacaa..cf8db26a4a6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
@@ -107,7 +107,7 @@ public class LoadBalancerTest {
@BeforeMethod
void setup() throws Exception {
// Start local bookkeeper ensemble
- bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble.start();
ZkUtils.createFullPathOptimistic(bkEnsemble.getZkClient(),
SimpleLoadManagerImpl.LOADBALANCER_DYNAMIC_SETTING_STRATEGY_ZPATH,
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
index 879afd00c1b..36bd4b1e0fc 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
@@ -48,7 +48,7 @@ public class SimpleBrokerStartTest {
}
// Start local bookkeeper ensemble
@Cleanup("stop")
- LocalBookkeeperEnsemble bkEnsemble = new LocalBookkeeperEnsemble(3, 0,
() -> 0);
+ LocalBookkeeperEnsemble bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble.start();
// Start broker
ServiceConfiguration config = new ServiceConfiguration();
@@ -79,7 +79,7 @@ public class SimpleBrokerStartTest {
}
// Start local bookkeeper ensemble
@Cleanup("stop")
- LocalBookkeeperEnsemble bkEnsemble = new LocalBookkeeperEnsemble(3, 0,
() -> 0);
+ LocalBookkeeperEnsemble bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble.start();
// Start broker
ServiceConfiguration config = new ServiceConfiguration();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
index 6103b948236..20a5883d65d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
@@ -124,7 +124,7 @@ public class SimpleLoadManagerImplTest {
void setup() throws Exception {
// Start local bookkeeper ensemble
- bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble.start();
// Start broker 1
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java
index 80e54278d31..138ad1dee11 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java
@@ -47,7 +47,7 @@ public class BrokerRegistryIntegrationTest {
@BeforeClass
protected void setup() throws Exception {
- bk = new LocalBookkeeperEnsemble(2, 0, () -> 0);
+ bk = new LocalBookkeeperEnsemble(2, 0);
bk.start();
pulsar = new PulsarService(brokerConfig());
pulsar.start();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
index 0247a68256e..73b50b4603a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
@@ -169,7 +169,7 @@ public class BrokerRegistryTest {
executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS,
new LinkedBlockingQueue<>());
// Start local bookkeeper ensemble
- bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble.start();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java
index 50265c28edc..f0734fbe8bb 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java
@@ -47,7 +47,7 @@ public class ExtensibleLoadManagerCloseTest {
@BeforeClass(alwaysRun = true)
public void setup() throws Exception {
- bk = new LocalBookkeeperEnsemble(1, 0, () -> 0);
+ bk = new LocalBookkeeperEnsemble(1, 0);
bk.start();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java
index f393e585896..656088c2ba5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java
@@ -18,12 +18,12 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions;
-import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
import java.util.Optional;
import lombok.CustomLog;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.util.PortManager;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
@@ -46,8 +46,10 @@ public class
ExtensibleLoadManagerImplWithAdvertisedListenersTest extends Extens
@Override
protected ServiceConfiguration updateConfig(ServiceConfiguration conf) {
super.updateConfig(conf);
- int privatePulsarPort = nextLockedFreePort();
- int publicPulsarPort = nextLockedFreePort();
+ // Pre-allocate ports because advertised listener URLs are baked into config
+ // before the broker starts.
+ int privatePulsarPort = PortManager.nextLockedFreePort();
+ int publicPulsarPort = PortManager.nextLockedFreePort();
conf.setInternalListenerName("internal");
conf.setBindAddresses("external:pulsar://localhost:" +
publicPulsarPort);
conf.setAdvertisedListeners(
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java
index a400bf733e5..1dc254492b7 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java
@@ -26,7 +26,6 @@ import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
-import org.apache.pulsar.common.util.PortManager;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
@@ -38,8 +37,7 @@ import org.testng.annotations.Test;
public class LoadManagerFailFastTest {
private static final String cluster = "test";
- private final int zkPort = PortManager.nextLockedFreePort();
- private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2,
zkPort, PortManager::nextLockedFreePort);
+ private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2,
0);
private final ServiceConfiguration config = new ServiceConfiguration();
@BeforeClass
@@ -49,7 +47,7 @@ public class LoadManagerFailFastTest {
config.setAdvertisedAddress("localhost");
config.setBrokerServicePort(Optional.of(0));
config.setWebServicePort(Optional.of(0));
- config.setMetadataStoreUrl("zk:localhost:" + zkPort);
+ config.setMetadataStoreUrl("zk:localhost:" + bk.getZookeeperPort());
}
@AfterClass
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
index 7a012b78756..085bea1170e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
@@ -59,7 +59,7 @@ public class BundleSplitterTaskTest {
@BeforeMethod
void setup() throws Exception {
// Start local bookkeeper ensemble
- bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble.start();
// Start broker
ServiceConfiguration config = new ServiceConfiguration();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
index c3b5a02e5ac..ecabd901116 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
@@ -173,7 +173,7 @@ public class ModularLoadManagerImplTest {
executor = new ThreadPoolExecutor(1, 20, 30, TimeUnit.SECONDS, new
LinkedBlockingQueue<>());
// Start local bookkeeper ensemble
- bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble.start();
// Start broker 1
@@ -947,25 +947,32 @@ public class ModularLoadManagerImplTest {
ServiceConfiguration config = new ServiceConfiguration();
config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
config.setClusterName("use");
- config.setWebServicePort(Optional.of(PortManager.nextLockedFreePort()));
- config.setMetadataStoreUrl("zk:127.0.0.1:" +
bkEnsemble.getZookeeperPort());
- config.setBrokerShutdownTimeoutMs(0L);
- config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
- config.setBrokerServicePort(Optional.of(0));
- PulsarService pulsar = new PulsarService(config);
- // create znode using different zk-session
- final String brokerZnode = LoadManager.LOADBALANCE_BROKERS_ROOT + "/"
+ pulsar.getAdvertisedAddress() + ":"
- + config.getWebServicePort().get();
- pulsar1.getLocalMetadataStore()
- .put(brokerZnode, new byte[0], Optional.empty(),
EnumSet.of(CreateOption.Ephemeral)).join();
+ // Pre-allocate a port: the test creates a znode at the broker's
would-be address before
+ // starting the broker, so it needs to know the address up front.
+ int webPort = PortManager.nextLockedFreePort();
try {
- pulsar.start();
- fail("should have failed");
- } catch (PulsarServerException e) {
- //Ok.
- }
+ config.setWebServicePort(Optional.of(webPort));
+ config.setMetadataStoreUrl("zk:127.0.0.1:" +
bkEnsemble.getZookeeperPort());
+ config.setBrokerShutdownTimeoutMs(0L);
+ config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
+ config.setBrokerServicePort(Optional.of(0));
+ PulsarService pulsar = new PulsarService(config);
+ // create znode using different zk-session
+ final String brokerZnode = LoadManager.LOADBALANCE_BROKERS_ROOT +
"/" + pulsar.getAdvertisedAddress() + ":"
+ + config.getWebServicePort().get();
+ pulsar1.getLocalMetadataStore()
+ .put(brokerZnode, new byte[0], Optional.empty(),
EnumSet.of(CreateOption.Ephemeral)).join();
+ try {
+ pulsar.start();
+ fail("should have failed");
+ } catch (PulsarServerException e) {
+ //Ok.
+ }
- pulsar.close();
+ pulsar.close();
+ } finally {
+ PortManager.releaseLockedPort(webPort);
+ }
}
@Test
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java
index 411c57c3f12..0c6797c93fa 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.broker.protocol;
import java.io.File;
import java.util.Optional;
import lombok.CustomLog;
-import org.apache.bookkeeper.util.PortManager;
import org.apache.commons.io.FileUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
@@ -38,8 +37,7 @@ public class PulsarClientBasedHandlerTest {
private static final String clusterName = "cluster";
private static final int shutdownTimeoutMs = 100;
- private final int zkPort = PortManager.nextFreePort();
- private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2,
zkPort, PortManager::nextFreePort);
+ private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2,
0);
private File tempDirectory;
private PulsarService pulsar;
@@ -51,7 +49,7 @@ public class PulsarClientBasedHandlerTest {
config.setAdvertisedAddress("localhost");
config.setBrokerServicePort(Optional.of(0));
config.setWebServicePort(Optional.of(0));
- config.setMetadataStoreUrl("zk:127.0.0.1:" + zkPort);
+ config.setMetadataStoreUrl("zk:127.0.0.1:" + bk.getZookeeperPort());
tempDirectory =
SimpleProtocolHandlerTestsBase.configureProtocolHandler(config,
PulsarClientBasedHandler.class.getName(), true);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java
index 9e32f4be8be..f1cb937868a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.protocol;
import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
+import static org.apache.pulsar.common.util.PortManager.releaseLockedPort;
import static org.testng.Assert.assertEquals;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
@@ -46,7 +47,6 @@ import org.apache.commons.io.IOUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
-import org.apache.pulsar.common.util.PortManager;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -88,6 +88,8 @@ public abstract class SimpleProtocolHandlerTestsBase extends
BrokerTestBase {
@Override
public Map<InetSocketAddress, ChannelInitializer<SocketChannel>>
newChannelInitializers() {
+ // Pre-allocate a free port: protocol handlers need to register a
listener at a known
+ // address before the broker calls back into them.
int port = nextLockedFreePort();
this.ports.add(port);
return Collections.singletonMap(new
InetSocketAddress(conf.getBindAddress(), port),
@@ -115,7 +117,7 @@ public abstract class SimpleProtocolHandlerTestsBase
extends BrokerTestBase {
@Override
public void close() {
- ports.removeIf(PortManager::releaseLockedPort);
+ ports.removeIf(p -> releaseLockedPort(p));
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
index 8dafe7a6af3..d76c7fb19d2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
@@ -41,7 +41,7 @@ public class AdvertisedAddressTest {
@BeforeMethod
public void setup() throws Exception {
- bkEnsemble = new LocalBookkeeperEnsemble(1, 0, () -> 0);
+ bkEnsemble = new LocalBookkeeperEnsemble(1, 0);
bkEnsemble.start();
ServiceConfiguration config = new ServiceConfiguration();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 9a85b65b1e7..cbc066bd99a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -138,7 +138,7 @@ public class BacklogQuotaManagerTest {
void setup() throws Exception {
try {
// start local bookie and zookeeper
- bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble.start();
// start pulsar service
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
index 4a1fcbb04db..2b01bbbb9d1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
@@ -86,7 +86,7 @@ public abstract class BkEnsemblesTestBase extends
TestRetrySupport {
incrementSetupNumber();
try {
// start local bookie and zookeeper
- bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies, 0, () ->
0);
+ bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies, 0);
bkEnsemble.start();
// start pulsar service
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
index 2403675253f..24be9695184 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
@@ -97,7 +97,7 @@ public class BrokerBookieIsolationTest {
@BeforeMethod
protected void setup() throws Exception {
// Start local bookkeeper ensemble
- bkEnsemble = new LocalBookkeeperEnsemble(4, 0, () -> 0);
+ bkEnsemble = new LocalBookkeeperEnsemble(4, 0);
bkEnsemble.start();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEventLoopShutdownTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEventLoopShutdownTest.java
index 29421f155b6..b2667102198 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEventLoopShutdownTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEventLoopShutdownTest.java
@@ -37,7 +37,7 @@ public class BrokerEventLoopShutdownTest {
@BeforeClass(alwaysRun = true)
public void setup() throws Exception {
- bk = new LocalBookkeeperEnsemble(2, 0, () -> 0);
+ bk = new LocalBookkeeperEnsemble(2, 0);
bk.start();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
index c0bd01c64a2..f9f0030c45f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
@@ -80,7 +80,7 @@ public abstract class
CanReconnectZKClientPulsarServiceBaseTest extends TestRetr
brokerConfigZk.start();
// Start BK.
- bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies, 0, () -> 0);
+ bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies, 0);
bkEnsemble.start();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GeoReplicationWithConfigurationSyncTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GeoReplicationWithConfigurationSyncTestBase.java
index 3b4bac53fec..728af789992 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GeoReplicationWithConfigurationSyncTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GeoReplicationWithConfigurationSyncTestBase.java
@@ -77,9 +77,9 @@ public abstract class
GeoReplicationWithConfigurationSyncTestBase extends TestRe
brokerConfigZk2.start();
// Start BK.
- bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble1.start();
- bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble2.start();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
index 61e45462290..b4a7e4ef0ff 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
@@ -54,7 +54,7 @@ public class MaxMessageSizeTest {
@BeforeMethod
void setup() {
try {
- bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
ServerConfiguration conf = new ServerConfiguration();
conf.setNettyMaxFrameSizeBytes(10 * 1024 * 1024 + 10 * 1024);
bkEnsemble.startStandalone(conf, false);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java
index 0d5ae2a4c0d..de30a4c9d82 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java
@@ -86,7 +86,7 @@ public abstract class NetworkErrorTestBase extends
TestRetrySupport {
protected void startZKAndBK() throws Exception {
// Start ZK & BK.
- bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble1.start();
metadataZKProxy = new Ipv4Proxy(getOneFreePort(), "127.0.0.1",
bkEnsemble1.getZookeeperPort());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
index a73a2844f79..0aec5358be6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
@@ -114,9 +114,9 @@ public abstract class OneWayReplicatorTestBase extends
TestRetrySupport {
}
// Start BK.
- bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble1.start();
- bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble2.start();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 265d685223e..755230a8537 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -155,7 +155,7 @@ public abstract class ReplicatorTestBase extends
TestRetrySupport {
globalZkS.start();
// Start region 1
- bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble1.start();
// NOTE: we have to instantiate a new copy of System.getProperties()
to make sure pulsar1 and pulsar2 have
@@ -174,7 +174,7 @@ public abstract class ReplicatorTestBase extends
TestRetrySupport {
// Start region 2
// Start zk & bks
- bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble2.start();
setConfig2DefaultValue();
@@ -190,7 +190,7 @@ public abstract class ReplicatorTestBase extends
TestRetrySupport {
// Start region 3
// Start zk & bks
- bkEnsemble3 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble3 = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble3.start();
setConfig3DefaultValue();
@@ -206,7 +206,7 @@ public abstract class ReplicatorTestBase extends
TestRetrySupport {
// Start region 4
// Start zk & bks
- bkEnsemble4 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble4 = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble4.start();
setConfig4DefaultValue();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
index 809f9a5a6e0..cccc3ceeb52 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
@@ -71,7 +71,7 @@ public class TopicOwnerTest {
void setup() throws Exception {
log.info("---- Initializing TopicOwnerTest -----");
// Start local bookkeeper ensemble
- bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble.start();
// start brokers
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java
index e58b8cdcce7..9b52582f6f3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java
@@ -41,10 +41,9 @@ import org.testng.annotations.Test;
public class ShadowTopicRealBkTest {
private static final String cluster = "test";
- // Pass 0 for both ZK and bookie ports so the kernel picks free ports at
bind time, avoiding
- // any JVM-vs-OS race on pre-allocated ports. The actual ZK port is read
back via
- // bk.getZookeeperPort().
- private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2,
0, () -> 0);
+ // ZK port 0 lets the kernel pick a free port at bind time. The actual
port is read back
+ // via bk.getZookeeperPort() after start. Bookies always bind to
kernel-assigned ports.
+ private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2,
0);
private PulsarService pulsar;
private PulsarAdmin admin;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
index ff344ebadea..c7136fcb84a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
@@ -723,6 +723,11 @@ public class PulsarTestContext implements AutoCloseable {
protected void handlePreallocatePorts(ServiceConfiguration config) {
if (super.preallocatePorts) {
+ // Pre-allocate ports for callers that need the port number
BEFORE the broker
+ // starts (e.g. to build advertised-listener URLs).
PortManager hands out ports
+ // outside the ephemeral range, so the kernel won't
auto-assign them to other
+ // processes. Tests that don't need a pre-known port should
leave
+ // preallocatePorts=false and let the broker bind to 0
directly.
config.getBrokerServicePort().ifPresent(portNumber -> {
if (portNumber == 0) {
config.setBrokerServicePort(Optional.of(PortManager.nextLockedFreePort()));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
index 81321427a2b..904a4807ad3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
@@ -50,7 +50,7 @@ public abstract class TransactionMetaStoreTestBase extends
TestRetrySupport {
protected final void setup() throws Exception {
log.info().attr("class", getClass().getSimpleName()).log("----
Initializing -----");
// Start local bookkeeper ensemble
- bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble.start();
String[] args = new String[]{
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
index 2602233293e..6f173aa2ba0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
@@ -76,7 +76,7 @@ public class ClientDeduplicationFailureTest {
log.info().attr("upMethod", method.getName()).log("--- Setting up
method");
// Start local bookkeeper ensemble
- bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble.start();
config = new ServiceConfiguration();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/HybridTypesAcknowledgeTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/HybridTypesAcknowledgeTest.java
index 3fa6750c7de..451a15ebe50 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/HybridTypesAcknowledgeTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/HybridTypesAcknowledgeTest.java
@@ -73,7 +73,7 @@ public class HybridTypesAcknowledgeTest extends
TestRetrySupport {
@BeforeClass(alwaysRun = true)
protected void setup() throws Exception {
incrementSetupNumber();
- bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble.start();
// Start broker.
setupBrokers();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 97126457cd1..1552940cda4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -989,7 +989,7 @@ public class NonPersistentTopicTest extends
ProducerConsumerBase {
globalZkS.start();
// Start region 1
- bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble1.start();
// NOTE: we have to instantiate a new copy of
System.getProperties() to make sure pulsar1 and pulsar2 have
@@ -1019,7 +1019,7 @@ public class NonPersistentTopicTest extends
ProducerConsumerBase {
// Start region 2
// Start zk & bks
- bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble2.start();
config2 = new ServiceConfiguration();
@@ -1046,7 +1046,7 @@ public class NonPersistentTopicTest extends
ProducerConsumerBase {
// Start region 3
// Start zk & bks
- bkEnsemble3 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble3 = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble3.start();
config3 = new ServiceConfiguration();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java
index a1d31c0b9b5..db465e643ac 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java
@@ -54,18 +54,21 @@ public class ServiceUrlQuarantineTest extends
ProducerConsumerBase {
private PulsarClientImpl pulsarClientWithHttpServiceUrlDisableQuarantine;
private int brokerServicePort;
private int webServicePort;
- private final Set<Integer> lockedFreePortSet = new HashSet<>();
+ private final Set<Integer> reservedPorts = new HashSet<>();
private static final int UNAVAILABLE_NODES = 20;
private static final int TIMEOUT_MS = 500;
@BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
+ // Pre-allocate ports for the broker because they must match the URL
the test builds below.
this.brokerServicePort = nextLockedFreePort();
this.webServicePort = nextLockedFreePort();
super.internalSetup();
super.producerBaseSetup();
- // Create a Pulsar client with some unavailable nodes
+ // Build a service URL that includes a bunch of unavailable-node
ports. PortManager
+ // hands out ports outside the ephemeral range, so nothing else is
going to grab them
+ // and the connection attempts to those addresses will fail as
expected.
StringBuilder binaryServiceUrlBuilder = new
StringBuilder(pulsar.getBrokerServiceUrl());
StringBuilder httpServiceUrlBuilder = new
StringBuilder(pulsar.getWebServiceAddress());
for (int i = 0; i < UNAVAILABLE_NODES; i++) {
@@ -98,9 +101,9 @@ public class ServiceUrlQuarantineTest extends
ProducerConsumerBase {
}
private int nextLockedFreePort() {
- int newLockedFreePort = PortManager.nextLockedFreePort();
- this.lockedFreePortSet.add(newLockedFreePort);
- return newLockedFreePort;
+ int port = PortManager.nextLockedFreePort();
+ reservedPorts.add(port);
+ return port;
}
@Override
@@ -133,9 +136,8 @@ public class ServiceUrlQuarantineTest extends
ProducerConsumerBase {
if (pulsarClientWithHttpServiceUrlDisableQuarantine != null) {
pulsarClientWithHttpServiceUrlDisableQuarantine.close();
}
- for (Integer port : lockedFreePortSet) {
- PortManager.releaseLockedPort(port);
- }
+ reservedPorts.forEach(PortManager::releaseLockedPort);
+ reservedPorts.clear();
}
@Test
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
index 3fbd4564a92..a39b1ef1b87 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
@@ -119,7 +119,7 @@ public class PulsarFunctionE2ESecurityTest {
log.info().attr("method", method.getName()).log("Setting up method");
// Start local bookkeeper ensemble
- bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble.start();
config = new ServiceConfiguration();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index 122a81d3779..b6952682fa1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -202,7 +202,7 @@ public class PulsarFunctionLocalRunTest {
log.info().attr("method", method.getName()).log("Setting up method");
// Start local bookkeeper ensemble
- bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble.start();
config = new ServiceConfiguration();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
index 1bbabca3613..5450c0c17b3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
@@ -124,7 +124,7 @@ public class PulsarFunctionPublishTest {
log.info().attr("method", method.getName()).log("Setting up method");
// Start local bookkeeper ensemble
- bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble.start();
config = new ServiceConfiguration();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
index 9f27350f73d..9727dfb8f41 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.functions.worker;
-import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@@ -97,21 +96,22 @@ public class PulsarFunctionTlsTest {
void setup() throws Exception {
log.info("---- Initializing TopicOwnerTest -----");
// Start local bookkeeper ensemble
- bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble.start();
// start brokers
for (int i = 0; i < BROKER_COUNT; i++) {
- int brokerPort = nextLockedFreePort();
- int webPort = nextLockedFreePort();
-
ServiceConfiguration config = new ServiceConfiguration();
config.setBrokerShutdownTimeoutMs(0L);
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setWebServicePort(Optional.empty());
- config.setWebServicePortTls(Optional.of(webPort));
+ // Pre-allocate the TLS web port:
PulsarService.initializeWorkerConfigFromBrokerConfig
+ // builds workerId = "c-{cluster}-fw-{host}-{port}" from the
CONFIGURED port. Two
+ // brokers configured with port 0 would end up with the same
workerId and the
+ // function-worker membership manager would never elect a leader.
+ config.setWebServicePortTls(Optional.of(PortManager.nextLockedFreePort()));
config.setBrokerServicePort(Optional.empty());
- config.setBrokerServicePortTls(Optional.of(brokerPort));
+ config.setBrokerServicePortTls(Optional.of(0));
config.setClusterName("my-cluster");
config.setAdvertisedAddress("localhost");
config.setMetadataStoreUrl("zk:127.0.0.1:" +
bkEnsemble.getZookeeperPort());
@@ -220,11 +220,9 @@ public class PulsarFunctionTlsTest {
}
for (int i = 0; i < BROKER_COUNT; i++) {
if (pulsarServices[i] != null) {
- pulsarServices[i].close();
- pulsarServices[i].getConfiguration().
- getBrokerServicePort().ifPresent(PortManager::releaseLockedPort);
pulsarServices[i].getConfiguration()
- .getWebServicePort().ifPresent(PortManager::releaseLockedPort);
+ .getWebServicePortTls().ifPresent(PortManager::releaseLockedPort);
+ pulsarServices[i].close();
pulsarServices[i] = null;
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
index 7766b8a5054..ae91b671643 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -83,7 +83,7 @@ public class PulsarWorkerAssignmentTest {
log.info().attr("method", method.getName()).log("Setting up method");
// Start local bookkeeper ensemble
- bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble.start();
config = new ServiceConfiguration();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
index cc010b82d9b..53705e13be6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
@@ -114,7 +114,7 @@ public abstract class AbstractPulsarE2ETest {
log.info().attr("method", method.getName()).log("--- Setting up method
---");
// Start local bookkeeper ensemble
- bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble.start();
config = new ServiceConfiguration();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
index 9e4b12f0a46..2bda831a999 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
@@ -93,7 +93,7 @@ public class PulsarFunctionAdminTest {
log.info().attr("method", method.getName()).log("Setting up method");
// Start local bookkeeper ensemble
- bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble.start();
config = new ServiceConfiguration();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index aa78d3aaa91..f121566d037 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -110,7 +110,7 @@ public class PulsarFunctionTlsTest {
log.info().attr("method", method.getName()).log("Setting up method");
// Start local bookkeeper ensemble
- bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble.start();
config = new ServiceConfiguration();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java
index 3bc42cf0368..65ad1e490c0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java
@@ -42,7 +42,7 @@ public class LocalBookkeeperEnsembleTest {
final int numBk = 1;
// Start local Bookies/ZooKeepers and confirm that they are running at
specified ports
- LocalBookkeeperEnsemble ensemble = new LocalBookkeeperEnsemble(numBk,
0, () -> 0);
+ LocalBookkeeperEnsemble ensemble = new LocalBookkeeperEnsemble(numBk,
0);
ensemble.start();
assertTrue(ensemble.getZkServer().isRunning());
assertEquals(ensemble.getZkServer().getClientPort(),
ensemble.getZookeeperPort());
@@ -60,8 +60,7 @@ public class LocalBookkeeperEnsembleTest {
public void testStartWithSpecifiedStreamStoragePort() throws Exception {
LocalBookkeeperEnsemble ensemble = null;
try {
- ensemble =
- new LocalBookkeeperEnsemble(1, 0, 0, 4182, null, null,
true, null);
+ ensemble = new LocalBookkeeperEnsemble(1, 0, 4182, null, null,
true, null);
ensemble.startStandalone(new ServerConfiguration(), true);
} finally {
if (ensemble != null) {
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java
index 612deb9e017..fab5104e8c3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java
@@ -22,14 +22,18 @@ import java.net.ServerSocket;
import java.util.HashSet;
import java.util.Set;
+/**
+ * Allocates ports for tests that need a known port number BEFORE binding (e.g. tests that build
+ * advertised-listener URLs, or that pre-create metadata znodes at the broker's would-be address).
+ * For everything else, prefer binding to port 0 and reading the kernel-assigned port back.
+ */
public class PortManager {
private static final Set<Integer> PORTS = new HashSet<>();
/**
- * Return a locked available port.
- *
- * @return locked available port.
+ * Return a free port that is reserved for the caller until {@link #releaseLockedPort(int)}
+ * is invoked.
*/
public static synchronized int nextLockedFreePort() {
int exceptionCount = 0;
@@ -50,18 +54,16 @@ public class PortManager {
}
/**
- * Returns whether the port was released successfully.
+ * Release a previously locked port.
*
- * @return whether the release is successful.
+ * @return true if the port was previously locked by this manager
*/
public static synchronized boolean releaseLockedPort(int lockedPort) {
return PORTS.remove(lockedPort);
}
/**
- * Check port if locked.
- *
- * @return whether the port is locked.
+ * @return true if the port is currently locked by this manager
*/
public static synchronized boolean checkPortIfLocked(int lockedPort) {
return PORTS.contains(lockedPort);
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java
index 408753300cc..b3f91617a5c 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java
@@ -18,19 +18,41 @@
*/
package org.apache.pulsar.common.util;
-import static org.apache.pulsar.common.util.PortManager.checkPortIfLocked;
-import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
-import static org.apache.pulsar.common.util.PortManager.releaseLockedPort;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import org.testng.annotations.Test;
public class PortManagerTest {
+
+ @Test
+ public void allocatesAFreePort() {
+ int port = PortManager.nextLockedFreePort();
+ try {
+ assertTrue(port > 0);
+ assertTrue(PortManager.checkPortIfLocked(port));
+ } finally {
+ PortManager.releaseLockedPort(port);
+ }
+ }
+
+ @Test
+ public void allocatesDistinctPorts() {
+ int p1 = PortManager.nextLockedFreePort();
+ int p2 = PortManager.nextLockedFreePort();
+ try {
+ assertNotEquals(p1, p2);
+ } finally {
+ PortManager.releaseLockedPort(p1);
+ PortManager.releaseLockedPort(p2);
+ }
+ }
+
@Test
- public void testCheckPortIfLockedAndRemove() {
- int port = nextLockedFreePort();
- assertTrue(checkPortIfLocked(port));
- assertTrue(releaseLockedPort(port));
- assertFalse(checkPortIfLocked(port));
+ public void releasingMarksPortAsUnlocked() {
+ int port = PortManager.nextLockedFreePort();
+ assertTrue(PortManager.checkPortIfLocked(port));
+ assertTrue(PortManager.releaseLockedPort(port));
+ assertFalse(PortManager.checkPortIfLocked(port));
}
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java
index 9ec10326c26..aafe4889ccc 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java
@@ -31,12 +31,9 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import lombok.CustomLog;
import lombok.Getter;
import org.apache.bookkeeper.bookie.BookieImpl;
-import org.apache.bookkeeper.bookie.Cookie;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.common.allocator.PoolingPolicy;
import org.apache.bookkeeper.common.component.Lifecycle;
@@ -50,7 +47,6 @@ import org.apache.bookkeeper.replication.AutoRecoveryMain;
import org.apache.bookkeeper.server.conf.BookieConfiguration;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.commons.io.FileUtils;
-import org.apache.pulsar.common.util.PortManager;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -74,7 +70,6 @@ public class BKCluster implements AutoCloseable {
protected final ServerConfiguration baseConf;
protected final ClientConfiguration baseClientConf;
- private final List<Integer> lockedPorts = new ArrayList<>();
private final AtomicBoolean closed = new AtomicBoolean(false);
public static class BKClusterConf {
@@ -83,7 +78,6 @@ public class BKCluster implements AutoCloseable {
private String metadataServiceUri;
private int numBookies = 1;
private String dataDir;
- private int bkPort = 0;
private boolean clearOldData;
@@ -107,11 +101,6 @@ public class BKCluster implements AutoCloseable {
return this;
}
- public BKClusterConf bkPort(int bkPort) {
- this.bkPort = bkPort;
- return this;
- }
-
public BKClusterConf clearOldData(boolean clearOldData) {
this.clearOldData = clearOldData;
return this;
@@ -163,8 +152,6 @@ public class BKCluster implements AutoCloseable {
} catch (Exception e) {
log.error().exception(e).log("Got Exception while trying to
stop BKCluster");
}
- lockedPorts.forEach(PortManager::releaseLockedPort);
- lockedPorts.clear();
// cleanup temp dirs
try {
cleanupTempDirs();
@@ -240,40 +227,23 @@ public class BKCluster implements AutoCloseable {
cleanDirectory(dataDir);
}
- int port;
- if (baseConf.isEnableLocalTransport() ||
!baseConf.getAllowEphemeralPorts() || clusterConf.bkPort == 0) {
- port = PortManager.nextLockedFreePort();
- lockedPorts.add(port);
- } else {
- // bk 4.15 cookie validation finds the same ip:port in case of
port 0
- // and 2nd bookie's cookie validation fails
- port = clusterConf.bkPort;
- }
- File[] cookieDir = dataDir.listFiles((file) ->
file.getName().equals("current"));
- if (cookieDir != null && cookieDir.length > 0) {
- String existBookieAddr =
parseBookieAddressFromCookie(cookieDir[0]);
- if (existBookieAddr != null) {
- baseConf.setAdvertisedAddress(existBookieAddr.split(":")[0]);
- port = Integer.parseInt(existBookieAddr.split(":")[1]);
- }
- }
- return newServerConfiguration(port, dataDir, new File[]{dataDir});
- }
-
- private String parseBookieAddressFromCookie(File dir) throws IOException {
- Cookie cookie = Cookie.readFromDirectory(dir);
- Pattern pattern = Pattern.compile(".*bookieHost: \"(.*?)\".*",
Pattern.DOTALL);
- Matcher m = pattern.matcher(cookie.toString());
- return m.find() ? m.group(1) : null;
+ // Bookies bind to a kernel-assigned port. Identity is established via a `bookieId`
+ // derived from the data dir path: bookies with the same dir (e.g. on cluster restart)
+ // get the same id so cookies match, while bookies with different dirs (e.g. separate
+ // test runs sharing the same metadata store) get different ids and don't collide.
+ String bookieId = "bk-" + index + "-" +
Integer.toHexString(dataDir.getAbsolutePath().hashCode());
+ return newServerConfiguration(0, bookieId, dataDir, new
File[]{dataDir});
}
private ClientConfiguration newClientConfiguration() {
return new ClientConfiguration(baseConf);
}
- private ServerConfiguration newServerConfiguration(int port, File
journalDir, File[] ledgerDirs) {
+ private ServerConfiguration newServerConfiguration(int port, String
bookieId, File journalDir,
+ File[] ledgerDirs) {
ServerConfiguration conf = new ServerConfiguration(baseConf);
conf.setBookiePort(port);
+ conf.setBookieId(bookieId);
conf.setJournalDirName(journalDir.getPath());
String[] ledgerDirNames = new String[ledgerDirs.length];
for (int i = 0; i < ledgerDirs.length; i++) {
@@ -373,7 +343,7 @@ public class BKCluster implements AutoCloseable {
confReturn.setAllowEphemeralPorts(true);
confReturn.setJournalWriteData(false);
confReturn.setProperty("journalPreAllocSizeMB", 1);
- confReturn.setBookiePort(clusterConf.bkPort);
+ confReturn.setBookiePort(0);
confReturn.setGcWaitTime(1000L);
confReturn.setDiskUsageThreshold(0.999F);
confReturn.setDiskUsageWarnThreshold(0.99F);
diff --git
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java
index 6159741f535..612a05fba7f 100644
---
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java
+++
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java
@@ -25,7 +25,6 @@
package org.apache.bookkeeper.replication;
import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
-import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
import static org.testng.Assert.assertFalse;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.MoreExecutors;
@@ -316,14 +315,10 @@ public abstract class BookKeeperClusterTestCase {
protected ServerConfiguration newServerConfiguration() throws Exception {
File f = tmpDirs.createNew("bookie", "test");
-
- int port;
- if (baseConf.isEnableLocalTransport() ||
!baseConf.getAllowEphemeralPorts()) {
- port = nextLockedFreePort();
- } else {
- port = 0;
- }
- return newServerConfiguration(port, f, new File[] { f });
+ // Bookies need a pre-allocated port: BK identifies them by host:port in metadata
+ // and the test client resolves that back to a TCP address. Port 0 would leave
+ // the cookie + registration with port=0, which fails DNS-style resolution.
+ return newServerConfiguration(PortManager.nextLockedFreePort(), f, new
File[] { f });
}
protected ClientConfiguration newClientConfiguration() {
diff --git
a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
index 3abd3580e86..33080e42832 100644
---
a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
+++
b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -28,7 +28,6 @@ import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_INDEX_SCOPE;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_LEDGER_SCOPE;
import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
-import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
import static org.testng.Assert.assertFalse;
import com.google.common.base.Stopwatch;
import java.io.File;
@@ -291,14 +290,10 @@ public abstract class BookKeeperClusterTestCase {
protected ServerConfiguration newServerConfiguration() throws Exception {
File f = tmpDirs.createNew("bookie", "test");
-
- int port;
- if (baseConf.isEnableLocalTransport() ||
!baseConf.getAllowEphemeralPorts()) {
- port = nextLockedFreePort();
- } else {
- port = 0;
- }
- return newServerConfiguration(port, f, new File[] { f });
+ // Bookies need a pre-allocated port: BK identifies them by host:port in metadata
+ // and the test client resolves that back to a TCP address. Port 0 would leave
+ // the cookie + registration with port=0, which fails DNS-style resolution.
+ return newServerConfiguration(PortManager.nextLockedFreePort(), f, new
File[] { f });
}
protected ClientConfiguration newClientConfiguration() {
diff --git
a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/PortManager.java
b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/PortManager.java
deleted file mode 100644
index c5ab8dd629c..00000000000
---
a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/PortManager.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package
org.apache.pulsar.packages.management.storage.bookkeeper.bookkeeper.test;
-
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.Inet4Address;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.StandardOpenOption;
-import lombok.Cleanup;
-
-/**
- * Port manager allows a base port to be specified on the commandline. Tests
will then use ports, counting up from this
- * base port. This allows multiple instances of the bookkeeper tests to run at
once.
- */
-public class PortManager {
-
- private static final String lockFilename =
System.getProperty("test.lockFilename",
- "/tmp/pulsar-test-port-manager.lock");
- private static final int BASE_PORT =
Integer.parseInt(System.getProperty("test.basePort", "15000"));
-
- private static final int MAX_PORT = 32000;
-
- /**
- * Return a TCP port that is currently unused.
- *
- * Keeps track of assigned ports and avoid race condition between
different processes
- */
- public static synchronized int nextFreePort() {
- Path path = Paths.get(lockFilename);
-
- try {
- @Cleanup
- FileChannel fileChannel = FileChannel.open(path,
- StandardOpenOption.CREATE,
- StandardOpenOption.WRITE,
- StandardOpenOption.READ);
-
- @Cleanup
- FileLock lock = fileChannel.lock();
-
- ByteBuffer buffer = ByteBuffer.allocate(32);
- int len = fileChannel.read(buffer, 0L);
- buffer.flip();
-
- int lastUsedPort = BASE_PORT;
- if (len > 0) {
- byte[] bytes = new byte[buffer.remaining()];
- buffer.get(bytes);
- String lastUsedPortStr = new String(bytes);
- lastUsedPort = Integer.parseInt(lastUsedPortStr);
- }
-
- int freePort = probeFreePort(lastUsedPort + 1);
-
- buffer.clear();
- buffer.put(Integer.toString(freePort).getBytes());
- buffer.flip();
- fileChannel.write(buffer, 0L);
- fileChannel.truncate(buffer.position());
- fileChannel.force(true);
-
- return freePort;
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- private static final int MAX_PORT_CONFLICTS = 10;
-
- private static synchronized int probeFreePort(int port) {
- int exceptionCount = 0;
- while (true) {
- if (port == MAX_PORT) {
- // Rollover the port probe
- port = BASE_PORT;
- }
-
- try (Socket s = new Socket()) {
- s.connect(new
InetSocketAddress(Inet4Address.getLoopbackAddress(), port), 100);
-
- // If we succeed to connect it means the port is being used
-
- } catch (ConnectException e) {
- return port;
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- port++;
- exceptionCount++;
- if (exceptionCount > MAX_PORT_CONFLICTS) {
- throw new RuntimeException("Failed to find an open port");
- }
- }
- }
-
- public static void main(String[] args) throws Exception {
- while (true) {
- System.out.println("Port: " + nextFreePort());
- Thread.sleep(100);
- }
- }
-}
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
index d6eaf5dcc66..c834e09bd89 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.proxy.extensions;
import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
+import static org.apache.pulsar.common.util.PortManager.releaseLockedPort;
import static org.mockito.Mockito.doReturn;
import static org.testng.Assert.assertEquals;
import io.netty.buffer.ByteBuf;
@@ -50,7 +51,6 @@ import
org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
-import org.apache.pulsar.common.util.PortManager;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.proxy.server.ProxyConfiguration;
import org.apache.pulsar.proxy.server.ProxyService;
@@ -90,6 +90,8 @@ public abstract class SimpleProxyExtensionTestBase extends
MockedPulsarServiceBa
@Override
public Map<InetSocketAddress, ChannelInitializer<SocketChannel>>
newChannelInitializers() {
+ // Pre-allocate a free port: extension handlers need to register a listener at a known
+ // address before the proxy calls back into them.
int port = nextLockedFreePort();
this.ports.add(port);
return Collections.singletonMap(new
InetSocketAddress(conf.getBindAddress(), port),
@@ -117,7 +119,7 @@ public abstract class SimpleProxyExtensionTestBase extends
MockedPulsarServiceBa
@Override
public void close() {
- ports.removeIf(PortManager::releaseLockedPort);
+ ports.removeIf(p -> releaseLockedPort(p));
}
}