This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ae0fc5bdcae [fix][test] Fix resource leak in PulsarTestContext (#20799)
ae0fc5bdcae is described below

commit ae0fc5bdcae0220da4936d0fdaff71eead219cdd
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Jul 14 00:01:22 2023 +0300

    [fix][test] Fix resource leak in PulsarTestContext (#20799)
---
 .../org/apache/pulsar/broker/PulsarService.java    |  6 +-
 .../pulsar/broker/service/PersistentTopicTest.java |  5 +-
 .../testcontext/AbstractTestPulsarService.java     | 53 ++++++++-------
 .../testcontext/NonStartableTestPulsarService.java | 79 ++++------------------
 4 files changed, 49 insertions(+), 94 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 5cf3c47bcb0..40c5a2d6528 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -784,7 +784,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                     exposeTopicMetrics, offloaderScheduler, interval);
             this.defaultOffloader = createManagedLedgerOffloader(defaultOffloadPolicies);
 
-            this.brokerInterceptor = BrokerInterceptors.load(config);
+            setBrokerInterceptor(newBrokerInterceptor());
             // use getter to support mocking getBrokerInterceptor method in tests
             BrokerInterceptor interceptor = getBrokerInterceptor();
             if (interceptor != null) {
@@ -927,6 +927,10 @@ public class PulsarService implements AutoCloseable, ShutdownService {
         }
     }
 
+    protected BrokerInterceptor newBrokerInterceptor() throws IOException {
+        return BrokerInterceptors.load(config);
+    }
+
     @VisibleForTesting
     protected OrderedExecutor newOrderedExecutor() {
         return OrderedExecutor.newBuilder()
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index fefed1aaa0a..c49df3e85ce 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -131,6 +131,7 @@ import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.compaction.CompactedTopic;
 import org.apache.pulsar.compaction.CompactedTopicContext;
 import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.CompactorMXBean;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
 import org.awaitility.Awaitility;
@@ -172,11 +173,13 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         svcConfig.setClusterName("pulsar-cluster");
         svcConfig.setTopicLevelPoliciesEnabled(false);
         svcConfig.setSystemTopicEnabled(false);
+        Compactor compactor = mock(Compactor.class);
+        when(compactor.getStats()).thenReturn(mock(CompactorMXBean.class));
         pulsarTestContext = PulsarTestContext.builderForNonStartableContext()
                 .config(svcConfig)
                 .spyByDefault()
                 .useTestPulsarResources(metadataStore)
-                .compactor(mock(Compactor.class))
+                .compactor(compactor)
                 .build();
         brokerService = pulsarTestContext.getBrokerService();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
index a6861268b94..517d57d0042 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
@@ -19,6 +19,7 @@
 
 package org.apache.pulsar.broker.testcontext;
 
+import java.io.IOException;
 import org.apache.pulsar.broker.BookKeeperClientFactory;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
@@ -37,11 +38,7 @@ import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
  */
 abstract class AbstractTestPulsarService extends PulsarService {
     protected final SpyConfig spyConfig;
-    protected final MetadataStoreExtended localMetadataStore;
-    protected final MetadataStoreExtended configurationMetadataStore;
-    protected final Compactor compactor;
-    protected final BrokerInterceptor brokerInterceptor;
-    protected final BookKeeperClientFactory bookKeeperClientFactory;
+    private boolean compactorExists;
 
     public AbstractTestPulsarService(SpyConfig spyConfig, ServiceConfiguration config,
                                      MetadataStoreExtended localMetadataStore,
@@ -50,53 +47,59 @@ abstract class AbstractTestPulsarService extends PulsarService {
                                      BookKeeperClientFactory bookKeeperClientFactory) {
         super(config);
         this.spyConfig = spyConfig;
-        this.localMetadataStore =
-                NonClosingProxyHandler.createNonClosingProxy(localMetadataStore, MetadataStoreExtended.class);
-        this.configurationMetadataStore =
-                NonClosingProxyHandler.createNonClosingProxy(configurationMetadataStore, MetadataStoreExtended.class);
-        this.compactor = compactor;
-        this.brokerInterceptor = brokerInterceptor;
-        this.bookKeeperClientFactory = bookKeeperClientFactory;
+        setLocalMetadataStore(
+                NonClosingProxyHandler.createNonClosingProxy(localMetadataStore, MetadataStoreExtended.class));
+        setConfigurationMetadataStore(
+                NonClosingProxyHandler.createNonClosingProxy(configurationMetadataStore, MetadataStoreExtended.class));
+        setCompactor(compactor);
+        setBrokerInterceptor(brokerInterceptor);
+        setBkClientFactory(bookKeeperClientFactory);
     }
 
     @Override
     public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
             throws MetadataStoreException {
         if (synchronizer != null) {
-            synchronizer.registerSyncListener(configurationMetadataStore::handleMetadataEvent);
+            synchronizer.registerSyncListener(
+                    ((MetadataStoreExtended) getConfigurationMetadataStore())::handleMetadataEvent);
         }
-        return configurationMetadataStore;
+        return getConfigurationMetadataStore();
     }
 
     @Override
     public MetadataStoreExtended createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
             throws MetadataStoreException, PulsarServerException {
         if (synchronizer != null) {
-            synchronizer.registerSyncListener(localMetadataStore::handleMetadataEvent);
+            synchronizer.registerSyncListener(
+                    getLocalMetadataStore()::handleMetadataEvent);
         }
-        return localMetadataStore;
+        return getLocalMetadataStore();
     }
 
     @Override
-    public Compactor newCompactor() throws PulsarServerException {
+    protected void setCompactor(Compactor compactor) {
         if (compactor != null) {
-            return compactor;
-        } else {
-            return spyConfig.getCompactor().spy(super.newCompactor());
+            compactorExists = true;
         }
+        super.setCompactor(compactor);
     }
 
     @Override
-    public BrokerInterceptor getBrokerInterceptor() {
-        if (brokerInterceptor != null) {
-            return brokerInterceptor;
+    public Compactor newCompactor() throws PulsarServerException {
+        if (compactorExists) {
+            return getCompactor();
         } else {
-            return super.getBrokerInterceptor();
+            return spyConfig.getCompactor().spy(super.newCompactor());
         }
     }
 
     @Override
     public BookKeeperClientFactory newBookKeeperClientFactory() {
-        return bookKeeperClientFactory;
+        return getBkClientFactory();
+    }
+
+    @Override
+    protected BrokerInterceptor newBrokerInterceptor() throws IOException {
+        return getBrokerInterceptor() != null ? getBrokerInterceptor() : super.newBrokerInterceptor();
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
index 13c4d7d72af..af365ed3193 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
@@ -38,11 +38,9 @@ import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.resources.TopicResources;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
-import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
 import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
-import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@@ -56,13 +54,6 @@ import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
  * for a "non-startable" PulsarService. Please see {@link PulsarTestContext} for more details.
  */
 class NonStartableTestPulsarService extends AbstractTestPulsarService {
-    private final PulsarResources pulsarResources;
-    private final ManagedLedgerStorage managedLedgerClientFactory;
-    private final BrokerService brokerService;
-
-    private final SchemaRegistryService schemaRegistryService;
-
-    private final PulsarClientImpl pulsarClient;
 
     private final NamespaceService namespaceService;
 
@@ -76,16 +67,16 @@ class NonStartableTestPulsarService extends AbstractTestPulsarService {
                                          Function<BrokerService, BrokerService> brokerServiceCustomizer) {
         super(spyConfig, config, localMetadataStore, configurationMetadataStore, compactor, brokerInterceptor,
                 bookKeeperClientFactory);
-        this.pulsarResources = pulsarResources;
-        this.managedLedgerClientFactory = managedLedgerClientFactory;
+        setPulsarResources(pulsarResources);
+        setManagedLedgerClientFactory(managedLedgerClientFactory);
         try {
-            this.brokerService = brokerServiceCustomizer.apply(
-                    spyConfig.getBrokerService().spy(TestBrokerService.class, this, getIoEventLoopGroup()));
+            setBrokerService(brokerServiceCustomizer.apply(
+                    spyConfig.getBrokerService().spy(TestBrokerService.class, this, getIoEventLoopGroup())));
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
-        this.schemaRegistryService = spyWithClassAndConstructorArgs(DefaultSchemaRegistryService.class);
-        this.pulsarClient = mock(PulsarClientImpl.class);
+        setSchemaRegistryService(spyWithClassAndConstructorArgs(DefaultSchemaRegistryService.class));
+        setClient(mock(PulsarClientImpl.class));
         this.namespaceService = mock(NamespaceService.class);
         try {
             startNamespaceService();
@@ -118,63 +109,17 @@ class NonStartableTestPulsarService extends AbstractTestPulsarService {
         return () -> namespaceService;
     }
 
-    @Override
-    public synchronized PulsarClient getClient() throws PulsarServerException {
-        return pulsarClient;
-    }
-
     @Override
     public PulsarClientImpl createClientImpl(ClientConfigurationData clientConf) throws PulsarClientException {
-        return pulsarClient;
-    }
-
-    @Override
-    public SchemaRegistryService getSchemaRegistryService() {
-        return schemaRegistryService;
-    }
-
-    @Override
-    public PulsarResources getPulsarResources() {
-        return pulsarResources;
-    }
-
-    public BrokerService getBrokerService() {
-        return brokerService;
-    }
-
-    @Override
-    public MetadataStore getConfigurationMetadataStore() {
-        return configurationMetadataStore;
-    }
-
-    @Override
-    public MetadataStoreExtended getLocalMetadataStore() {
-        return localMetadataStore;
-    }
-
-    @Override
-    public ManagedLedgerStorage getManagedLedgerClientFactory() {
-        return managedLedgerClientFactory;
-    }
-
-    @Override
-    protected PulsarResources newPulsarResources() {
-        return pulsarResources;
-    }
-
-    @Override
-    protected ManagedLedgerStorage newManagedLedgerClientFactory() throws Exception {
-        return managedLedgerClientFactory;
+        try {
+            return (PulsarClientImpl) getClient();
+        } catch (PulsarServerException e) {
+            throw new PulsarClientException(e);
+        }
     }
-
     @Override
     protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
-        return brokerService;
-    }
-
-    @Override
-    public BookKeeperClientFactory getBookKeeperClientFactory() {
-        return bookKeeperClientFactory;
+        return getBrokerService();
     }
 
     static class TestBrokerService extends BrokerService {

Reply via email to