This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9c75904ad92 [fix][test] Fix CoordinationService, MetadataStore and
MockZooKeeper leaks in tests (#15638)
9c75904ad92 is described below
commit 9c75904ad92292da237efcf0b7ecd1f30a52dad3
Author: Nicolò Boschi <[email protected]>
AuthorDate: Wed Jul 13 10:40:34 2022 +0200
[fix][test] Fix CoordinationService, MetadataStore and MockZooKeeper leaks
in tests (#15638)
* [fix][tests] Ensure thread pools are disposed after some tests
* fix comments
---
.../bookkeeper/test/MockedBookKeeperTestCase.java | 6 +++++-
.../org/apache/pulsar/broker/PulsarService.java | 11 ++++++++--
.../pulsar/broker/service/BrokerService.java | 1 +
.../broker/namespace/OwnershipCacheTest.java | 1 +
.../GracefulExecutorServicesShutdownTest.java | 1 +
.../pulsar/broker/service/PersistentTopicTest.java | 25 ++++++++++------------
.../pulsar/broker/service/ServerCnxTest.java | 14 +++++++-----
.../persistent/PersistentSubscriptionTest.java | 22 +++++++++----------
...MultiListenersWithInternalListenerNameTest.java | 15 +++++++------
.../impl/AcknowledgementsGroupingTrackerTest.java | 6 +++---
.../util}/GracefulExecutorServicesShutdown.java | 2 +-
...GracefulExecutorServicesTerminationHandler.java | 3 +--
.../coordination/impl/CoordinationServiceImpl.java | 6 ++++++
.../java/org/apache/zookeeper/MockZooKeeper.java | 1 +
14 files changed, 68 insertions(+), 46 deletions(-)
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
index 549af5eb248..0fd8902f825 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
@@ -22,6 +22,8 @@ import java.lang.reflect.Method;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import lombok.SneakyThrows;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
@@ -87,6 +89,7 @@ public abstract class MockedBookKeeperTestCase {
}
@AfterMethod(alwaysRun = true)
+ @SneakyThrows
public final void tearDown(Method method) {
try {
cleanUpTestCase();
@@ -95,9 +98,10 @@ public abstract class MockedBookKeeperTestCase {
}
try {
LOG.info("@@@@@@@@@ stopping " + method);
- factory.shutdown();
+ factory.shutdownAsync().get(10, TimeUnit.SECONDS);
factory = null;
stopBookKeeper();
+ metadataStore.close();
LOG.info("--------- stopped {}", method);
} catch (Exception e) {
LOG.error("tearDown Error", e);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index f85afe0cc72..ea7b4481b23 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -101,7 +101,6 @@ import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.rest.Topics;
import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.broker.service.GracefulExecutorServicesShutdown;
import
org.apache.pulsar.broker.service.SystemTopicBaseTxnBufferSnapshotService;
import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
import org.apache.pulsar.broker.service.Topic;
@@ -143,6 +142,7 @@ import
org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.protocol.schema.SchemaStorage;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
import org.apache.pulsar.common.util.ThreadDumpUtil;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.compaction.Compactor;
@@ -446,6 +446,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
* getConfiguration()
.getBrokerShutdownTimeoutMs())));
+
List<CompletableFuture<Void>> asyncCloseFutures = new
ArrayList<>();
if (this.brokerService != null) {
CompletableFuture<Void> brokerCloseFuture =
this.brokerService.closeAsync();
@@ -549,7 +550,9 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
brokerClientSharedInternalExecutorProvider.shutdownNow();
brokerClientSharedScheduledExecutorProvider.shutdownNow();
brokerClientSharedTimer.stop();
- ioEventLoopGroup.shutdownGracefully();
+
+
asyncCloseFutures.add(EventLoopUtil.shutdownGracefully(ioEventLoopGroup));
+
// add timeout handling for closing executors
asyncCloseFutures.add(executorServicesShutdown.handle());
@@ -597,6 +600,10 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
Duration.ofMillis(Math.max(1L,
getConfiguration().getBrokerShutdownTimeoutMs())),
shutdownExecutor, () ->
FutureUtil.createTimeoutException("Timeout in close", getClass(), "close"));
future.handle((v, t) -> {
+ if (t != null) {
+ LOG.info("Shutdown timed out after {} ms",
getConfiguration().getBrokerShutdownTimeoutMs());
+ LOG.info(ThreadDumpUtil.buildThreadDiagnosticString());
+ }
// shutdown the shutdown executor
shutdownExecutor.shutdownNow();
return null;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index abeec85eb84..1cadb20ca1e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -159,6 +159,7 @@ import
org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FieldParser;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
import org.apache.pulsar.common.util.RateLimiter;
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
index dde25fa2eed..6168c61bb20 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
@@ -115,6 +115,7 @@ public class OwnershipCacheTest {
@AfterMethod(alwaysRun = true)
public void teardown() throws Exception {
executor.shutdownNow();
+ coordinationService.close();
store.close();
otherStore.close();
zookeeperServer.close();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GracefulExecutorServicesShutdownTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GracefulExecutorServicesShutdownTest.java
index de97bd73639..cd10950b230 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GracefulExecutorServicesShutdownTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GracefulExecutorServicesShutdownTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
import org.testng.annotations.Test;
public class GracefulExecutorServicesShutdownTest {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 76fdc4ff5ac..840810ecc1b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -57,6 +57,7 @@ import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URL;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -134,7 +135,9 @@ import
org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.CompactedTopicContext;
import org.apache.pulsar.compaction.Compactor;
@@ -249,20 +252,14 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
@AfterMethod(alwaysRun = true)
public void teardown() throws Exception {
- metadataStore.close();
- brokerService.getTopics().clear();
- brokerService.close(); //to clear pulsarStats
- try {
- pulsar.close();
- } catch (Exception e) {
- log.warn("Failed to close pulsar service", e);
- throw e;
- }
-
- executor.shutdownNow();
- if (eventLoopGroup != null) {
- eventLoopGroup.shutdownGracefully().get();
- }
+ brokerService.close();
+ pulsar.close();
+ GracefulExecutorServicesShutdown.initiate()
+ .timeout(Duration.ZERO)
+ .shutdown(executor)
+ .handle().get();
+ EventLoopUtil.shutdownGracefully(eventLoopGroup).get();
+ store.close();
}
@Test
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 4a4f411ddb8..1a57310d01e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -44,6 +44,7 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -117,6 +118,8 @@ import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
+import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.zookeeper.ZooKeeper;
@@ -238,12 +241,13 @@ public class ServerCnxTest {
if (channel != null) {
channel.close();
}
- pulsar.close();
brokerService.close();
- executor.shutdownNow();
- if (eventLoopGroup != null) {
- eventLoopGroup.shutdownGracefully().get();
- }
+ pulsar.close();
+ GracefulExecutorServicesShutdown.initiate()
+ .timeout(Duration.ZERO)
+ .shutdown(executor)
+ .handle().get();
+ EventLoopUtil.shutdownGracefully(eventLoopGroup).get();
store.close();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
index 2d1e72d45c8..9c842b597f9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
@@ -33,6 +33,7 @@ import static org.testng.Assert.fail;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.lang.reflect.Field;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -68,6 +69,8 @@ import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
@@ -208,19 +211,14 @@ public class PersistentSubscriptionTest {
@AfterMethod(alwaysRun = true)
public void teardown() throws Exception {
- brokerMock.close(); //to clear pulsarStats
- try {
- pulsarMock.close();
- } catch (Exception e) {
- log.warn("Failed to close pulsar service", e);
- throw e;
- }
-
+ brokerMock.close();
+ pulsarMock.close();
+ GracefulExecutorServicesShutdown.initiate()
+ .timeout(Duration.ZERO)
+ .shutdown(executor)
+ .handle().get();
+ EventLoopUtil.shutdownGracefully(eventLoopGroup).get();
store.close();
- executor.shutdownNow();
- if (eventLoopGroup != null) {
- eventLoopGroup.shutdownGracefully().get();
- }
}
@Test
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java
index a790aa7c7c7..35e8939e531 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java
@@ -23,6 +23,7 @@ import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
+import java.time.Duration;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -46,6 +47,8 @@ import
org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -190,12 +193,12 @@ public class
PulsarMultiListenersWithInternalListenerNameTest extends MockedPuls
@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
- if (this.executorService != null) {
- this.executorService.shutdownNow();
- }
- if (eventExecutors != null) {
- eventExecutors.shutdownGracefully();
- }
+ pulsar.close();
+ GracefulExecutorServicesShutdown.initiate()
+ .timeout(Duration.ZERO)
+ .shutdown(executorService)
+ .handle().get();
+ EventLoopUtil.shutdownGracefully(eventExecutors).get();
super.internalCleanup();
}
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
index d577f48357c..95b5eb81c33 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
@@ -62,7 +62,7 @@ public class AcknowledgementsGroupingTrackerTest {
consumer = mock(ConsumerImpl.class);
consumer.unAckedChunkedMessageIdSequenceMap =
ConcurrentOpenHashMap.<MessageIdImpl,
MessageIdImpl[]>newBuilder().build();
- cnx = spy(new ClientCnxTest(new ClientConfigurationData(), new
NioEventLoopGroup()));
+ cnx = spy(new ClientCnxTest(new ClientConfigurationData(),
eventLoopGroup));
PulsarClientImpl client = mock(PulsarClientImpl.class);
doReturn(client).when(consumer).getClient();
doReturn(cnx).when(consumer).getClientCnx();
@@ -79,8 +79,8 @@ public class AcknowledgementsGroupingTrackerTest {
}
@AfterClass(alwaysRun = true)
- public void teardown() {
- eventLoopGroup.shutdownGracefully();
+ public void teardown() throws Exception {
+ eventLoopGroup.shutdownGracefully().get();
}
@Test(dataProvider = "isNeedReceipt")
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/GracefulExecutorServicesShutdown.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/GracefulExecutorServicesShutdown.java
similarity index 99%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/GracefulExecutorServicesShutdown.java
rename to
pulsar-common/src/main/java/org/apache/pulsar/common/util/GracefulExecutorServicesShutdown.java
index 1c7134ffca0..9784c0f6a43 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/GracefulExecutorServicesShutdown.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/GracefulExecutorServicesShutdown.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.service;
+package org.apache.pulsar.common.util;
import java.time.Duration;
import java.util.ArrayList;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/GracefulExecutorServicesTerminationHandler.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/GracefulExecutorServicesTerminationHandler.java
similarity index 98%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/GracefulExecutorServicesTerminationHandler.java
rename to
pulsar-common/src/main/java/org/apache/pulsar/common/util/GracefulExecutorServicesTerminationHandler.java
index 7f012e10225..148b826e446 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/GracefulExecutorServicesTerminationHandler.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/GracefulExecutorServicesTerminationHandler.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.service;
+package org.apache.pulsar.common.util;
import java.time.Duration;
import java.util.ArrayList;
@@ -27,7 +27,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.common.util.FutureUtil;
/**
* Waits for termination of {@link ExecutorService}s that have been shutdown.
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java
index c8ce40e37c7..64ba31bd857 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.coordination.CoordinationService;
@@ -68,6 +69,10 @@ public class CoordinationServiceImpl implements
CoordinationService {
public void close() throws Exception {
try {
List<CompletableFuture<Void>> futures = new ArrayList<>();
+ futures.add(GracefulExecutorServicesShutdown
+ .initiate()
+ .shutdown(executor)
+ .handle());
for (LeaderElection<?> le : leaderElections.values()) {
futures.add(le.asyncClose());
@@ -77,6 +82,7 @@ public class CoordinationServiceImpl implements
CoordinationService {
futures.add(lm.asyncClose());
}
+
FutureUtils.collect(futures).get(CLOSE_TIMEOUT.toMillis(),
TimeUnit.MILLISECONDS);
} catch (CompletionException ce) {
throw MetadataStoreException.unwrap(ce);
diff --git a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
index b9325c16102..39f3845c967 100644
--- a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
+++ b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
@@ -1090,6 +1090,7 @@ public class MockZooKeeper extends ZooKeeper {
@Override
public void close() throws InterruptedException {
+ shutdown();
}
public void shutdown() throws InterruptedException {