This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8cd2f38309e [cleanup][test] Remove PortManager and use kernel-assigned ports everywhere (#25694)
8cd2f38309e is described below

commit 8cd2f38309ea46f932a40868f730360976b0b96c
Author: Matteo Merli <[email protected]>
AuthorDate: Wed May 6 17:44:10 2026 -0700

    [cleanup][test] Remove PortManager and use kernel-assigned ports everywhere (#25694)
---
 .../mledger/impl/ManagedLedgerBkTest.java          |   4 +-
 .../bookkeeper/test/BookKeeperClusterTestCase.java |  13 +--
 .../java/org/apache/pulsar/PulsarStandalone.java   |  16 +--
 .../org/apache/pulsar/PulsarStandaloneBuilder.java |   5 -
 .../pulsar/zookeeper/LocalBookkeeperEnsemble.java  |  48 ++------
 .../org/apache/pulsar/PulsarStandaloneTest.java    |   7 +-
 .../apache/pulsar/broker/MultiBrokerBaseTest.java  |  40 +------
 .../apache/pulsar/broker/SLAMonitoringTest.java    |   2 +-
 .../broker/admin/GetPartitionMetadataTest.java     |   2 +-
 .../AbstractBrokerEntryCacheMultiBrokerTest.java   |   5 -
 .../loadbalance/AdvertisedListenersTest.java       |   2 +
 .../loadbalance/LeaderElectionServiceTest.java     |   2 +-
 .../broker/loadbalance/LoadBalancerTest.java       |   2 +-
 .../broker/loadbalance/SimpleBrokerStartTest.java  |   4 +-
 .../loadbalance/SimpleLoadManagerImplTest.java     |   2 +-
 .../extensions/BrokerRegistryIntegrationTest.java  |   2 +-
 .../loadbalance/extensions/BrokerRegistryTest.java |   2 +-
 .../extensions/ExtensibleLoadManagerCloseTest.java |   2 +-
 ...LoadManagerImplWithAdvertisedListenersTest.java |   8 +-
 .../extensions/LoadManagerFailFastTest.java        |   6 +-
 .../loadbalance/impl/BundleSplitterTaskTest.java   |   2 +-
 .../impl/ModularLoadManagerImplTest.java           |  43 ++++---
 .../protocol/PulsarClientBasedHandlerTest.java     |   6 +-
 .../protocol/SimpleProtocolHandlerTestsBase.java   |   6 +-
 .../broker/service/AdvertisedAddressTest.java      |   2 +-
 .../broker/service/BacklogQuotaManagerTest.java    |   2 +-
 .../pulsar/broker/service/BkEnsemblesTestBase.java |   2 +-
 .../broker/service/BrokerBookieIsolationTest.java  |   2 +-
 .../service/BrokerEventLoopShutdownTest.java       |   2 +-
 .../CanReconnectZKClientPulsarServiceBaseTest.java |   2 +-
 ...eoReplicationWithConfigurationSyncTestBase.java |   4 +-
 .../pulsar/broker/service/MaxMessageSizeTest.java  |   2 +-
 .../broker/service/NetworkErrorTestBase.java       |   2 +-
 .../broker/service/OneWayReplicatorTestBase.java   |   4 +-
 .../pulsar/broker/service/ReplicatorTestBase.java  |   8 +-
 .../pulsar/broker/service/TopicOwnerTest.java      |   2 +-
 .../service/persistent/ShadowTopicRealBkTest.java  |   7 +-
 .../broker/testcontext/PulsarTestContext.java      |   5 +
 .../coordinator/TransactionMetaStoreTestBase.java  |   2 +-
 .../client/api/ClientDeduplicationFailureTest.java |   2 +-
 .../client/api/HybridTypesAcknowledgeTest.java     |   2 +-
 .../pulsar/client/api/NonPersistentTopicTest.java  |   6 +-
 .../client/impl/ServiceUrlQuarantineTest.java      |  18 +--
 .../worker/PulsarFunctionE2ESecurityTest.java      |   2 +-
 .../worker/PulsarFunctionLocalRunTest.java         |   2 +-
 .../worker/PulsarFunctionPublishTest.java          |   2 +-
 .../functions/worker/PulsarFunctionTlsTest.java    |  20 ++--
 .../worker/PulsarWorkerAssignmentTest.java         |   2 +-
 .../apache/pulsar/io/AbstractPulsarE2ETest.java    |   2 +-
 .../apache/pulsar/io/PulsarFunctionAdminTest.java  |   2 +-
 .../apache/pulsar/io/PulsarFunctionTlsTest.java    |   2 +-
 .../zookeeper/LocalBookkeeperEnsembleTest.java     |   5 +-
 .../org/apache/pulsar/common/util/PortManager.java |  18 +--
 .../apache/pulsar/common/util/PortManagerTest.java |  38 +++++--
 .../pulsar/metadata/bookkeeper/BKCluster.java      |  50 ++------
 .../replication/BookKeeperClusterTestCase.java     |  13 +--
 .../bookkeeper/test/BookKeeperClusterTestCase.java |  13 +--
 .../bookkeeper/bookkeeper/test/PortManager.java    | 126 ---------------------
 .../extensions/SimpleProxyExtensionTestBase.java   |   6 +-
 59 files changed, 194 insertions(+), 416 deletions(-)

diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
index 236f596c83c..23ad9af3ad0 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
-import static org.apache.pulsar.common.util.PortManager.releaseLockedPort;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
@@ -134,7 +133,7 @@ public class ManagedLedgerBkTest extends 
BookKeeperClusterTestCase {
         metadataStore.unsetAlwaysFail();
 
         bkc = new PulsarBookKeeperTestClient(baseClientConf);
-        int port = startNewBookie();
+        startNewBookie();
 
         // Reconnect a new bk client
         factory.shutdown();
@@ -164,7 +163,6 @@ public class ManagedLedgerBkTest extends 
BookKeeperClusterTestCase {
         assertEquals("entry-2", new String(entries.get(0).getData()));
         entries.forEach(Entry::release);
         factory.shutdown();
-        releaseLockedPort(port);
     }
 
     @Test
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index 78891cdd7a4..7e5486eac29 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -25,7 +25,6 @@
 package org.apache.bookkeeper.test;
 
 import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
-import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
 import static org.testng.Assert.assertFalse;
 import com.google.common.base.Stopwatch;
 import java.io.File;
@@ -296,14 +295,10 @@ public abstract class BookKeeperClusterTestCase {
 
     protected ServerConfiguration newServerConfiguration() throws Exception {
         File f = tmpDirs.createNew("bookie", "test");
-
-        int port;
-        if (baseConf.isEnableLocalTransport() || 
!baseConf.getAllowEphemeralPorts()) {
-            port = nextLockedFreePort();
-        } else {
-            port = 0;
-        }
-        return newServerConfiguration(port, f, new File[] { f });
+        // Bookies need a pre-allocated port: BK identifies them by host:port in metadata
+        // and the test client resolves that back to a TCP address. Port 0 would leave
+        // the cookie + registration with port=0, which fails DNS-style resolution.
+        return newServerConfiguration(PortManager.nextLockedFreePort(), f, new 
File[] { f });
     }
 
     protected ClientConfiguration newClientConfiguration() {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
index 0f4166be922..b6731263079 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
@@ -84,10 +84,6 @@ public class PulsarStandalone implements AutoCloseable {
         this.bkEnsemble = bkEnsemble;
     }
 
-    public void setBkPort(int bkPort) {
-        this.bkPort = bkPort;
-    }
-
     public void setBkDir(String bkDir) {
         this.bkDir = bkDir;
     }
@@ -172,10 +168,6 @@ public class PulsarStandalone implements AutoCloseable {
         return zkPort;
     }
 
-    public int getBkPort() {
-        return bkPort;
-    }
-
     public String getZkDir() {
         return zkDir;
     }
@@ -237,9 +229,6 @@ public class PulsarStandalone implements AutoCloseable {
             hidden = true)
     private int zkPort = 2181;
 
-    @Option(names = { "--bookkeeper-port" }, description = "Local bookies base 
port")
-    private int bkPort = 3181;
-
     @Option(names = { "--zookeeper-dir" },
             description = "Local zooKeeper's data directory",
             hidden = true)
@@ -470,7 +459,6 @@ public class PulsarStandalone implements AutoCloseable {
         bkCluster = BKCluster.builder()
                 .baseServerConfiguration(bkServerConf)
                 .metadataServiceUri(metadataStoreUrl)
-                .bkPort(bkPort)
                 .numBookies(numOfBk)
                 .dataDir(bkDir)
                 .clearOldData(wipeData)
@@ -484,9 +472,9 @@ public class PulsarStandalone implements AutoCloseable {
         ServerConfiguration bkServerConf = new ServerConfiguration();
         bkServerConf.loadConf(new File(configFile).toURI().toURL());
         calculateCacheSize(bkServerConf);
-        // Start LocalBookKeeper
+        // Start LocalBookKeeper. Bookies bind to kernel-assigned ports.
         bkEnsemble = new LocalBookkeeperEnsemble(
-                this.getNumOfBk(), this.getZkPort(), this.getBkPort(), 
this.getStreamStoragePort(), this.getZkDir(),
+                this.getNumOfBk(), this.getZkPort(), 
this.getStreamStoragePort(), this.getZkDir(),
                 this.getBkDir(), this.isWipeData(), "127.0.0.1");
         bkEnsemble.startStandalone(bkServerConf, !this.isNoStreamStorage());
         config.setMetadataStoreUrl("zk:127.0.0.1:" + zkPort);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneBuilder.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneBuilder.java
index 5469d247829..2cf91c564d5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneBuilder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneBuilder.java
@@ -56,11 +56,6 @@ public final class PulsarStandaloneBuilder {
         return this;
     }
 
-    public PulsarStandaloneBuilder withBkPort(int bkPort) {
-        pulsarStandalone.setBkPort(bkPort);
-        return this;
-    }
-
     public PulsarStandaloneBuilder withZkDir(String zkDir) {
         pulsarStandalone.setZkDir(zkDir);
         return this;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
index d8c67568304..2f68928d6d6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
@@ -42,7 +42,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 import lombok.CustomLog;
@@ -91,46 +90,18 @@ public class LocalBookkeeperEnsemble {
     int numberOfBookies;
     private final boolean clearOldData;
 
-    private static class BasePortManager implements Supplier<Integer> {
-
-        private int port;
-
-        public BasePortManager(int basePort) {
-            this.port = basePort;
-        }
-
-        @Override
-        public synchronized Integer get() {
-            return port++;
-        }
+    public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort) {
+        this(numberOfBookies, zkPort, 4181, null, null, true, null);
     }
 
-    private final Supplier<Integer> portManager;
-
-    public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, 
Supplier<Integer> portManager) {
-        this(numberOfBookies, zkPort, 4181, null, null, true, null, 
portManager);
-    }
-
-    public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int 
bkBasePort, String zkDataDirName,
+    public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, String 
zkDataDirName,
             String bkDataDirName, boolean clearOldData) {
-        this(numberOfBookies, zkPort, bkBasePort, 4181, zkDataDirName, 
bkDataDirName, clearOldData, null);
+        this(numberOfBookies, zkPort, 4181, zkDataDirName, bkDataDirName, 
clearOldData, null);
     }
 
-    public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int 
bkBasePort, String zkDataDirName,
+    public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, String 
zkDataDirName,
             String bkDataDirName, boolean clearOldData, String 
advertisedAddress) {
-        this(numberOfBookies, zkPort, bkBasePort, 4181, zkDataDirName, 
bkDataDirName, clearOldData, advertisedAddress);
-    }
-
-    public LocalBookkeeperEnsemble(int numberOfBookies,
-                                   int zkPort,
-                                   int bkBasePort,
-                                   int streamStoragePort,
-                                   String zkDataDirName,
-                                   String bkDataDirName,
-                                   boolean clearOldData,
-                                   String advertisedAddress) {
-        this(numberOfBookies, zkPort, streamStoragePort, zkDataDirName, 
bkDataDirName, clearOldData, advertisedAddress,
-                bkBasePort != 0 ? new BasePortManager(bkBasePort) : () -> 0);
+        this(numberOfBookies, zkPort, 4181, zkDataDirName, bkDataDirName, 
clearOldData, advertisedAddress);
     }
 
     public LocalBookkeeperEnsemble(int numberOfBookies,
@@ -139,10 +110,8 @@ public class LocalBookkeeperEnsemble {
             String zkDataDirName,
             String bkDataDirName,
             boolean clearOldData,
-            String advertisedAddress,
-            Supplier<Integer> portManager) {
+            String advertisedAddress) {
         this.numberOfBookies = numberOfBookies;
-        this.portManager = portManager;
         this.streamStoragePort = streamStoragePort;
         this.zkDataDirName = zkDataDirName;
         this.bkDataDirName = bkDataDirName;
@@ -301,7 +270,8 @@ public class LocalBookkeeperEnsemble {
                 cleanDirectory(bkDataDir);
             }
 
-            int bookiePort = portManager.get();
+            // Bookies bind to a kernel-assigned port; identity is established 
via bookieId.
+            int bookiePort = 0;
             String bookieId = "bk" + i + "test";
             // Ensure registration Z-nodes are cleared when standalone service 
is restarted ungracefully
             deleteBookieRegistrationZnode(
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java
index 77c1aced14d..21b4fe43de2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java
@@ -54,7 +54,6 @@ public class PulsarStandaloneTest {
 
         PulsarStandaloneStarter standalone = new PulsarStandaloneStarter(args);
         standalone.setBkDir(tempDir.getAbsolutePath());
-        standalone.setBkPort(0);
         standalone.setNumOfBk(bookieNum);
 
         standalone.startBookieWithMetadataStore();
@@ -67,10 +66,12 @@ public class PulsarStandaloneTest {
         List<ServerConfiguration> secondBsConfs = 
standalone.bkCluster.getBsConfs();
         Assert.assertEquals(secondBsConfs.size(), bookieNum);
 
+        // Cookies must be preserved across restart (otherwise bookie startup 
would have failed
+        // with InvalidCookieException). The bookieId is the persistent 
identity.
         for (int i = 0; i < bookieNum; i++) {
             ServerConfiguration conf1 = firstBsConfs.get(i);
             ServerConfiguration conf2 = secondBsConfs.get(i);
-            Assert.assertEquals(conf1.getBookiePort(), conf2.getBookiePort());
+            Assert.assertEquals(conf1.getBookieId(), conf2.getBookieId());
         }
         standalone.close();
         cleanDirectory(tempDir);
@@ -93,7 +94,6 @@ public class PulsarStandaloneTest {
         }
         final File bkDir = IOUtils.createTempDir("standalone", "bk");
         standalone.setNumOfBk(1);
-        standalone.setBkPort(0);
         standalone.setBkDir(bkDir.getAbsolutePath());
         standalone.start();
 
@@ -148,7 +148,6 @@ public class PulsarStandaloneTest {
                 bkDir.getAbsolutePath()
         });
         standalone.setTestMode(true);
-        standalone.setBkPort(0);
         standalone.start();
         BKCluster bkCluster = standalone.bkCluster;
         standalone.runShutdownHook();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
index 076d649d5d9..203b8668288 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
@@ -22,7 +22,6 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 import lombok.CustomLog;
 import org.apache.bookkeeper.client.PulsarMockBookKeeper;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
@@ -35,7 +34,6 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
-import org.apache.pulsar.common.util.PortManager;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 
@@ -46,6 +44,7 @@ public abstract class MultiBrokerBaseTest extends 
MockedPulsarServiceBaseTest {
     protected List<PulsarAdmin> additionalBrokerAdmins;
     protected List<PulsarClient> additionalBrokerClients;
     protected PulsarMockBookKeeper mockBookKeeper;
+    // Populated after broker startup with kernel-assigned ports.
     protected int mainBrokerPort;
     protected List<Integer> additionalBrokerPorts = new ArrayList<>();
 
@@ -53,17 +52,10 @@ public abstract class MultiBrokerBaseTest extends 
MockedPulsarServiceBaseTest {
         return 2;
     }
 
-    protected boolean useDynamicBrokerPorts() {
-        return true;
-    }
-
     @BeforeClass(alwaysRun = true)
     @Override
     public final void setup() throws Exception {
         beforeSetup();
-        if (!useDynamicBrokerPorts()) {
-            mainBrokerPort = PortManager.nextLockedFreePort();
-        }
         OrderedExecutor mockBookKeeperExecutor = 
OrderedExecutor.newBuilder().numThreads(1)
                 .name(MultiBrokerBaseTest.class.getSimpleName() + 
"-bk-executor").build();
         registerCloseable(() -> GracefulExecutorServicesShutdown.initiate()
@@ -75,6 +67,7 @@ public abstract class MultiBrokerBaseTest extends 
MockedPulsarServiceBaseTest {
             ((NonClosableMockBookKeeper) mockBookKeeper).reallyShutdown();
         });
         super.internalSetup();
+        mainBrokerPort = pulsar.getBrokerListenPort().orElse(0);
         additionalBrokersSetup();
         pulsarResourcesSetup();
         additionalSetup();
@@ -89,14 +82,6 @@ public abstract class MultiBrokerBaseTest extends 
MockedPulsarServiceBaseTest {
         pulsarTestContextBuilder.bookKeeperClient(mockBookKeeper);
     }
 
-    @Override
-    protected void doInitConf() throws Exception {
-        super.doInitConf();
-        if (!useDynamicBrokerPorts()) {
-            conf.setBrokerServicePort(Optional.of(mainBrokerPort));
-        }
-    }
-
     protected void additionalSetup() throws Exception {
         // override this method to add any additional setup logic
 
@@ -115,17 +100,12 @@ public abstract class MultiBrokerBaseTest extends 
MockedPulsarServiceBaseTest {
         additionalBrokerClients = new ArrayList<>(numberOfAdditionalBrokers);
         additionalPulsarTestContexts = new 
ArrayList<>(numberOfAdditionalBrokers);
         additionalBrokerPorts = new ArrayList<>(numberOfAdditionalBrokers);
-        if (!useDynamicBrokerPorts()) {
-            for (int i = 0; i < numberOfAdditionalBrokers; i++) {
-                int port = PortManager.nextLockedFreePort();
-                additionalBrokerPorts.add(port);
-            }
-        }
         for (int i = 0; i < numberOfAdditionalBrokers; i++) {
             PulsarTestContext pulsarTestContext = createAdditionalBroker(i);
             additionalPulsarTestContexts.add(i, pulsarTestContext);
             PulsarService pulsarService = pulsarTestContext.getPulsarService();
             additionalBrokers.add(i, pulsarService);
+            
additionalBrokerPorts.add(pulsarService.getBrokerListenPort().orElse(0));
             PulsarAdminBuilder pulsarAdminBuilder =
                     
PulsarAdmin.builder().serviceHttpUrl(pulsarService.getWebServiceAddress() != 
null
                             ? pulsarService.getWebServiceAddress()
@@ -159,9 +139,6 @@ public abstract class MultiBrokerBaseTest extends 
MockedPulsarServiceBaseTest {
 
     protected PulsarTestContext createAdditionalBroker(int 
additionalBrokerIndex) throws Exception {
         ServiceConfiguration conf = 
createConfForAdditionalBroker(additionalBrokerIndex);
-        if (!useDynamicBrokerPorts()) {
-            
conf.setBrokerServicePort(Optional.of(additionalBrokerPorts.get(additionalBrokerIndex)));
-        }
         return createAdditionalPulsarTestContext(conf);
     }
 
@@ -175,14 +152,6 @@ public abstract class MultiBrokerBaseTest extends 
MockedPulsarServiceBaseTest {
             log.warn().exception(e).log("Exception during additional cleanup");
         }
         super.internalCleanup();
-        if (!useDynamicBrokerPorts()) {
-            if (mainBrokerPort > 0) {
-                PortManager.releaseLockedPort(mainBrokerPort);
-            }
-            for (Integer port : additionalBrokerPorts) {
-                PortManager.releaseLockedPort(port);
-            }
-        }
     }
 
     protected void additionalCleanup() throws Exception {
@@ -212,9 +181,6 @@ public abstract class MultiBrokerBaseTest extends 
MockedPulsarServiceBaseTest {
                 try {
                     
pulsarService.getConfiguration().setBrokerShutdownTimeoutMs(0L);
                     pulsarTestContext.close();
-                    
pulsarService.getConfiguration().getBrokerServicePort().ifPresent(PortManager::releaseLockedPort);
-                    
pulsarService.getConfiguration().getWebServicePort().ifPresent(PortManager::releaseLockedPort);
-                    
pulsarService.getConfiguration().getWebServicePortTls().ifPresent(PortManager::releaseLockedPort);
                 } catch (Exception e) {
                     log.warn().exception(e).log("Failed to stop additional 
broker");
                 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
index 523006e9d74..3426753fbc6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
@@ -71,7 +71,7 @@ public class SLAMonitoringTest {
                         new LinkedBlockingQueue<>());
         log.info("---- Initializing SLAMonitoringTest -----");
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
         bkEnsemble.start();
 
         // start brokers
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
index 49e4c6b2103..b4dec430547 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
@@ -78,7 +78,7 @@ public class GetPartitionMetadataTest extends 
TestRetrySupport {
     @BeforeClass(alwaysRun = true)
     protected void setup() throws Exception {
         incrementSetupNumber();
-        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
         bkEnsemble.start();
         // Start broker.
         setupBrokers();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/AbstractBrokerEntryCacheMultiBrokerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/AbstractBrokerEntryCacheMultiBrokerTest.java
index 48cc5c67993..3e2b0a4655c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/AbstractBrokerEntryCacheMultiBrokerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/AbstractBrokerEntryCacheMultiBrokerTest.java
@@ -197,11 +197,6 @@ public abstract class 
AbstractBrokerEntryCacheMultiBrokerTest extends MultiBroke
         return 1;
     }
 
-    @Override
-    protected boolean useDynamicBrokerPorts() {
-        return false;
-    }
-
     @BeforeMethod(alwaysRun = true)
     public final void doBeforeMethod() {
         beforeMethod();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java
index d019af35a69..64c0ee3f47a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java
@@ -65,6 +65,8 @@ public class AdvertisedListenersTest extends 
MultiBrokerBaseTest {
     }
 
     private void updateConfig(ServiceConfiguration conf, String 
advertisedAddress) {
+        // Pre-allocate ports because the advertised listener URLs are baked 
into config
+        // before the broker starts. The broker then binds to the same ports.
         int pulsarPort = nextLockedFreePort();
         int httpPort = nextLockedFreePort();
         int httpsPort = nextLockedFreePort();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
index 62571cc3ff4..0bdeb4341d9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
@@ -52,7 +52,7 @@ public class LeaderElectionServiceTest {
 
     @BeforeMethod(alwaysRun = true)
     public void setup() throws Exception {
-        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
         bkEnsemble.start();
         log.info("---- bk started ----");
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
index c6617bbacaa..cf8db26a4a6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
@@ -107,7 +107,7 @@ public class LoadBalancerTest {
     @BeforeMethod
     void setup() throws Exception {
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
         bkEnsemble.start();
         ZkUtils.createFullPathOptimistic(bkEnsemble.getZkClient(),
                 
SimpleLoadManagerImpl.LOADBALANCER_DYNAMIC_SETTING_STRATEGY_ZPATH,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
index 879afd00c1b..36bd4b1e0fc 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
@@ -48,7 +48,7 @@ public class SimpleBrokerStartTest {
         }
         // Start local bookkeeper ensemble
         @Cleanup("stop")
-        LocalBookkeeperEnsemble bkEnsemble = new LocalBookkeeperEnsemble(3, 0, 
() -> 0);
+        LocalBookkeeperEnsemble bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
         bkEnsemble.start();
         // Start broker
         ServiceConfiguration config = new ServiceConfiguration();
@@ -79,7 +79,7 @@ public class SimpleBrokerStartTest {
         }
         // Start local bookkeeper ensemble
         @Cleanup("stop")
-        LocalBookkeeperEnsemble bkEnsemble = new LocalBookkeeperEnsemble(3, 0, 
() -> 0);
+        LocalBookkeeperEnsemble bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
         bkEnsemble.start();
         // Start broker
         ServiceConfiguration config = new ServiceConfiguration();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
index 6103b948236..20a5883d65d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
@@ -124,7 +124,7 @@ public class SimpleLoadManagerImplTest {
     void setup() throws Exception {
 
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
         bkEnsemble.start();
 
         // Start broker 1
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java
index 80e54278d31..138ad1dee11 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java
@@ -47,7 +47,7 @@ public class BrokerRegistryIntegrationTest {
 
     @BeforeClass
     protected void setup() throws Exception {
-        bk = new LocalBookkeeperEnsemble(2, 0, () -> 0);
+        bk = new LocalBookkeeperEnsemble(2, 0);
         bk.start();
         pulsar = new PulsarService(brokerConfig());
         pulsar.start();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
index 0247a68256e..73b50b4603a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
@@ -169,7 +169,7 @@ public class BrokerRegistryTest {
         executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS,
                 new LinkedBlockingQueue<>());
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
         bkEnsemble.start();
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java
index 50265c28edc..f0734fbe8bb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java
@@ -47,7 +47,7 @@ public class ExtensibleLoadManagerCloseTest {
 
     @BeforeClass(alwaysRun = true)
     public void setup() throws Exception {
-        bk = new LocalBookkeeperEnsemble(1, 0, () -> 0);
+        bk = new LocalBookkeeperEnsemble(1, 0);
         bk.start();
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java
index f393e585896..656088c2ba5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java
@@ -18,12 +18,12 @@
  */
 package org.apache.pulsar.broker.loadbalance.extensions;
 
-import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
 import java.util.Optional;
 import lombok.CustomLog;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.util.PortManager;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Factory;
 import org.testng.annotations.Test;
@@ -46,8 +46,10 @@ public class 
ExtensibleLoadManagerImplWithAdvertisedListenersTest extends Extens
     @Override
     protected ServiceConfiguration updateConfig(ServiceConfiguration conf) {
         super.updateConfig(conf);
-        int privatePulsarPort = nextLockedFreePort();
-        int publicPulsarPort = nextLockedFreePort();
+        // Pre-allocate ports because advertised listener URLs are baked into config
+        // before the broker starts.
+        int privatePulsarPort = PortManager.nextLockedFreePort();
+        int publicPulsarPort = PortManager.nextLockedFreePort();
         conf.setInternalListenerName("internal");
         conf.setBindAddresses("external:pulsar://localhost:" + publicPulsarPort);
         conf.setAdvertisedListeners(
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java
index a400bf733e5..1dc254492b7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java
@@ -26,7 +26,6 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
 import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
 import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
-import org.apache.pulsar.common.util.PortManager;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.awaitility.Awaitility;
 import org.mockito.Mockito;
@@ -38,8 +37,7 @@ import org.testng.annotations.Test;
 public class LoadManagerFailFastTest {
 
     private static final String cluster = "test";
-    private final int zkPort = PortManager.nextLockedFreePort();
-    private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, zkPort, PortManager::nextLockedFreePort);
+    private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, 0);
     private final ServiceConfiguration config = new ServiceConfiguration();
 
     @BeforeClass
@@ -49,7 +47,7 @@ public class LoadManagerFailFastTest {
         config.setAdvertisedAddress("localhost");
         config.setBrokerServicePort(Optional.of(0));
         config.setWebServicePort(Optional.of(0));
-        config.setMetadataStoreUrl("zk:localhost:" + zkPort);
+        config.setMetadataStoreUrl("zk:localhost:" + bk.getZookeeperPort());
     }
 
     @AfterClass
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
index 7a012b78756..085bea1170e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
@@ -59,7 +59,7 @@ public class BundleSplitterTaskTest {
     @BeforeMethod
     void setup() throws Exception {
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
         bkEnsemble.start();
         // Start broker
         ServiceConfiguration config = new ServiceConfiguration();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
index c3b5a02e5ac..ecabd901116 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
@@ -173,7 +173,7 @@ public class ModularLoadManagerImplTest {
         executor = new ThreadPoolExecutor(1, 20, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
 
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
         bkEnsemble.start();
 
         // Start broker 1
@@ -947,25 +947,32 @@ public class ModularLoadManagerImplTest {
         ServiceConfiguration config = new ServiceConfiguration();
         config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
         config.setClusterName("use");
-        config.setWebServicePort(Optional.of(PortManager.nextLockedFreePort()));
-        config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
-        config.setBrokerShutdownTimeoutMs(0L);
-        config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
-        config.setBrokerServicePort(Optional.of(0));
-        PulsarService pulsar = new PulsarService(config);
-        // create znode using different zk-session
-        final String brokerZnode = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + pulsar.getAdvertisedAddress() + ":"
-                + config.getWebServicePort().get();
-        pulsar1.getLocalMetadataStore()
-                .put(brokerZnode, new byte[0], Optional.empty(), EnumSet.of(CreateOption.Ephemeral)).join();
+        // Pre-allocate a port: the test creates a znode at the broker's would-be address before
+        // starting the broker, so it needs to know the address up front.
+        int webPort = PortManager.nextLockedFreePort();
         try {
-            pulsar.start();
-            fail("should have failed");
-        } catch (PulsarServerException e) {
-            //Ok.
-        }
+            config.setWebServicePort(Optional.of(webPort));
+            config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
+            config.setBrokerShutdownTimeoutMs(0L);
+            config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
+            config.setBrokerServicePort(Optional.of(0));
+            PulsarService pulsar = new PulsarService(config);
+            // create znode using different zk-session
+            final String brokerZnode = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + pulsar.getAdvertisedAddress() + ":"
+                    + config.getWebServicePort().get();
+            pulsar1.getLocalMetadataStore()
+                    .put(brokerZnode, new byte[0], Optional.empty(), EnumSet.of(CreateOption.Ephemeral)).join();
+            try {
+                pulsar.start();
+                fail("should have failed");
+            } catch (PulsarServerException e) {
+                //Ok.
+            }
 
-        pulsar.close();
+            pulsar.close();
+        } finally {
+            PortManager.releaseLockedPort(webPort);
+        }
     }
 
     @Test
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java
index 411c57c3f12..0c6797c93fa 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.broker.protocol;
 import java.io.File;
 import java.util.Optional;
 import lombok.CustomLog;
-import org.apache.bookkeeper.util.PortManager;
 import org.apache.commons.io.FileUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
@@ -38,8 +37,7 @@ public class PulsarClientBasedHandlerTest {
 
     private static final String clusterName = "cluster";
     private static final int shutdownTimeoutMs = 100;
-    private final int zkPort = PortManager.nextFreePort();
-    private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, zkPort, PortManager::nextFreePort);
+    private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, 0);
     private File tempDirectory;
     private PulsarService pulsar;
 
@@ -51,7 +49,7 @@ public class PulsarClientBasedHandlerTest {
         config.setAdvertisedAddress("localhost");
         config.setBrokerServicePort(Optional.of(0));
         config.setWebServicePort(Optional.of(0));
-        config.setMetadataStoreUrl("zk:127.0.0.1:" + zkPort);
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + bk.getZookeeperPort());
 
         tempDirectory = SimpleProtocolHandlerTestsBase.configureProtocolHandler(config,
                 PulsarClientBasedHandler.class.getName(), true);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java
index 9e32f4be8be..f1cb937868a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.protocol;
 
 import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
+import static org.apache.pulsar.common.util.PortManager.releaseLockedPort;
 import static org.testng.Assert.assertEquals;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelFuture;
@@ -46,7 +47,6 @@ import org.apache.commons.io.IOUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
-import org.apache.pulsar.common.util.PortManager;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -88,6 +88,8 @@ public abstract class SimpleProtocolHandlerTestsBase extends 
BrokerTestBase {
 
         @Override
         public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers() {
+            // Pre-allocate a free port: protocol handlers need to register a listener at a known
+            // address before the broker calls back into them.
             int port = nextLockedFreePort();
             this.ports.add(port);
             return Collections.singletonMap(new InetSocketAddress(conf.getBindAddress(), port),
@@ -115,7 +117,7 @@ public abstract class SimpleProtocolHandlerTestsBase 
extends BrokerTestBase {
 
         @Override
         public void close() {
-            ports.removeIf(PortManager::releaseLockedPort);
+            ports.removeIf(p -> releaseLockedPort(p));
         }
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
index 8dafe7a6af3..d76c7fb19d2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
@@ -41,7 +41,7 @@ public class AdvertisedAddressTest {
 
     @BeforeMethod
     public void setup() throws Exception {
-        bkEnsemble = new LocalBookkeeperEnsemble(1, 0, () -> 0);
+        bkEnsemble = new LocalBookkeeperEnsemble(1, 0);
         bkEnsemble.start();
 
         ServiceConfiguration config = new ServiceConfiguration();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 9a85b65b1e7..cbc066bd99a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -138,7 +138,7 @@ public class BacklogQuotaManagerTest {
     void setup() throws Exception {
         try {
             // start local bookie and zookeeper
-            bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+            bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
             bkEnsemble.start();
 
             // start pulsar service
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
index 4a1fcbb04db..2b01bbbb9d1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
@@ -86,7 +86,7 @@ public abstract class BkEnsemblesTestBase extends 
TestRetrySupport {
         incrementSetupNumber();
         try {
             // start local bookie and zookeeper
-            bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies, 0, () -> 0);
+            bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies, 0);
             bkEnsemble.start();
 
             // start pulsar service
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
index 2403675253f..24be9695184 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
@@ -97,7 +97,7 @@ public class BrokerBookieIsolationTest {
     @BeforeMethod
     protected void setup() throws Exception {
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(4, 0, () -> 0);
+        bkEnsemble = new LocalBookkeeperEnsemble(4, 0);
         bkEnsemble.start();
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEventLoopShutdownTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEventLoopShutdownTest.java
index 29421f155b6..b2667102198 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEventLoopShutdownTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEventLoopShutdownTest.java
@@ -37,7 +37,7 @@ public class BrokerEventLoopShutdownTest {
 
     @BeforeClass(alwaysRun = true)
     public void setup() throws Exception {
-        bk = new LocalBookkeeperEnsemble(2, 0, () -> 0);
+        bk = new LocalBookkeeperEnsemble(2, 0);
         bk.start();
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
index c0bd01c64a2..f9f0030c45f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
@@ -80,7 +80,7 @@ public abstract class 
CanReconnectZKClientPulsarServiceBaseTest extends TestRetr
         brokerConfigZk.start();
 
         // Start BK.
-        bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies, 0, () -> 0);
+        bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies, 0);
         bkEnsemble.start();
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GeoReplicationWithConfigurationSyncTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GeoReplicationWithConfigurationSyncTestBase.java
index 3b4bac53fec..728af789992 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GeoReplicationWithConfigurationSyncTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GeoReplicationWithConfigurationSyncTestBase.java
@@ -77,9 +77,9 @@ public abstract class 
GeoReplicationWithConfigurationSyncTestBase extends TestRe
         brokerConfigZk2.start();
 
         // Start BK.
-        bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0);
         bkEnsemble1.start();
-        bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0);
         bkEnsemble2.start();
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
index 61e45462290..b4a7e4ef0ff 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
@@ -54,7 +54,7 @@ public class MaxMessageSizeTest {
     @BeforeMethod
     void setup() {
         try {
-            bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+            bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
             ServerConfiguration conf = new ServerConfiguration();
             conf.setNettyMaxFrameSizeBytes(10 * 1024 * 1024 + 10 * 1024);
             bkEnsemble.startStandalone(conf, false);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java
index 0d5ae2a4c0d..de30a4c9d82 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java
@@ -86,7 +86,7 @@ public abstract class NetworkErrorTestBase extends 
TestRetrySupport {
 
     protected void startZKAndBK() throws Exception {
         // Start ZK & BK.
-        bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0);
         bkEnsemble1.start();
 
         metadataZKProxy = new Ipv4Proxy(getOneFreePort(), "127.0.0.1", bkEnsemble1.getZookeeperPort());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
index a73a2844f79..0aec5358be6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
@@ -114,9 +114,9 @@ public abstract class OneWayReplicatorTestBase extends 
TestRetrySupport {
         }
 
         // Start BK.
-        bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0);
         bkEnsemble1.start();
-        bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0);
         bkEnsemble2.start();
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 265d685223e..755230a8537 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -155,7 +155,7 @@ public abstract class ReplicatorTestBase extends 
TestRetrySupport {
         globalZkS.start();
 
         // Start region 1
-        bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0);
         bkEnsemble1.start();
 
         // NOTE: we have to instantiate a new copy of System.getProperties() 
to make sure pulsar1 and pulsar2 have
@@ -174,7 +174,7 @@ public abstract class ReplicatorTestBase extends 
TestRetrySupport {
         // Start region 2
 
         // Start zk & bks
-        bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0);
         bkEnsemble2.start();
 
         setConfig2DefaultValue();
@@ -190,7 +190,7 @@ public abstract class ReplicatorTestBase extends 
TestRetrySupport {
         // Start region 3
 
         // Start zk & bks
-        bkEnsemble3 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble3 = new LocalBookkeeperEnsemble(3, 0);
         bkEnsemble3.start();
 
         setConfig3DefaultValue();
@@ -206,7 +206,7 @@ public abstract class ReplicatorTestBase extends 
TestRetrySupport {
         // Start region 4
 
         // Start zk & bks
-        bkEnsemble4 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble4 = new LocalBookkeeperEnsemble(3, 0);
         bkEnsemble4.start();
 
         setConfig4DefaultValue();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
index 809f9a5a6e0..cccc3ceeb52 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
@@ -71,7 +71,7 @@ public class TopicOwnerTest {
     void setup() throws Exception {
         log.info("---- Initializing TopicOwnerTest -----");
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
         bkEnsemble.start();
 
         // start brokers
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java
index e58b8cdcce7..9b52582f6f3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java
@@ -41,10 +41,9 @@ import org.testng.annotations.Test;
 public class ShadowTopicRealBkTest {
 
     private static final String cluster = "test";
-    // Pass 0 for both ZK and bookie ports so the kernel picks free ports at bind time, avoiding
-    // any JVM-vs-OS race on pre-allocated ports. The actual ZK port is read back via
-    // bk.getZookeeperPort().
-    private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, 0, () -> 0);
+    // ZK port 0 lets the kernel pick a free port at bind time. The actual port is read back
+    // via bk.getZookeeperPort() after start. Bookies always bind to kernel-assigned ports.
+    private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, 0);
     private PulsarService pulsar;
     private PulsarAdmin admin;
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
index ff344ebadea..c7136fcb84a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
@@ -723,6 +723,11 @@ public class PulsarTestContext implements AutoCloseable {
 
         protected void handlePreallocatePorts(ServiceConfiguration config) {
             if (super.preallocatePorts) {
+                // Pre-allocate ports for callers that need the port number BEFORE the broker
+                // starts (e.g. to build advertised-listener URLs). PortManager hands out ports
+                // outside the ephemeral range, so the kernel won't auto-assign them to other
+                // processes. Tests that don't need a pre-known port should leave
+                // preallocatePorts=false and let the broker bind to 0 directly.
                 config.getBrokerServicePort().ifPresent(portNumber -> {
                     if (portNumber == 0) {
                         config.setBrokerServicePort(Optional.of(PortManager.nextLockedFreePort()));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
index 81321427a2b..904a4807ad3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
@@ -50,7 +50,7 @@ public abstract class TransactionMetaStoreTestBase extends 
TestRetrySupport {
     protected final void setup() throws Exception {
         log.info().attr("class", getClass().getSimpleName()).log("---- 
Initializing -----");
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
         bkEnsemble.start();
 
         String[] args = new String[]{
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
index 2602233293e..6f173aa2ba0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
@@ -76,7 +76,7 @@ public class ClientDeduplicationFailureTest {
         log.info().attr("upMethod", method.getName()).log("--- Setting up 
method");
 
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
         bkEnsemble.start();
 
         config = new ServiceConfiguration();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/HybridTypesAcknowledgeTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/HybridTypesAcknowledgeTest.java
index 3fa6750c7de..451a15ebe50 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/HybridTypesAcknowledgeTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/HybridTypesAcknowledgeTest.java
@@ -73,7 +73,7 @@ public class HybridTypesAcknowledgeTest  extends 
TestRetrySupport {
     @BeforeClass(alwaysRun = true)
     protected void setup() throws Exception {
         incrementSetupNumber();
-        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
         bkEnsemble.start();
         // Start broker.
         setupBrokers();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 97126457cd1..1552940cda4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -989,7 +989,7 @@ public class NonPersistentTopicTest extends 
ProducerConsumerBase {
             globalZkS.start();
 
             // Start region 1
-            bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+            bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0);
             bkEnsemble1.start();
 
             // NOTE: we have to instantiate a new copy of 
System.getProperties() to make sure pulsar1 and pulsar2 have
@@ -1019,7 +1019,7 @@ public class NonPersistentTopicTest extends 
ProducerConsumerBase {
             // Start region 2
 
             // Start zk & bks
-            bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+            bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0);
             bkEnsemble2.start();
 
             config2 = new ServiceConfiguration();
@@ -1046,7 +1046,7 @@ public class NonPersistentTopicTest extends 
ProducerConsumerBase {
             // Start region 3
 
             // Start zk & bks
-            bkEnsemble3 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+            bkEnsemble3 = new LocalBookkeeperEnsemble(3, 0);
             bkEnsemble3.start();
 
             config3 = new ServiceConfiguration();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java
index a1d31c0b9b5..db465e643ac 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ServiceUrlQuarantineTest.java
@@ -54,18 +54,21 @@ public class ServiceUrlQuarantineTest extends 
ProducerConsumerBase {
     private PulsarClientImpl pulsarClientWithHttpServiceUrlDisableQuarantine;
     private int brokerServicePort;
     private int webServicePort;
-    private final Set<Integer> lockedFreePortSet = new HashSet<>();
+    private final Set<Integer> reservedPorts = new HashSet<>();
     private static final int UNAVAILABLE_NODES = 20;
     private static final int TIMEOUT_MS = 500;
 
     @BeforeClass(alwaysRun = true)
     @Override
     protected void setup() throws Exception {
+        // Pre-allocate ports for the broker because they must match the URL the test builds below.
         this.brokerServicePort = nextLockedFreePort();
         this.webServicePort = nextLockedFreePort();
         super.internalSetup();
         super.producerBaseSetup();
-        // Create a Pulsar client with some unavailable nodes
+        // Build a service URL that includes a bunch of unavailable-node ports. PortManager
+        // hands out ports outside the ephemeral range, so nothing else is going to grab them
+        // and the connection attempts to those addresses will fail as expected.
         StringBuilder binaryServiceUrlBuilder = new StringBuilder(pulsar.getBrokerServiceUrl());
         StringBuilder httpServiceUrlBuilder = new StringBuilder(pulsar.getWebServiceAddress());
         for (int i = 0; i < UNAVAILABLE_NODES; i++) {
@@ -98,9 +101,9 @@ public class ServiceUrlQuarantineTest extends 
ProducerConsumerBase {
     }
 
     private int nextLockedFreePort() {
-        int newLockedFreePort = PortManager.nextLockedFreePort();
-        this.lockedFreePortSet.add(newLockedFreePort);
-        return newLockedFreePort;
+        int port = PortManager.nextLockedFreePort();
+        reservedPorts.add(port);
+        return port;
     }
 
     @Override
@@ -133,9 +136,8 @@ public class ServiceUrlQuarantineTest extends 
ProducerConsumerBase {
         if (pulsarClientWithHttpServiceUrlDisableQuarantine != null) {
             pulsarClientWithHttpServiceUrlDisableQuarantine.close();
         }
-        for (Integer port : lockedFreePortSet) {
-            PortManager.releaseLockedPort(port);
-        }
+        reservedPorts.forEach(PortManager::releaseLockedPort);
+        reservedPorts.clear();
     }
 
     @Test
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
index 3fbd4564a92..a39b1ef1b87 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
@@ -119,7 +119,7 @@ public class PulsarFunctionE2ESecurityTest {
         log.info().attr("method", method.getName()).log("Setting up method");
 
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
         bkEnsemble.start();
 
         config = new ServiceConfiguration();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index 122a81d3779..b6952682fa1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -202,7 +202,7 @@ public class PulsarFunctionLocalRunTest {
         log.info().attr("method", method.getName()).log("Setting up method");
 
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
         bkEnsemble.start();
 
         config = new ServiceConfiguration();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
index 1bbabca3613..5450c0c17b3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
@@ -124,7 +124,7 @@ public class PulsarFunctionPublishTest {
         log.info().attr("method", method.getName()).log("Setting up method");
 
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
         bkEnsemble.start();
 
         config = new ServiceConfiguration();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
index 9f27350f73d..9727dfb8f41 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
@@ -97,21 +96,22 @@ public class PulsarFunctionTlsTest {
     void setup() throws Exception {
         log.info("---- Initializing TopicOwnerTest -----");
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
         bkEnsemble.start();
 
         // start brokers
         for (int i = 0; i < BROKER_COUNT; i++) {
-            int brokerPort = nextLockedFreePort();
-            int webPort = nextLockedFreePort();
-
             ServiceConfiguration config = new ServiceConfiguration();
             config.setBrokerShutdownTimeoutMs(0L);
             
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
             config.setWebServicePort(Optional.empty());
-            config.setWebServicePortTls(Optional.of(webPort));
+            // Pre-allocate the TLS web port: 
PulsarService.initializeWorkerConfigFromBrokerConfig
+            // builds workerId = "c-{cluster}-fw-{host}-{port}" from the 
CONFIGURED port. Two
+            // brokers configured with port 0 would end up with the same 
workerId and the
+            // function-worker membership manager would never elect a leader.
+            
config.setWebServicePortTls(Optional.of(PortManager.nextLockedFreePort()));
             config.setBrokerServicePort(Optional.empty());
-            config.setBrokerServicePortTls(Optional.of(brokerPort));
+            config.setBrokerServicePortTls(Optional.of(0));
             config.setClusterName("my-cluster");
             config.setAdvertisedAddress("localhost");
             config.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
@@ -220,11 +220,9 @@ public class PulsarFunctionTlsTest {
             }
             for (int i = 0; i < BROKER_COUNT; i++) {
                 if (pulsarServices[i] != null) {
-                    pulsarServices[i].close();
-                    pulsarServices[i].getConfiguration().
-                            
getBrokerServicePort().ifPresent(PortManager::releaseLockedPort);
                     pulsarServices[i].getConfiguration()
-                            
.getWebServicePort().ifPresent(PortManager::releaseLockedPort);
+                            
.getWebServicePortTls().ifPresent(PortManager::releaseLockedPort);
+                    pulsarServices[i].close();
                     pulsarServices[i] = null;
                 }
             }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
index 7766b8a5054..ae91b671643 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -83,7 +83,7 @@ public class PulsarWorkerAssignmentTest {
         log.info().attr("method", method.getName()).log("Setting up method");
 
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
         bkEnsemble.start();
 
         config = new ServiceConfiguration();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
index cc010b82d9b..53705e13be6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
@@ -114,7 +114,7 @@ public abstract class AbstractPulsarE2ETest {
         log.info().attr("method", method.getName()).log("--- Setting up method 
---");
 
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
         bkEnsemble.start();
 
         config = new ServiceConfiguration();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
index 9e4b12f0a46..2bda831a999 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
@@ -93,7 +93,7 @@ public class PulsarFunctionAdminTest {
         log.info().attr("method", method.getName()).log("Setting up method");
 
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
         bkEnsemble.start();
 
         config = new ServiceConfiguration();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index aa78d3aaa91..f121566d037 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -110,7 +110,7 @@ public class PulsarFunctionTlsTest {
         log.info().attr("method", method.getName()).log("Setting up method");
 
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
         bkEnsemble.start();
 
         config = new ServiceConfiguration();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java
index 3bc42cf0368..65ad1e490c0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java
@@ -42,7 +42,7 @@ public class LocalBookkeeperEnsembleTest {
         final int numBk = 1;
 
         // Start local Bookies/ZooKeepers and confirm that they are running at 
specified ports
-        LocalBookkeeperEnsemble ensemble = new LocalBookkeeperEnsemble(numBk, 
0, () -> 0);
+        LocalBookkeeperEnsemble ensemble = new LocalBookkeeperEnsemble(numBk, 
0);
         ensemble.start();
         assertTrue(ensemble.getZkServer().isRunning());
         assertEquals(ensemble.getZkServer().getClientPort(), 
ensemble.getZookeeperPort());
@@ -60,8 +60,7 @@ public class LocalBookkeeperEnsembleTest {
     public void testStartWithSpecifiedStreamStoragePort() throws Exception {
         LocalBookkeeperEnsemble ensemble = null;
         try {
-            ensemble =
-                    new LocalBookkeeperEnsemble(1, 0, 0, 4182, null, null, 
true, null);
+            ensemble = new LocalBookkeeperEnsemble(1, 0, 4182, null, null, 
true, null);
             ensemble.startStandalone(new ServerConfiguration(), true);
         } finally {
             if (ensemble != null) {
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java
index 612deb9e017..fab5104e8c3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java
@@ -22,14 +22,18 @@ import java.net.ServerSocket;
 import java.util.HashSet;
 import java.util.Set;
 
+/**
+ * Allocates ports for tests that need a known port number BEFORE binding 
(e.g. tests that build
+ * advertised-listener URLs, or that pre-create metadata znodes at the 
broker's would-be address).
+ * For everything else, prefer binding to port 0 and reading the 
kernel-assigned port back.
+ */
 public class PortManager {
 
     private static final Set<Integer> PORTS = new HashSet<>();
 
     /**
-     * Return a locked available port.
-     *
-     * @return locked available port.
+     * Return a free port that is reserved for the caller until {@link 
#releaseLockedPort(int)}
+     * is invoked.
      */
     public static synchronized int nextLockedFreePort() {
         int exceptionCount = 0;
@@ -50,18 +54,16 @@ public class PortManager {
     }
 
     /**
-     * Returns whether the port was released successfully.
+     * Release a previously locked port.
      *
-     * @return whether the release is successful.
+     * @return true if the port was previously locked by this manager
      */
     public static synchronized boolean releaseLockedPort(int lockedPort) {
         return PORTS.remove(lockedPort);
     }
 
     /**
-     * Check port if locked.
-     *
-     * @return whether the port is locked.
+     * @return true if the port is currently locked by this manager
      */
     public static synchronized boolean checkPortIfLocked(int lockedPort) {
         return PORTS.contains(lockedPort);
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java
index 408753300cc..b3f91617a5c 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java
@@ -18,19 +18,41 @@
  */
 package org.apache.pulsar.common.util;
 
-import static org.apache.pulsar.common.util.PortManager.checkPortIfLocked;
-import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
-import static org.apache.pulsar.common.util.PortManager.releaseLockedPort;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
 import org.testng.annotations.Test;
 
 public class PortManagerTest {
+
+    @Test
+    public void allocatesAFreePort() {
+        int port = PortManager.nextLockedFreePort();
+        try {
+            assertTrue(port > 0);
+            assertTrue(PortManager.checkPortIfLocked(port));
+        } finally {
+            PortManager.releaseLockedPort(port);
+        }
+    }
+
+    @Test
+    public void allocatesDistinctPorts() {
+        int p1 = PortManager.nextLockedFreePort();
+        int p2 = PortManager.nextLockedFreePort();
+        try {
+            assertNotEquals(p1, p2);
+        } finally {
+            PortManager.releaseLockedPort(p1);
+            PortManager.releaseLockedPort(p2);
+        }
+    }
+
     @Test
-    public void testCheckPortIfLockedAndRemove() {
-        int port = nextLockedFreePort();
-        assertTrue(checkPortIfLocked(port));
-        assertTrue(releaseLockedPort(port));
-        assertFalse(checkPortIfLocked(port));
+    public void releasingMarksPortAsUnlocked() {
+        int port = PortManager.nextLockedFreePort();
+        assertTrue(PortManager.checkPortIfLocked(port));
+        assertTrue(PortManager.releaseLockedPort(port));
+        assertFalse(PortManager.checkPortIfLocked(port));
     }
 }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java
index 9ec10326c26..aafe4889ccc 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java
@@ -31,12 +31,9 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 import lombok.CustomLog;
 import lombok.Getter;
 import org.apache.bookkeeper.bookie.BookieImpl;
-import org.apache.bookkeeper.bookie.Cookie;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.common.allocator.PoolingPolicy;
 import org.apache.bookkeeper.common.component.Lifecycle;
@@ -50,7 +47,6 @@ import org.apache.bookkeeper.replication.AutoRecoveryMain;
 import org.apache.bookkeeper.server.conf.BookieConfiguration;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.commons.io.FileUtils;
-import org.apache.pulsar.common.util.PortManager;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 
@@ -74,7 +70,6 @@ public class BKCluster implements AutoCloseable {
     protected final ServerConfiguration baseConf;
     protected final ClientConfiguration baseClientConf;
 
-    private final List<Integer> lockedPorts = new ArrayList<>();
     private final AtomicBoolean closed = new AtomicBoolean(false);
 
     public static class BKClusterConf {
@@ -83,7 +78,6 @@ public class BKCluster implements AutoCloseable {
         private String metadataServiceUri;
         private int numBookies = 1;
         private String dataDir;
-        private int bkPort = 0;
 
         private boolean clearOldData;
 
@@ -107,11 +101,6 @@ public class BKCluster implements AutoCloseable {
             return this;
         }
 
-        public BKClusterConf bkPort(int bkPort) {
-            this.bkPort = bkPort;
-            return this;
-        }
-
         public BKClusterConf clearOldData(boolean clearOldData) {
             this.clearOldData = clearOldData;
             return this;
@@ -163,8 +152,6 @@ public class BKCluster implements AutoCloseable {
             } catch (Exception e) {
                 log.error().exception(e).log("Got Exception while trying to 
stop BKCluster");
             }
-            lockedPorts.forEach(PortManager::releaseLockedPort);
-            lockedPorts.clear();
             // cleanup temp dirs
             try {
                 cleanupTempDirs();
@@ -240,40 +227,23 @@ public class BKCluster implements AutoCloseable {
             cleanDirectory(dataDir);
         }
 
-        int port;
-        if (baseConf.isEnableLocalTransport() || 
!baseConf.getAllowEphemeralPorts() || clusterConf.bkPort == 0) {
-            port = PortManager.nextLockedFreePort();
-            lockedPorts.add(port);
-        } else {
-            // bk 4.15 cookie validation finds the same ip:port in case of 
port 0
-            // and 2nd bookie's cookie validation fails
-            port = clusterConf.bkPort;
-        }
-        File[] cookieDir = dataDir.listFiles((file) -> 
file.getName().equals("current"));
-        if (cookieDir != null && cookieDir.length > 0) {
-            String existBookieAddr = 
parseBookieAddressFromCookie(cookieDir[0]);
-            if (existBookieAddr != null) {
-                baseConf.setAdvertisedAddress(existBookieAddr.split(":")[0]);
-                port = Integer.parseInt(existBookieAddr.split(":")[1]);
-            }
-        }
-        return newServerConfiguration(port, dataDir, new File[]{dataDir});
-    }
-
-    private String parseBookieAddressFromCookie(File dir) throws IOException {
-        Cookie cookie = Cookie.readFromDirectory(dir);
-        Pattern pattern = Pattern.compile(".*bookieHost: \"(.*?)\".*", 
Pattern.DOTALL);
-        Matcher m = pattern.matcher(cookie.toString());
-        return m.find() ? m.group(1) : null;
+        // Bookies bind to a kernel-assigned port. Identity is established via 
a `bookieId`
+        // derived from the data dir path: bookies with the same dir (e.g. on 
cluster restart)
+        // get the same id so cookies match, while bookies with different dirs 
(e.g. separate
+        // test runs sharing the same metadata store) get different ids and 
don't collide.
+        String bookieId = "bk-" + index + "-" + 
Integer.toHexString(dataDir.getAbsolutePath().hashCode());
+        return newServerConfiguration(0, bookieId, dataDir, new 
File[]{dataDir});
     }
 
     private ClientConfiguration newClientConfiguration() {
         return new ClientConfiguration(baseConf);
     }
 
-    private ServerConfiguration newServerConfiguration(int port, File 
journalDir, File[] ledgerDirs) {
+    private ServerConfiguration newServerConfiguration(int port, String 
bookieId, File journalDir,
+                                                       File[] ledgerDirs) {
         ServerConfiguration conf = new ServerConfiguration(baseConf);
         conf.setBookiePort(port);
+        conf.setBookieId(bookieId);
         conf.setJournalDirName(journalDir.getPath());
         String[] ledgerDirNames = new String[ledgerDirs.length];
         for (int i = 0; i < ledgerDirs.length; i++) {
@@ -373,7 +343,7 @@ public class BKCluster implements AutoCloseable {
         confReturn.setAllowEphemeralPorts(true);
         confReturn.setJournalWriteData(false);
         confReturn.setProperty("journalPreAllocSizeMB", 1);
-        confReturn.setBookiePort(clusterConf.bkPort);
+        confReturn.setBookiePort(0);
         confReturn.setGcWaitTime(1000L);
         confReturn.setDiskUsageThreshold(0.999F);
         confReturn.setDiskUsageWarnThreshold(0.99F);
diff --git 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java
 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java
index 6159741f535..612a05fba7f 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java
@@ -25,7 +25,6 @@
 package org.apache.bookkeeper.replication;
 
 import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
-import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
 import static org.testng.Assert.assertFalse;
 import com.google.common.base.Stopwatch;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -316,14 +315,10 @@ public abstract class BookKeeperClusterTestCase {
 
     protected ServerConfiguration newServerConfiguration() throws Exception {
         File f = tmpDirs.createNew("bookie", "test");
-
-        int port;
-        if (baseConf.isEnableLocalTransport() || 
!baseConf.getAllowEphemeralPorts()) {
-            port = nextLockedFreePort();
-        } else {
-            port = 0;
-        }
-        return newServerConfiguration(port, f, new File[] { f });
+        // Bookies need a pre-allocated port: BK identifies them by host:port 
in metadata
+        // and the test client resolves that back to a TCP address. Port 0 
would leave
+        // the cookie + registration with port=0, which fails DNS-style 
resolution.
+        return newServerConfiguration(PortManager.nextLockedFreePort(), f, new 
File[] { f });
     }
 
     protected ClientConfiguration newClientConfiguration() {
diff --git 
a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
 
b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
index 3abd3580e86..33080e42832 100644
--- 
a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
+++ 
b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -28,7 +28,6 @@ import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_INDEX_SCOPE;
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_LEDGER_SCOPE;
 import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
-import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
 import static org.testng.Assert.assertFalse;
 import com.google.common.base.Stopwatch;
 import java.io.File;
@@ -291,14 +290,10 @@ public abstract class BookKeeperClusterTestCase {
 
     protected ServerConfiguration newServerConfiguration() throws Exception {
         File f = tmpDirs.createNew("bookie", "test");
-
-        int port;
-        if (baseConf.isEnableLocalTransport() || 
!baseConf.getAllowEphemeralPorts()) {
-            port = nextLockedFreePort();
-        } else {
-            port = 0;
-        }
-        return newServerConfiguration(port, f, new File[] { f });
+        // Bookies need a pre-allocated port: BK identifies them by host:port 
in metadata
+        // and the test client resolves that back to a TCP address. Port 0 
would leave
+        // the cookie + registration with port=0, which fails DNS-style 
resolution.
+        return newServerConfiguration(PortManager.nextLockedFreePort(), f, new 
File[] { f });
     }
 
     protected ClientConfiguration newClientConfiguration() {
diff --git 
a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/PortManager.java
 
b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/PortManager.java
deleted file mode 100644
index c5ab8dd629c..00000000000
--- 
a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/PortManager.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package 
org.apache.pulsar.packages.management.storage.bookkeeper.bookkeeper.test;
-
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.Inet4Address;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.StandardOpenOption;
-import lombok.Cleanup;
-
-/**
- * Port manager allows a base port to be specified on the commandline. Tests 
will then use ports, counting up from this
- * base port. This allows multiple instances of the bookkeeper tests to run at 
once.
- */
-public class PortManager {
-
-    private static final String lockFilename = 
System.getProperty("test.lockFilename",
-            "/tmp/pulsar-test-port-manager.lock");
-    private static final int BASE_PORT = 
Integer.parseInt(System.getProperty("test.basePort", "15000"));
-
-    private static final int MAX_PORT = 32000;
-
-    /**
-     * Return a TCP port that is currently unused.
-     *
-     * Keeps track of assigned ports and avoid race condition between 
different processes
-     */
-    public static synchronized int nextFreePort() {
-        Path path = Paths.get(lockFilename);
-
-        try {
-            @Cleanup
-            FileChannel fileChannel = FileChannel.open(path,
-                    StandardOpenOption.CREATE,
-                    StandardOpenOption.WRITE,
-                    StandardOpenOption.READ);
-
-            @Cleanup
-            FileLock lock = fileChannel.lock();
-
-            ByteBuffer buffer = ByteBuffer.allocate(32);
-            int len = fileChannel.read(buffer, 0L);
-            buffer.flip();
-
-            int lastUsedPort = BASE_PORT;
-            if (len > 0) {
-                byte[] bytes = new byte[buffer.remaining()];
-                buffer.get(bytes);
-                String lastUsedPortStr = new String(bytes);
-                lastUsedPort = Integer.parseInt(lastUsedPortStr);
-            }
-
-            int freePort = probeFreePort(lastUsedPort + 1);
-
-            buffer.clear();
-            buffer.put(Integer.toString(freePort).getBytes());
-            buffer.flip();
-            fileChannel.write(buffer, 0L);
-            fileChannel.truncate(buffer.position());
-            fileChannel.force(true);
-
-            return freePort;
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private static final int MAX_PORT_CONFLICTS = 10;
-
-    private static synchronized int probeFreePort(int port) {
-        int exceptionCount = 0;
-        while (true) {
-            if (port == MAX_PORT) {
-                // Rollover the port probe
-                port = BASE_PORT;
-            }
-
-            try (Socket s = new Socket()) {
-                s.connect(new 
InetSocketAddress(Inet4Address.getLoopbackAddress(), port), 100);
-
-                // If we succeed to connect it means the port is being used
-
-            } catch (ConnectException e) {
-                return port;
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-
-            port++;
-            exceptionCount++;
-            if (exceptionCount > MAX_PORT_CONFLICTS) {
-                throw new RuntimeException("Failed to find an open port");
-            }
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        while (true) {
-            System.out.println("Port: " + nextFreePort());
-            Thread.sleep(100);
-        }
-    }
-}
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
index d6eaf5dcc66..c834e09bd89 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.proxy.extensions;
 
 import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
+import static org.apache.pulsar.common.util.PortManager.releaseLockedPort;
 import static org.mockito.Mockito.doReturn;
 import static org.testng.Assert.assertEquals;
 import io.netty.buffer.ByteBuf;
@@ -50,7 +51,6 @@ import 
org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
-import org.apache.pulsar.common.util.PortManager;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.apache.pulsar.proxy.server.ProxyConfiguration;
 import org.apache.pulsar.proxy.server.ProxyService;
@@ -90,6 +90,8 @@ public abstract class SimpleProxyExtensionTestBase extends 
MockedPulsarServiceBa
 
         @Override
         public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> 
newChannelInitializers() {
+            // Pre-allocate a free port: extension handlers need to register a 
listener at a known
+            // address before the proxy calls back into them.
             int port = nextLockedFreePort();
             this.ports.add(port);
             return Collections.singletonMap(new 
InetSocketAddress(conf.getBindAddress(), port),
@@ -117,7 +119,7 @@ public abstract class SimpleProxyExtensionTestBase extends 
MockedPulsarServiceBa
 
         @Override
         public void close() {
-            ports.removeIf(PortManager::releaseLockedPort);
+            ports.removeIf(p -> releaseLockedPort(p));
         }
     }
 


Reply via email to