This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new eccc6b647e9 [fix][test] Fix more resource leaks in tests (#24314)
eccc6b647e9 is described below

commit eccc6b647e9cb07a18901471de1b2f8fafa88417
Author: Lari Hotari <[email protected]>
AuthorDate: Sat May 17 09:02:05 2025 +0300

    [fix][test] Fix more resource leaks in tests (#24314)
---
 .../pulsar/tests/ThreadLeakDetectorListener.java   |   8 +
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |  16 +-
 .../bookkeeper/test/BookKeeperClusterTestCase.java |   9 +-
 .../ProxySaslAuthenticationTest.java               | 501 +++++++++++----------
 .../BookieRackAffinityMappingTest.java             |   1 +
 .../apache/pulsar/common/util/RateLimiterTest.java |   4 +-
 .../impl/FileStoreBackedReadHandleImpl.java        |   2 +-
 .../offload/filesystem/FileStoreTestBase.java      |   6 +-
 .../impl/FileSystemManagedLedgerOffloaderTest.java |  11 +-
 .../impl/FileSystemOffloaderLocalFileTest.java     |  33 +-
 10 files changed, 326 insertions(+), 265 deletions(-)

diff --git 
a/buildtools/src/main/java/org/apache/pulsar/tests/ThreadLeakDetectorListener.java
 
b/buildtools/src/main/java/org/apache/pulsar/tests/ThreadLeakDetectorListener.java
index 3708a867246..43f219ebc23 100644
--- 
a/buildtools/src/main/java/org/apache/pulsar/tests/ThreadLeakDetectorListener.java
+++ 
b/buildtools/src/main/java/org/apache/pulsar/tests/ThreadLeakDetectorListener.java
@@ -227,6 +227,10 @@ public class ThreadLeakDetectorListener extends 
BetweenTestClassesListenerAdapte
             if (threadName.equals("process reaper")) {
                 return true;
             }
+            // skip thread created by sun.net.www.http.KeepAliveCache
+            if (threadName.equals("Keep-Alive-Timer")) {
+                return true;
+            }
             // skip JVM internal thread related to agent attach
             if (threadName.equals("Attach Listener")) {
                 return true;
@@ -255,6 +259,10 @@ public class ThreadLeakDetectorListener extends 
BetweenTestClassesListenerAdapte
             if (threadName.equals("Grizzly-HttpSession-Expirer")) {
                 return true;
             }
+            // skip Hadoop LocalFileSystem stats thread
+            if 
(threadName.equals("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner"))
 {
+                return true;
+            }
             // Testcontainers AbstractWaitStrategy.EXECUTOR
             if (threadName.startsWith("testcontainers-wait-")) {
                 return true;
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 225f4dba493..627e3225519 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -250,7 +250,7 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
         openTelemetryManagedCursorStats = new 
OpenTelemetryManagedCursorStats(openTelemetry, this);
     }
 
-    static class DefaultBkFactory implements 
BookkeeperFactoryForCustomEnsemblePlacementPolicy {
+    static class DefaultBkFactory implements 
BookkeeperFactoryForCustomEnsemblePlacementPolicy, AutoCloseable {
 
         private final BookKeeper bkClient;
 
@@ -263,6 +263,11 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
         public CompletableFuture<BookKeeper> get(EnsemblePlacementPolicyConfig 
policy) {
             return CompletableFuture.completedFuture(bkClient);
         }
+
+        @Override
+        public void close() throws Exception {
+            bkClient.close();
+        }
     }
 
     private synchronized void handleMetadataStoreNotification(SessionEvent e) {
@@ -646,13 +651,20 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
                             }
                         }
                     }));
-                }).thenAcceptAsync(__ -> {
+                }).whenCompleteAsync((__, ___) -> {
                     //wait for tasks in scheduledExecutor executed.
                     openTelemetryManagedCursorStats.close();
                     openTelemetryManagedLedgerStats.close();
                     openTelemetryCacheStats.close();
                     scheduledExecutor.shutdownNow();
                     entryCacheManager.clear();
+                    if (bookkeeperFactory instanceof DefaultBkFactory 
defaultBkFactory) {
+                        try {
+                            defaultBkFactory.close();
+                        } catch (Exception e) {
+                            log.warn("Failed to close bookkeeper client", e);
+                        }
+                    }
                 });
     }
 
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index a323ecfeb8e..d3a2e83ba89 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -26,7 +26,6 @@ package org.apache.bookkeeper.test;
 import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
 import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
 import static org.testng.Assert.assertFalse;
-
 import com.google.common.base.Stopwatch;
 import java.io.File;
 import java.io.IOException;
@@ -73,9 +72,9 @@ import org.apache.zookeeper.ZooKeeper;
 import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterTest;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.BeforeTest;
 
 /**
  * A class runs several bookie servers for testing.
@@ -148,7 +147,7 @@ public abstract class BookKeeperClusterTestCase {
         }
     }
 
-    @BeforeTest(alwaysRun = true)
+    @BeforeClass(alwaysRun = true)
     public void setUp() throws Exception {
         setUp(getLedgersRootPath());
     }
@@ -187,7 +186,7 @@ public abstract class BookKeeperClusterTestCase {
         return "";
     }
 
-    @AfterTest(alwaysRun = true)
+    @AfterClass(alwaysRun = true)
     public void tearDown() throws Exception {
         boolean failed = false;
         for (Throwable e : asyncExceptions) {
diff --git 
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
 
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
index ca28befabc1..7a50fa42089 100644
--- 
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
+++ 
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
@@ -62,255 +62,256 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 public class ProxySaslAuthenticationTest extends ProducerConsumerBase {
-       private static final Logger log = 
LoggerFactory.getLogger(ProxySaslAuthenticationTest.class);
-
-       public static File kdcDir;
-       public static File kerberosWorkDir;
-       public static File brokerSecretKeyFile;
-       public static File proxySecretKeyFile;
-
-       private static MiniKdc kdc;
-       private static Properties properties;
-
-       private static String localHostname = "localhost";
-
-       @BeforeClass
-       public static void startMiniKdc() throws Exception {
-               kdcDir = Files.createTempDirectory("test-kdc-dir").toFile();
-               kerberosWorkDir = 
Files.createTempDirectory("test-kerberos-work-dir").toFile();
-
-               properties = MiniKdc.createConf();
-               kdc = new MiniKdc(properties, kdcDir);
-               kdc.start();
-
-               String principalBrokerNoRealm = "broker/" + localHostname;
-               String principalBroker = "broker/" + localHostname + "@" + 
kdc.getRealm();
-               log.info("principalBroker: " + principalBroker);
-
-               String principalClientNoRealm = "client/" + localHostname;
-               String principalClient = principalClientNoRealm + "@" + 
kdc.getRealm();
-               log.info("principalClient: " + principalClient);
-
-               String principalProxyNoRealm = "proxy/" + localHostname;
-               String principalProxy = principalProxyNoRealm + "@" + 
kdc.getRealm();
-               log.info("principalProxy: " + principalProxy);
-
-               File keytabClient = new File(kerberosWorkDir, 
"pulsarclient.keytab");
-               kdc.createPrincipal(keytabClient, principalClientNoRealm);
-
-               File keytabBroker = new File(kerberosWorkDir, 
"pulsarbroker.keytab");
-               kdc.createPrincipal(keytabBroker, principalBrokerNoRealm);
-
-               File keytabProxy = new File(kerberosWorkDir, 
"pulsarproxy.keytab");
-               kdc.createPrincipal(keytabProxy, principalProxyNoRealm);
-
-               File jaasFile = new File(kerberosWorkDir, "jaas.conf");
-               try (FileWriter writer = new FileWriter(jaasFile)) {
-                       writer.write("\n"
-                               + "PulsarBroker {\n"
-                               + "  
com.sun.security.auth.module.Krb5LoginModule required debug=true\n"
-                               + "  useKeyTab=true\n"
-                               + "  keyTab=\"" + 
keytabBroker.getAbsolutePath() + "\n"
-                               + "  storeKey=true\n"
-                               + "  useTicketCache=false\n" // won't test 
useTicketCache=true on JUnit tests
-                               + "  principal=\"" + principalBroker + "\";\n"
-                               + "};\n"
-                               + "\n"
-                               + "\n"
-                               + "\n"
-                               + "PulsarProxy{\n"
-                               + "  
com.sun.security.auth.module.Krb5LoginModule required debug=true\n"
-                               + "  useKeyTab=true\n"
-                               + "  keyTab=\"" + keytabProxy.getAbsolutePath() 
+ "\n"
-                               + "  storeKey=true\n"
-                               + "  useTicketCache=false\n" // won't test 
useTicketCache=true on JUnit tests
-                               + "  principal=\"" + principalProxy + "\";\n"
-                               + "};\n"
-                               + "\n"
-                               + "\n"
-                               + "\n"
-                               + "PulsarClient {\n"
-                               + "  
com.sun.security.auth.module.Krb5LoginModule required debug=true\n"
-                               + "  useKeyTab=true\n"
-                               + "  keyTab=\"" + 
keytabClient.getAbsolutePath() + "\n"
-                               + "  storeKey=true\n"
-                               + "  useTicketCache=false\n"
-                               + "  principal=\"" + principalClient + "\";\n"
-                               + "};\n"
-                       );
-               }
-
-               File krb5file = new File(kerberosWorkDir, "krb5.conf");
-               try (FileWriter writer = new FileWriter(krb5file)) {
-                       String conf = "[libdefaults]\n"
-                               + " default_realm = " + kdc.getRealm() + "\n"
-                               + " udp_preference_limit = 1\n" // force use TCP
-                               + "\n"
-                               + "\n"
-                               + "[realms]\n"
-                               + " " + kdc.getRealm() + "  = {\n"
-                               + "  kdc = " + kdc.getHost() + ":" + 
kdc.getPort() + "\n"
-                               + " }";
-                       writer.write(conf);
-                       log.info("krb5.conf:\n" + conf);
-               }
-
-               System.setProperty("java.security.auth.login.config", 
jaasFile.getAbsolutePath());
-               System.setProperty("java.security.krb5.conf", 
krb5file.getAbsolutePath());
-               Configuration.getConfiguration().refresh();
-
-               // Client config
-
-               log.info("created AuthenticationSasl");
-       }
-
-       @AfterClass(alwaysRun = true)
-       public static void stopMiniKdc() {
-               System.clearProperty("java.security.auth.login.config");
-               System.clearProperty("java.security.krb5.conf");
-               if (kdc != null) {
-                       kdc.stop();
-               }
-               FileUtils.deleteQuietly(kdcDir);
-               FileUtils.deleteQuietly(kerberosWorkDir);
-               Assert.assertFalse(kdcDir.exists());
-               Assert.assertFalse(kerberosWorkDir.exists());
-       }
-
-       @BeforeMethod
-       @Override
-       protected void setup() throws Exception {
-               log.info("-- {} --, start at host: {}", methodName, 
localHostname);
-               isTcpLookup = true;
-               conf.setAdvertisedAddress(localHostname);
-               conf.setAuthenticationEnabled(true);
-               conf.setSaslJaasClientAllowedIds(".*" + localHostname + ".*");
-               conf.setSaslJaasServerSectionName("PulsarBroker");
-               brokerSecretKeyFile = 
File.createTempFile("saslRoleTokenSignerSecret", ".key");
-               Files.write(Paths.get(brokerSecretKeyFile.toString()), 
"PulsarSecret".getBytes());
-               
conf.setSaslJaasServerRoleTokenSignerSecretPath(brokerSecretKeyFile.toString());
-               Set<String> providers = new HashSet<>();
-               providers.add(AuthenticationProviderSasl.class.getName());
-               conf.setAuthenticationProviders(providers);
-               conf.setClusterName("test");
-               conf.setSuperUserRoles(ImmutableSet.of("client/" + 
localHostname + "@" + kdc.getRealm()));
-               // set admin auth, to verify admin web resources
-               Map<String, String> clientSaslConfig = new HashMap<>();
-               clientSaslConfig.put("saslJaasClientSectionName", 
"PulsarClient");
-               clientSaslConfig.put("serverType", "broker");
-               
conf.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
-               conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory
-                               
.getMapper().getObjectMapper().writeValueAsString(clientSaslConfig));
-
-               super.init();
-
-               lookupUrl = new URI(pulsar.getBrokerServiceUrl());
-               log.info("set client jaas section name: PulsarClient");
-               closeAdmin();
-               admin = PulsarAdmin.builder()
-                       .serviceHttpUrl(brokerUrl.toString())
-                       
.authentication(AuthenticationFactory.create(AuthenticationSasl.class.getName(),
 clientSaslConfig))
-                       .build();
-               super.producerBaseSetup();
-               log.info("-- {} --, end.", methodName);
-       }
-
-       @Override
-       @AfterMethod(alwaysRun = true)
-       protected void cleanup() throws Exception {
-               FileUtils.deleteQuietly(brokerSecretKeyFile);
-               Assert.assertFalse(brokerSecretKeyFile.exists());
-               FileUtils.deleteQuietly(proxySecretKeyFile);
-               Assert.assertFalse(proxySecretKeyFile.exists());
-               super.internalCleanup();
-       }
-
-       @Test
-       void testAuthentication() throws Exception {
-               log.info("-- Starting {} test --", methodName);
-
-               // Step 1: Create Admin Client
-
-               // create a client which connects to proxy and pass authData
-               String topicName = "persistent://my-property/my-ns/my-topic1";
-
-               ProxyConfiguration proxyConfig = new ProxyConfiguration();
-               proxyConfig.setAuthenticationEnabled(true);
-               proxyConfig.setServicePort(Optional.of(0));
-               proxyConfig.setBrokerProxyAllowedTargetPorts("*");
-               proxyConfig.setWebServicePort(Optional.of(0));
-               proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
-               proxyConfig.setSaslJaasClientAllowedIds(".*" + localHostname + 
".*");
-               proxyConfig.setSaslJaasServerSectionName("PulsarProxy");
-               proxyConfig.setClusterName(configClusterName);
-
-               // proxy connect to broker
-               
proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
-               proxyConfig.setBrokerClientAuthenticationParameters(
-                       "{\"saslJaasClientSectionName\": " + "\"PulsarProxy\"," 
+
-                               "\"serverType\": " + "\"broker\"}");
-               proxySecretKeyFile = 
File.createTempFile("saslRoleTokenSignerSecret", ".key");
-               Files.write(Paths.get(proxySecretKeyFile.toString()), 
"PulsarSecret".getBytes());
-               
proxyConfig.setSaslJaasServerRoleTokenSignerSecretPath(proxySecretKeyFile.toString());
-               // proxy as a server, it will use sasl to authn
-               Set<String> providers = new HashSet<>();
-               providers.add(AuthenticationProviderSasl.class.getName());
-               proxyConfig.setAuthenticationProviders(providers);
-
-               proxyConfig.setForwardAuthorizationCredentials(true);
-               AuthenticationService authenticationService = new 
AuthenticationService(
+    private static final Logger log = 
LoggerFactory.getLogger(ProxySaslAuthenticationTest.class);
+
+    public static File kdcDir;
+    public static File kerberosWorkDir;
+    public static File brokerSecretKeyFile;
+    public static File proxySecretKeyFile;
+
+    private static MiniKdc kdc;
+    private static Properties properties;
+
+    private static String localHostname = "localhost";
+
+    @BeforeClass
+    public static void startMiniKdc() throws Exception {
+        kdcDir = Files.createTempDirectory("test-kdc-dir").toFile();
+        kerberosWorkDir = 
Files.createTempDirectory("test-kerberos-work-dir").toFile();
+
+        properties = MiniKdc.createConf();
+        kdc = new MiniKdc(properties, kdcDir);
+        kdc.start();
+
+        String principalBrokerNoRealm = "broker/" + localHostname;
+        String principalBroker = "broker/" + localHostname + "@" + 
kdc.getRealm();
+        log.info("principalBroker: " + principalBroker);
+
+        String principalClientNoRealm = "client/" + localHostname;
+        String principalClient = principalClientNoRealm + "@" + kdc.getRealm();
+        log.info("principalClient: " + principalClient);
+
+        String principalProxyNoRealm = "proxy/" + localHostname;
+        String principalProxy = principalProxyNoRealm + "@" + kdc.getRealm();
+        log.info("principalProxy: " + principalProxy);
+
+        File keytabClient = new File(kerberosWorkDir, "pulsarclient.keytab");
+        kdc.createPrincipal(keytabClient, principalClientNoRealm);
+
+        File keytabBroker = new File(kerberosWorkDir, "pulsarbroker.keytab");
+        kdc.createPrincipal(keytabBroker, principalBrokerNoRealm);
+
+        File keytabProxy = new File(kerberosWorkDir, "pulsarproxy.keytab");
+        kdc.createPrincipal(keytabProxy, principalProxyNoRealm);
+
+        File jaasFile = new File(kerberosWorkDir, "jaas.conf");
+        try (FileWriter writer = new FileWriter(jaasFile)) {
+            writer.write("\n"
+                + "PulsarBroker {\n"
+                + "  com.sun.security.auth.module.Krb5LoginModule required 
debug=true\n"
+                + "  useKeyTab=true\n"
+                + "  keyTab=\"" + keytabBroker.getAbsolutePath() + "\n"
+                + "  storeKey=true\n"
+                + "  useTicketCache=false\n" // won't test useTicketCache=true 
on JUnit tests
+                + "  principal=\"" + principalBroker + "\";\n"
+                + "};\n"
+                + "\n"
+                + "\n"
+                + "\n"
+                + "PulsarProxy{\n"
+                + "  com.sun.security.auth.module.Krb5LoginModule required 
debug=true\n"
+                + "  useKeyTab=true\n"
+                + "  keyTab=\"" + keytabProxy.getAbsolutePath() + "\n"
+                + "  storeKey=true\n"
+                + "  useTicketCache=false\n" // won't test useTicketCache=true 
on JUnit tests
+                + "  principal=\"" + principalProxy + "\";\n"
+                + "};\n"
+                + "\n"
+                + "\n"
+                + "\n"
+                + "PulsarClient {\n"
+                + "  com.sun.security.auth.module.Krb5LoginModule required 
debug=true\n"
+                + "  useKeyTab=true\n"
+                + "  keyTab=\"" + keytabClient.getAbsolutePath() + "\n"
+                + "  storeKey=true\n"
+                + "  useTicketCache=false\n"
+                + "  principal=\"" + principalClient + "\";\n"
+                + "};\n"
+            );
+        }
+
+        File krb5file = new File(kerberosWorkDir, "krb5.conf");
+        try (FileWriter writer = new FileWriter(krb5file)) {
+            String conf = "[libdefaults]\n"
+                + " default_realm = " + kdc.getRealm() + "\n"
+                + " udp_preference_limit = 1\n" // force use TCP
+                + "\n"
+                + "\n"
+                + "[realms]\n"
+                + " " + kdc.getRealm() + "  = {\n"
+                + "  kdc = " + kdc.getHost() + ":" + kdc.getPort() + "\n"
+                + " }";
+            writer.write(conf);
+            log.info("krb5.conf:\n" + conf);
+        }
+
+        System.setProperty("java.security.auth.login.config", 
jaasFile.getAbsolutePath());
+        System.setProperty("java.security.krb5.conf", 
krb5file.getAbsolutePath());
+        Configuration.getConfiguration().refresh();
+
+        // Client config
+
+        log.info("created AuthenticationSasl");
+    }
+
+    @AfterClass(alwaysRun = true)
+    public static void stopMiniKdc() {
+        System.clearProperty("java.security.auth.login.config");
+        System.clearProperty("java.security.krb5.conf");
+        if (kdc != null) {
+            kdc.stop();
+        }
+        FileUtils.deleteQuietly(kdcDir);
+        FileUtils.deleteQuietly(kerberosWorkDir);
+        Assert.assertFalse(kdcDir.exists());
+        Assert.assertFalse(kerberosWorkDir.exists());
+    }
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        log.info("-- {} --, start at host: {}", methodName, localHostname);
+        isTcpLookup = true;
+        conf.setAdvertisedAddress(localHostname);
+        conf.setAuthenticationEnabled(true);
+        conf.setSaslJaasClientAllowedIds(".*" + localHostname + ".*");
+        conf.setSaslJaasServerSectionName("PulsarBroker");
+        brokerSecretKeyFile = File.createTempFile("saslRoleTokenSignerSecret", 
".key");
+        Files.write(Paths.get(brokerSecretKeyFile.toString()), 
"PulsarSecret".getBytes());
+        
conf.setSaslJaasServerRoleTokenSignerSecretPath(brokerSecretKeyFile.toString());
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderSasl.class.getName());
+        conf.setAuthenticationProviders(providers);
+        conf.setClusterName("test");
+        conf.setSuperUserRoles(ImmutableSet.of("client/" + localHostname + "@" 
+ kdc.getRealm()));
+        // set admin auth, to verify admin web resources
+        Map<String, String> clientSaslConfig = new HashMap<>();
+        clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
+        clientSaslConfig.put("serverType", "broker");
+        
conf.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
+        conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory
+                
.getMapper().getObjectMapper().writeValueAsString(clientSaslConfig));
+
+        super.init();
+
+        lookupUrl = new URI(pulsar.getBrokerServiceUrl());
+        log.info("set client jaas section name: PulsarClient");
+        closeAdmin();
+        admin = PulsarAdmin.builder()
+            .serviceHttpUrl(brokerUrl.toString())
+            
.authentication(AuthenticationFactory.create(AuthenticationSasl.class.getName(),
 clientSaslConfig))
+            .build();
+        super.producerBaseSetup();
+        log.info("-- {} --, end.", methodName);
+    }
+
+    @Override
+    @AfterMethod(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        FileUtils.deleteQuietly(brokerSecretKeyFile);
+        Assert.assertFalse(brokerSecretKeyFile.exists());
+        FileUtils.deleteQuietly(proxySecretKeyFile);
+        Assert.assertFalse(proxySecretKeyFile.exists());
+        super.internalCleanup();
+    }
+
+    @Test
+    void testAuthentication() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        // Step 1: Create Admin Client
+
+        // create a client which connects to proxy and pass authData
+        String topicName = "persistent://my-property/my-ns/my-topic1";
+
+        ProxyConfiguration proxyConfig = new ProxyConfiguration();
+        proxyConfig.setAuthenticationEnabled(true);
+        proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
+        proxyConfig.setWebServicePort(Optional.of(0));
+        proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+        proxyConfig.setSaslJaasClientAllowedIds(".*" + localHostname + ".*");
+        proxyConfig.setSaslJaasServerSectionName("PulsarProxy");
+        proxyConfig.setClusterName(configClusterName);
+
+        // proxy connect to broker
+        
proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
+        proxyConfig.setBrokerClientAuthenticationParameters(
+            "{\"saslJaasClientSectionName\": " + "\"PulsarProxy\"," +
+                "\"serverType\": " + "\"broker\"}");
+        proxySecretKeyFile = File.createTempFile("saslRoleTokenSignerSecret", 
".key");
+        Files.write(Paths.get(proxySecretKeyFile.toString()), 
"PulsarSecret".getBytes());
+        
proxyConfig.setSaslJaasServerRoleTokenSignerSecretPath(proxySecretKeyFile.toString());
+        // proxy as a server, it will use sasl to authn
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderSasl.class.getName());
+        proxyConfig.setAuthenticationProviders(providers);
+
+        proxyConfig.setForwardAuthorizationCredentials(true);
+        @Cleanup
+        AuthenticationService authenticationService = new 
AuthenticationService(
                         PulsarConfigurationLoader.convertFrom(proxyConfig));
-               @Cleanup
-               final Authentication proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
-                               
proxyConfig.getBrokerClientAuthenticationParameters());
-               proxyClientAuthentication.start();
-               ProxyService proxyService = new ProxyService(proxyConfig, 
authenticationService, proxyClientAuthentication);
-
-               proxyService.start();
-               final String proxyServiceUrl = "pulsar://localhost:" + 
proxyService.getListenPort().get();
-               log.info("1 proxy service started {}", proxyService);
-
-               // Step 3: Pass correct client params
-               @Cleanup
-               PulsarClient proxyClient = createProxyClient(proxyServiceUrl, 
1);
-               log.info("2 create proxy client {}, {}", proxyServiceUrl, 
proxyClient);
-
-               Producer<byte[]> producer = 
proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
-               log.info("3 created producer.");
-
-               Consumer<byte[]> consumer = 
proxyClient.newConsumer(Schema.BYTES).topic(topicName).subscriptionName("test-sub").subscribe();
-               log.info("4 created consumer.");
-
-               for (int i = 0; i < 10; i++) {
-                       String message = "my-message-" + i;
-                       producer.send(message.getBytes());
-                       log.info("Produced message: [{}]", message);
-               }
-
-               Message<byte[]> msg = null;
-               Set<String> messageSet = new HashSet<>();
-               for (int i = 0; i < 10; i++) {
-                       msg = consumer.receive(5, TimeUnit.SECONDS);
-                       String receivedMessage = new String(msg.getData());
-                       log.info("Received message: [{}]", receivedMessage);
-                       String expectedMessage = "my-message-" + i;
-                       testMessageOrderAndDuplicates(messageSet, 
receivedMessage, expectedMessage);
-               }
-               // Acknowledge the consumption of all messages at once
-               consumer.acknowledgeCumulative(msg);
-               consumer.close();
-
-               proxyService.close();
-       }
-
-       private PulsarClient createProxyClient(String proxyServiceUrl, int 
numberOfConnections) throws PulsarClientException {
-               Map<String, String> clientSaslConfig = new HashMap<>();
-               clientSaslConfig.put("saslJaasClientSectionName", 
"PulsarClient");
-               clientSaslConfig.put("serverType", "proxy");
-               log.info("set client jaas section name: PulsarClient, 
serverType: proxy");
-               Authentication authSasl = 
AuthenticationFactory.create(AuthenticationSasl.class.getName(), 
clientSaslConfig);
-
-               return PulsarClient.builder().serviceUrl(proxyServiceUrl)
-                               
.authentication(authSasl).connectionsPerBroker(numberOfConnections).build();
-       }
+        @Cleanup
+        final Authentication proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+        ProxyService proxyService = new ProxyService(proxyConfig, 
authenticationService, proxyClientAuthentication);
+
+        proxyService.start();
+        final String proxyServiceUrl = "pulsar://localhost:" + 
proxyService.getListenPort().get();
+        log.info("1 proxy service started {}", proxyService);
+
+        // Step 3: Pass correct client params
+        @Cleanup
+        PulsarClient proxyClient = createProxyClient(proxyServiceUrl, 1);
+        log.info("2 create proxy client {}, {}", proxyServiceUrl, proxyClient);
+
+        Producer<byte[]> producer = 
proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
+        log.info("3 created producer.");
+
+        Consumer<byte[]> consumer = 
proxyClient.newConsumer(Schema.BYTES).topic(topicName).subscriptionName("test-sub").subscribe();
+        log.info("4 created consumer.");
+
+        for (int i = 0; i < 10; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+            log.info("Produced message: [{}]", message);
+        }
+
+        Message<byte[]> msg = null;
+        Set<String> messageSet = new HashSet<>();
+        for (int i = 0; i < 10; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            String receivedMessage = new String(msg.getData());
+            log.info("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, 
expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+
+        proxyService.close();
+    }
+
+    private PulsarClient createProxyClient(String proxyServiceUrl, int 
numberOfConnections) throws PulsarClientException {
+        Map<String, String> clientSaslConfig = new HashMap<>();
+        clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
+        clientSaslConfig.put("serverType", "proxy");
+        log.info("set client jaas section name: PulsarClient, serverType: 
proxy");
+        Authentication authSasl = 
AuthenticationFactory.create(AuthenticationSasl.class.getName(), 
clientSaslConfig);
+
+        return PulsarClient.builder().serviceUrl(proxyServiceUrl)
+                
.authentication(authSasl).connectionsPerBroker(numberOfConnections).build();
+    }
 }
diff --git 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
index 96e4d0ed264..3ca24a680bb 100644
--- 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
+++ 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
@@ -247,6 +247,7 @@ public class BookieRackAffinityMappingTest {
         ClientConfiguration bkClientConf = new ClientConfiguration();
         bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, store);
 
+        @Cleanup
         PulsarRegistrationClient pulsarRegistrationClient = new PulsarRegistrationClient(store, "/ledgers");
         DefaultBookieAddressResolver defaultBookieAddressResolver = new DefaultBookieAddressResolver(pulsarRegistrationClient);
 
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java
index 5cbd0245565..fc0c900562f 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java
@@ -22,10 +22,10 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
+import lombok.Cleanup;
 import org.testng.annotations.Test;
 
 public class RateLimiterTest {
@@ -218,6 +218,7 @@ public class RateLimiterTest {
         long rateTime = 1;
         long newUpdatedRateLimit = 100L;
         Supplier<Long> permitUpdater = () -> newUpdatedRateLimit;
+        @Cleanup
         RateLimiter limiter = RateLimiter.builder().permits(permits).rateTime(1).timeUnit(TimeUnit.SECONDS)
                 .permitUpdater(permitUpdater)
                 .build();
@@ -233,6 +234,7 @@ public class RateLimiterTest {
         long rateTime = 1;
         int reNewTime = 3;
         RateLimitFunction rateLimitFunction = atomicInteger::incrementAndGet;
+        @Cleanup
         RateLimiter rateLimiter = RateLimiter.builder().permits(permits).rateTime(rateTime).timeUnit(TimeUnit.SECONDS)
                 .rateLimitFunction(rateLimitFunction)
                 .build();
diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
index 91e7e902eab..abdd9d4b111 100644
--- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
+++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
@@ -164,7 +164,7 @@ public class FileStoreBackedReadHandleImpl implements ReadHandle {
                                 nextExpectedId, entryId, lastEntry);
                         throw new BKException.BKUnexpectedConditionException();
                     }
-            }
+                }
                 promise.complete(LedgerEntriesImpl.create(entries));
             } catch (Throwable t) {
                 this.offloaderStats.recordReadOffloadError(topicName);
diff --git a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java
index 9609362e9b7..b70a4623d5d 100644
--- a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java
+++ b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java
@@ -84,11 +84,15 @@ public abstract class FileStoreTestBase {
     }
 
     @AfterMethod(alwaysRun = true)
-    public void tearDown() {
+    public void tearDown() throws Exception {
         if (fileSystemManagedLedgerOffloader != null) {
             fileSystemManagedLedgerOffloader.close();
             fileSystemManagedLedgerOffloader = null;
         }
+        if (offloaderStats != null) {
+            offloaderStats.close();
+            offloaderStats = null;
+        }
         if (hdfsCluster != null) {
             hdfsCluster.shutdown(true, true);
             hdfsCluster = null;
diff --git a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
index 71fe5ec7219..0a7d6ab1c65 100644
--- a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
+++ b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.UUID;
+import lombok.Cleanup;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.PulsarMockBookKeeper;
@@ -98,7 +99,7 @@ public class FileSystemManagedLedgerOffloaderTest extends FileStoreTestBase {
 
     @AfterMethod(alwaysRun = true)
     @Override
-    public void tearDown() {
+    public void tearDown() throws Exception {
         super.tearDown();
     }
 
@@ -122,6 +123,8 @@ public class FileSystemManagedLedgerOffloaderTest extends FileStoreTestBase {
             assertEquals(toWriteEntry.getLength(), toTestEntry.getLength());
             assertEquals(toWriteEntry.getEntryBuffer(), toTestEntry.getEntryBuffer());
         }
+        toTestEntries.close();
+        toWriteEntries.close();
         toTestEntries = toTest.read(1, numberOfEntries - 1);
         toWriteEntries = toWrite.read(1,numberOfEntries - 1);
         toTestIter = toTestEntries.iterator();
@@ -135,6 +138,8 @@ public class FileSystemManagedLedgerOffloaderTest extends FileStoreTestBase {
             assertEquals(toWriteEntry.getLength(), toTestEntry.getLength());
             assertEquals(toWriteEntry.getEntryBuffer(), toTestEntry.getEntryBuffer());
         }
+        toTestEntries.close();
+        toWriteEntries.close();
     }
 
     @Test
@@ -155,6 +160,7 @@ public class FileSystemManagedLedgerOffloaderTest extends FileStoreTestBase {
         while (toTestIter.hasNext()) {
             LedgerEntry toTestEntry = toTestIter.next();
         }
+        toTestEntries.close();
 
         assertTrue(offloaderStats.getReadOffloadError(topicName) == 0);
         assertTrue(offloaderStats.getReadOffloadBytes(topicName) > 0);
@@ -168,10 +174,11 @@ public class FileSystemManagedLedgerOffloaderTest extends FileStoreTestBase {
         UUID uuid = UUID.randomUUID();
         offloader.offload(toWrite, uuid, map).get();
         Configuration configuration = new Configuration();
+        @Cleanup
         FileSystem fileSystem = FileSystem.get(new URI(getURI()), configuration);
         assertTrue(fileSystem.exists(new Path(createDataFilePath(storagePath, lh.getId(), uuid))));
         assertTrue(fileSystem.exists(new Path(createIndexFilePath(storagePath, lh.getId(), uuid))));
-        offloader.deleteOffloaded(lh.getId(), uuid, map);
+        offloader.deleteOffloaded(lh.getId(), uuid, map).get();
         assertFalse(fileSystem.exists(new Path(createDataFilePath(storagePath, lh.getId(), uuid))));
         assertFalse(fileSystem.exists(new Path(createIndexFilePath(storagePath, lh.getId(), uuid))));
     }
diff --git a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemOffloaderLocalFileTest.java b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemOffloaderLocalFileTest.java
index 14734b3faca..7653a3cf152 100644
--- a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemOffloaderLocalFileTest.java
+++ b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemOffloaderLocalFileTest.java
@@ -19,7 +19,6 @@
 package org.apache.bookkeeper.mledger.offload.filesystem.impl;
 
 import static org.testng.Assert.assertEquals;
-
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -27,6 +26,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.UUID;
+import lombok.Cleanup;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.PulsarMockBookKeeper;
@@ -38,12 +38,29 @@ import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 public class FileSystemOffloaderLocalFileTest {
-    private OrderedScheduler scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("offloader").build();
-    private LedgerOffloaderStats offloaderStats = LedgerOffloaderStats.create(true, true, scheduler, 60);
+    private OrderedScheduler scheduler;
+    private LedgerOffloaderStats offloaderStats;
+
+    @BeforeClass
+    public void setup() throws Exception {
+        scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("offloader").build();
+        offloaderStats = LedgerOffloaderStats.create(true, true, scheduler, 60);
+    }
 
+    @AfterClass(alwaysRun = true)
+    public void cleanup() throws Exception {
+        if (scheduler != null) {
+            scheduler.shutdown();
+        }
+        if (offloaderStats != null) {
+            offloaderStats.close();
+        }
+    }
 
     private String getResourceFilePath(String name) {
         return getClass().getClassLoader().getResource(name).getPath();
@@ -59,11 +76,13 @@ public class FileSystemOffloaderLocalFileTest {
         offloadPolicies.setFileSystemProfilePath(getResourceFilePath("filesystem_offload_core_site.xml"));
 
         // initialize the offloader with the offload policies
+        @Cleanup
         var offloader = FileSystemManagedLedgerOffloader.create(offloadPolicies, scheduler, offloaderStats);
 
         int numberOfEntries = 100;
 
         // prepare the data in bookkeeper
+        @Cleanup
         BookKeeper bk = new PulsarMockBookKeeper(scheduler);
         LedgerHandle lh = bk.createLedger(1,1,1, BookKeeper.DigestType.CRC32, "".getBytes());
         for (int i = 0; i <  numberOfEntries; i++) {
@@ -72,6 +91,7 @@ public class FileSystemOffloaderLocalFileTest {
         }
         lh.close();
 
+        @Cleanup
         ReadHandle read = bk.newOpenLedgerOp()
             .withLedgerId(lh.getId())
             .withDigestType(DigestType.CRC32)
@@ -83,6 +103,7 @@ public class FileSystemOffloaderLocalFileTest {
 
         UUID uuid = UUID.randomUUID();
         offloader.offload(read, uuid, offloadDriverMetadata).get();
+        @Cleanup
         ReadHandle toTest = offloader.readOffloaded(read.getId(), uuid, offloadDriverMetadata).get();
         assertEquals(toTest.getLastAddConfirmed(), read.getLastAddConfirmed());
         LedgerEntries toTestEntries = toTest.read(0, numberOfEntries - 1);
@@ -98,6 +119,9 @@ public class FileSystemOffloaderLocalFileTest {
             assertEquals(toWriteEntry.getLength(), toTestEntry.getLength());
             assertEquals(toWriteEntry.getEntryBuffer(), toTestEntry.getEntryBuffer());
         }
+        toTestEntries.close();
+        toWriteEntries.close();
+
         toTestEntries = toTest.read(1, numberOfEntries - 1);
         toWriteEntries = read.read(1,numberOfEntries - 1);
         toTestIter = toTestEntries.iterator();
@@ -112,6 +136,9 @@ public class FileSystemOffloaderLocalFileTest {
             assertEquals(toWriteEntry.getEntryBuffer(), toTestEntry.getEntryBuffer());
         }
 
+        toTestEntries.close();
+        toWriteEntries.close();
+
         // check the file located in the local file system
         Path offloadedFilePath = Paths.get(basePath, mlName);
         assertEquals(Files.exists(offloadedFilePath), true);


Reply via email to