This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 774e22c2f4726709f2cb0246054baae5a8218d59
Author: Lari Hotari <[email protected]>
AuthorDate: Tue May 31 17:12:54 2022 +0300

    [Broker] Add timeout to closing CoordinationServiceImpl (#15777)

    Fixes #15774
    Also close the executor that wasn't closed

    (cherry picked from commit 1266f913678804d4cb7f2142458e87d6155f8bd4)
---
 .../pulsar/metadata/coordination/impl/CoordinationServiceImpl.java | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java
index 4bda85c64fb..2b7e38b6c44 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.metadata.coordination.impl;

 import io.netty.util.concurrent.DefaultThreadFactory;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
@@ -29,6 +30,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.pulsar.metadata.api.MetadataSerde;
@@ -43,6 +45,7 @@ import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 @SuppressWarnings("unchecked")
 public class CoordinationServiceImpl implements CoordinationService {

+    private static final Duration CLOSE_TIMEOUT = Duration.ofSeconds(10);
     private final MetadataStoreExtended store;

     private final Map<Object, LockManager<?>> lockManagers = new ConcurrentHashMap<>();
@@ -69,9 +72,11 @@ public class CoordinationServiceImpl implements CoordinationService {
                 futures.add(lm.asyncClose());
             }

-            FutureUtils.collect(futures).join();
+            FutureUtils.collect(futures).get(CLOSE_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
         } catch (CompletionException ce) {
             throw MetadataStoreException.unwrap(ce);
+        } finally {
+            executor.shutdownNow();
         }
     }

