This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit aaccb00cbe54081abcbf40acc2b60cb82e7dbb1e
Author: Lari Hotari <lhot...@users.noreply.github.com>
AuthorDate: Fri Jun 7 18:36:52 2024 +0300

    [fix][cli] Fix Pulsar standalone shutdown - bkCluster wasn't closed (#22868)
    
    (cherry picked from commit c5cc25ebdc3a32d002b944e77fb59c9ccd1f14c1)
---
 .../java/org/apache/pulsar/PulsarStandalone.java   | 10 ++++
 .../org/apache/pulsar/PulsarStandaloneStarter.java | 58 ++++++++++++++++------
 .../org/apache/pulsar/PulsarStandaloneTest.java    | 48 ++++++++++++++++--
 .../configurations/pulsar_broker_test.conf         | 26 +++++-----
 .../pulsar_broker_test_standalone.conf             | 26 +++++-----
 ...pulsar_broker_test_standalone_with_rocksdb.conf | 26 +++++-----
 .../configurations/standalone_no_client_auth.conf  |  4 +-
 .../pulsar/metadata/bookkeeper/BKCluster.java      | 43 ++++++++++------
 8 files changed, 167 insertions(+), 74 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
index b785448cdac..7f80aa29f53 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
@@ -420,18 +420,22 @@ public class PulsarStandalone implements AutoCloseable {
         try {
             if (fnWorkerService != null) {
                 fnWorkerService.stop();
+                fnWorkerService = null;
             }
 
             if (broker != null) {
                 broker.close();
+                broker = null;
             }
 
             if (bkCluster != null) {
                 bkCluster.close();
+                bkCluster = null;
             }
 
             if (bkEnsemble != null) {
                 bkEnsemble.stop();
+                bkEnsemble = null;
             }
         } catch (Exception e) {
             log.error("Shutdown failed: {}", e.getMessage(), e);
@@ -496,5 +500,11 @@ public class PulsarStandalone implements AutoCloseable {
         ShutdownUtil.triggerImmediateForcefulShutdown(exitCode);
     }
 
+    public String getBrokerServiceUrl() {
+        return broker.getBrokerServiceUrl();
+    }
 
+    public String getWebServiceUrl() {
+        return broker.getWebServiceAddress();
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
index 0ab731591da..29feac8cb46 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
@@ -19,9 +19,12 @@
 package org.apache.pulsar;
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import java.io.FileInputStream;
 import java.util.Arrays;
+import lombok.AccessLevel;
+import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
@@ -38,6 +41,9 @@ public class PulsarStandaloneStarter extends PulsarStandalone {
 
     @Option(names = {"-g", "--generate-docs"}, description = "Generate docs")
     private boolean generateDocs = false;
+    private Thread shutdownThread;
+    @Setter(AccessLevel.PACKAGE)
+    private boolean testMode;
 
     public PulsarStandaloneStarter(String[] args) throws Exception {
 
@@ -108,30 +114,54 @@ public class PulsarStandaloneStarter extends PulsarStandalone {
                 }
             }
         }
+    }
 
+    @Override
+    public synchronized void start() throws Exception {
         registerShutdownHook();
+        super.start();
     }
 
     protected void registerShutdownHook() {
-        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+        if (shutdownThread != null) {
+            throw new IllegalStateException("Shutdown hook already registered");
+        }
+        shutdownThread = new Thread(() -> {
             try {
-                if (fnWorkerService != null) {
-                    fnWorkerService.stop();
-                }
-
-                if (broker != null) {
-                    broker.close();
-                }
-
-                if (bkEnsemble != null) {
-                    bkEnsemble.stop();
-                }
+                doClose(false);
             } catch (Exception e) {
                 log.error("Shutdown failed: {}", e.getMessage(), e);
             } finally {
-                LogManager.shutdown();
+                if (!testMode) {
+                    LogManager.shutdown();
+                }
             }
-        }));
+        });
+        Runtime.getRuntime().addShutdownHook(shutdownThread);
+    }
+
+    // simulate running the shutdown hook, for testing
+    @VisibleForTesting
+    void runShutdownHook() {
+        if (!testMode) {
+            throw new IllegalStateException("Not in test mode");
+        }
+        Runtime.getRuntime().removeShutdownHook(shutdownThread);
+        shutdownThread.run();
+        shutdownThread = null;
+    }
+
+    @Override
+    public void close() {
+        doClose(true);
+    }
+
+    private synchronized void doClose(boolean removeShutdownHook) {
+        super.close();
+        if (shutdownThread != null && removeShutdownHook) {
+            Runtime.getRuntime().removeShutdownHook(shutdownThread);
+            shutdownThread = null;
+        }
     }
 
     protected void exit(int status) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java
index 6ed93a75a3f..3d22feb822e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java
@@ -31,6 +31,7 @@ import org.apache.bookkeeper.util.IOUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.metadata.bookkeeper.BKCluster;
 import org.testng.Assert;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
@@ -46,12 +47,15 @@ public class PulsarStandaloneTest {
     @Test
     public void testStandaloneWithRocksDB() throws Exception {
         String[] args = new String[]{"--config",
-                "./src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf"};
+                "./src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf",
+                "-nss",
+                "-nfw"};
         final int bookieNum = 3;
         final File tempDir = IOUtils.createTempDir("standalone", "test");
 
         PulsarStandaloneStarter standalone = new PulsarStandaloneStarter(args);
         standalone.setBkDir(tempDir.getAbsolutePath());
+        standalone.setBkPort(0);
         standalone.setNumOfBk(bookieNum);
 
         standalone.startBookieWithMetadataStore();
@@ -90,11 +94,12 @@ public class PulsarStandaloneTest {
         }
         final File bkDir = IOUtils.createTempDir("standalone", "bk");
         standalone.setNumOfBk(1);
+        standalone.setBkPort(0);
         standalone.setBkDir(bkDir.getAbsolutePath());
         standalone.start();
 
         @Cleanup PulsarAdmin admin = PulsarAdmin.builder()
-                .serviceHttpUrl("http://localhost:8080")
+                .serviceHttpUrl(standalone.getWebServiceUrl())
                 .authentication(new MockTokenAuthenticationProvider.MockAuthentication())
                 .build();
         if (enableBrokerClientAuth) {
@@ -104,8 +109,8 @@ public class PulsarStandaloneTest {
         } else {
             assertTrue(admin.clusters().getClusters().isEmpty());
             admin.clusters().createCluster("test_cluster", ClusterData.builder()
-                    .serviceUrl("http://localhost:8080/")
-                    .brokerServiceUrl("pulsar://localhost:6650/")
+                    .serviceUrl(standalone.getWebServiceUrl())
+                    .brokerServiceUrl(standalone.getBrokerServiceUrl())
                     .build());
             assertTrue(admin.tenants().getTenants().isEmpty());
             admin.tenants().createTenant("public", TenantInfo.builder()
@@ -125,4 +130,39 @@ public class PulsarStandaloneTest {
         cleanDirectory(bkDir);
         cleanDirectory(metadataDir);
     }
+
+
+    @Test
+    public void testShutdownHookClosesBkCluster() throws Exception {
+        File dataDir = IOUtils.createTempDir("data", "");
+        File metadataDir = new File(dataDir, "metadata");
+        File bkDir = new File(dataDir, "bookkeeper");
+        @Cleanup
+        PulsarStandaloneStarter standalone = new PulsarStandaloneStarter(new String[] {
+                "--config",
+                "./src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf",
+                "-nss",
+                "-nfw",
+                "--metadata-dir",
+                metadataDir.getAbsolutePath(),
+                "--bookkeeper-dir",
+                bkDir.getAbsolutePath()
+        });
+        standalone.setTestMode(true);
+        standalone.setBkPort(0);
+        standalone.start();
+        BKCluster bkCluster = standalone.bkCluster;
+        standalone.runShutdownHook();
+        assertTrue(bkCluster.isClosed());
+    }
+
+    @Test
+    public void testWipeData() throws Exception {
+        PulsarStandaloneStarter standalone = new PulsarStandaloneStarter(new String[] {
+                "--config",
+                "./src/test/resources/configurations/standalone_no_client_auth.conf",
+                "--wipe-data"
+        });
+        assertTrue(standalone.isWipeData());
+    }
 }
diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
index f2316111f80..ddda30d0a4b 100644
--- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
+++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
@@ -17,17 +17,17 @@
 # under the License.
 #
 
-applicationName="pulsar_broker"
-zookeeperServers="localhost"
-configurationStoreServers="localhost"
+applicationName=pulsar_broker
+zookeeperServers=localhost
+configurationStoreServers=localhost
 brokerServicePort=6650
-brokerServicePortTls=6651
+brokerServicePortTls=
 webServicePort=8080
-webServicePortTls=4443
+webServicePortTls=
 httpMaxRequestHeaderSize=1234
 bindAddress=0.0.0.0
 advertisedAddress=
-clusterName="test_cluster"
+clusterName=test_cluster
 brokerShutdownTimeoutMs=3000
 backlogQuotaCheckEnabled=true
 backlogQuotaCheckIntervalInSeconds=60
@@ -42,17 +42,17 @@ clientLibraryVersionCheckEnabled=false
 clientLibraryVersionCheckAllowUnversioned=true
 statusFilePath=/tmp/status.html
 tlsEnabled=false
-tlsCertificateFilePath=/usr/local/conf/pulsar/server.crt
-tlsKeyFilePath=/home/local/conf/pulsar/server.key
+tlsCertificateFilePath=
+tlsKeyFilePath=
 tlsTrustCertsFilePath=
 tlsAllowInsecureConnection=false
 authenticationEnabled=false
 authorizationEnabled=false
-superUserRoles="test_user"
-brokerClientAuthenticationPlugin="org.apache.pulsar.client.impl.auth.AuthenticationDisabled"
+superUserRoles=test_user
+brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationDisabled
 brokerClientAuthenticationParameters=
-bookkeeperClientAuthenticationPlugin="test_auth_plugin"
-bookkeeperClientAuthenticationAppId="test_auth_id"
+bookkeeperClientAuthenticationPlugin=
+bookkeeperClientAuthenticationAppId=test_auth_id
 bookkeeperClientTimeoutInSeconds=30
 bookkeeperClientSpeculativeReadTimeoutInMillis=0
 bookkeeperClientHealthCheckEnabled=true
@@ -64,7 +64,7 @@ bookkeeperClientRegionawarePolicyEnabled=false
 bookkeeperClientMinNumRacksPerWriteQuorum=2
 bookkeeperClientEnforceMinNumRacksPerWriteQuorum=false
 bookkeeperClientReorderReadSequenceEnabled=false
-bookkeeperClientIsolationGroups="test_group"
+bookkeeperClientIsolationGroups=test_group
 managedLedgerDefaultEnsembleSize=3
 managedLedgerDefaultWriteQuorum=2
 managedLedgerDefaultAckQuorum=2
diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
index 4a40d9f0c65..812c8dc9748 100644
--- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
+++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
@@ -17,18 +17,18 @@
 # under the License.
 #
 
-applicationName="pulsar_broker"
-metadataStoreUrl="zk:localhost:2181/ledger"
-configurationMetadataStoreUrl="zk:localhost:2181"
-brokerServicePort=6650
-brokerServicePortTls=6651
-webServicePort=8080
-webServicePortTls=4443
+applicationName=pulsar_broker
+metadataStoreUrl=zk:localhost:2181/ledger
+configurationMetadataStoreUrl=zk:localhost:2181
+brokerServicePort=0
+brokerServicePortTls=
+webServicePort=0
+webServicePortTls=
 bindAddress=0.0.0.0
 advertisedAddress=
 advertisedListeners=internal:pulsar://192.168.1.11:6660,internal:pulsar+ssl://192.168.1.11:6651
 internalListenerName=internal
-clusterName="test_cluster"
+clusterName=test_cluster
 brokerShutdownTimeoutMs=3000
 backlogQuotaCheckEnabled=true
 backlogQuotaCheckIntervalInSeconds=60
@@ -49,11 +49,11 @@ tlsTrustCertsFilePath=
 tlsAllowInsecureConnection=false
 authenticationEnabled=false
 authorizationEnabled=false
-superUserRoles="test_user"
-brokerClientAuthenticationPlugin="org.apache.pulsar.client.impl.auth.AuthenticationDisabled"
+superUserRoles=test_user
+brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationDisabled
 brokerClientAuthenticationParameters=
-bookkeeperClientAuthenticationPlugin="test_auth_plugin"
-bookkeeperClientAuthenticationAppId="test_auth_id"
+bookkeeperClientAuthenticationPlugin=
+bookkeeperClientAuthenticationAppId=
 bookkeeperClientTimeoutInSeconds=30
 bookkeeperClientSpeculativeReadTimeoutInMillis=0
 bookkeeperClientHealthCheckEnabled=true
@@ -65,7 +65,7 @@ bookkeeperClientRegionawarePolicyEnabled=false
 bookkeeperClientMinNumRacksPerWriteQuorum=2
 bookkeeperClientEnforceMinNumRacksPerWriteQuorum=false
 bookkeeperClientReorderReadSequenceEnabled=false
-bookkeeperClientIsolationGroups="test_group"
+bookkeeperClientIsolationGroups=
 managedLedgerDefaultEnsembleSize=3
 managedLedgerDefaultWriteQuorum=2
 managedLedgerDefaultAckQuorum=2
diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf
index d8b26bbbfa9..46c876686b0 100644
--- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf
+++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf
@@ -17,19 +17,19 @@
 # under the License.
 #
 
-applicationName="pulsar_broker"
+applicationName=pulsar_broker
 metadataStoreUrl=
 configurationMetadataStoreUrl=
-brokerServicePort=6650
-brokerServicePortTls=6651
-webServicePort=8080
+brokerServicePort=0
+brokerServicePortTls=
+webServicePort=0
 allowLoopback=true
-webServicePortTls=4443
+webServicePortTls=
 bindAddress=0.0.0.0
 advertisedAddress=
 advertisedListeners=
 internalListenerName=internal
-clusterName="test_cluster"
+clusterName=test_cluster
 brokerShutdownTimeoutMs=3000
 backlogQuotaCheckEnabled=true
 backlogQuotaCheckIntervalInSeconds=60
@@ -44,17 +44,17 @@ clientLibraryVersionCheckEnabled=false
 clientLibraryVersionCheckAllowUnversioned=true
 statusFilePath=/tmp/status.html
 tlsEnabled=false
-tlsCertificateFilePath=/usr/local/conf/pulsar/server.crt
-tlsKeyFilePath=/home/local/conf/pulsar/server.key
+tlsCertificateFilePath=
+tlsKeyFilePath=
 tlsTrustCertsFilePath=
 tlsAllowInsecureConnection=false
 authenticationEnabled=false
 authorizationEnabled=false
-superUserRoles="test_user"
-brokerClientAuthenticationPlugin="org.apache.pulsar.client.impl.auth.AuthenticationDisabled"
+superUserRoles=test_user
+brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationDisabled
 brokerClientAuthenticationParameters=
-bookkeeperClientAuthenticationPlugin="test_auth_plugin"
-bookkeeperClientAuthenticationAppId="test_auth_id"
+bookkeeperClientAuthenticationPlugin=
+bookkeeperClientAuthenticationAppId=test_auth_id
 bookkeeperClientTimeoutInSeconds=30
 bookkeeperClientSpeculativeReadTimeoutInMillis=0
 bookkeeperClientHealthCheckEnabled=true
@@ -66,7 +66,7 @@ bookkeeperClientRegionawarePolicyEnabled=false
 bookkeeperClientMinNumRacksPerWriteQuorum=2
 bookkeeperClientEnforceMinNumRacksPerWriteQuorum=false
 bookkeeperClientReorderReadSequenceEnabled=false
-bookkeeperClientIsolationGroups="test_group"
+bookkeeperClientIsolationGroups=test_group
 managedLedgerDefaultEnsembleSize=3
 managedLedgerDefaultWriteQuorum=2
 managedLedgerDefaultAckQuorum=2
diff --git a/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf b/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf
index 4e2fd402983..6f0d82cef17 100644
--- a/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf
+++ b/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf
@@ -17,8 +17,8 @@
 # under the License.
 #
 
-brokerServicePort=6650
-webServicePort=8080
+brokerServicePort=0
+webServicePort=0
 allowLoopback=true
 clusterName=test_cluster
 superUserRoles=admin
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java
index c2f3f72ec21..8d3a90239ef 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java
@@ -30,6 +30,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import lombok.Getter;
@@ -49,8 +50,8 @@ import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.replication.AutoRecoveryMain;
 import org.apache.bookkeeper.server.conf.BookieConfiguration;
 import org.apache.bookkeeper.util.IOUtils;
-import org.apache.bookkeeper.util.PortManager;
 import org.apache.commons.io.FileUtils;
+import org.apache.pulsar.common.util.PortManager;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 
@@ -74,6 +75,9 @@ public class BKCluster implements AutoCloseable {
     protected final ServerConfiguration baseConf;
     protected final ClientConfiguration baseClientConf;
 
+    private final List<Integer> lockedPorts = new ArrayList<>();
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
     public static class BKClusterConf {
 
         private ServerConfiguration baseServerConfiguration;
@@ -148,20 +152,24 @@ public class BKCluster implements AutoCloseable {
 
     @Override
     public void close() throws Exception {
-        // stop bookkeeper service
-        try {
-            stopBKCluster();
-        } catch (Exception e) {
-            log.error("Got Exception while trying to stop BKCluster", e);
-        }
-        // cleanup temp dirs
-        try {
-            cleanupTempDirs();
-        } catch (Exception e) {
-            log.error("Got Exception while trying to cleanupTempDirs", e);
-        }
+        if (closed.compareAndSet(false, true)) {
+            // stop bookkeeper service
+            try {
+                stopBKCluster();
+            } catch (Exception e) {
+                log.error("Got Exception while trying to stop BKCluster", e);
+            }
+            lockedPorts.forEach(PortManager::releaseLockedPort);
+            lockedPorts.clear();
+            // cleanup temp dirs
+            try {
+                cleanupTempDirs();
+            } catch (Exception e) {
+                log.error("Got Exception while trying to cleanupTempDirs", e);
+            }
 
-        this.store.close();
+            this.store.close();
+        }
     }
 
     private File createTempDir(String prefix, String suffix) throws IOException {
@@ -229,7 +237,8 @@ public class BKCluster implements AutoCloseable {
 
         int port;
         if (baseConf.isEnableLocalTransport() || !baseConf.getAllowEphemeralPorts() || clusterConf.bkPort == 0) {
-            port = PortManager.nextFreePort();
+            port = PortManager.nextLockedFreePort();
+            lockedPorts.add(port);
         } else {
             // bk 4.15 cookie validation finds the same ip:port in case of port 0
             // and 2nd bookie's cookie validation fails
@@ -399,4 +408,8 @@ public class BKCluster implements AutoCloseable {
         serverConf.setAllowLoopback(true);
         return serverConf;
     }
+
+    public boolean isClosed() {
+        return closed.get();
+    }
 }

Reply via email to