This is an automated email from the ASF dual-hosted git repository. kwang pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a9e58b1851530a5f2d504936f07e2a17850d2962 Author: Heesung Sohn <[email protected]> AuthorDate: Wed Oct 11 09:22:44 2023 -0700 [improve][broker] use ConcurrentHashMap in ServiceUnitStateChannel and avoid recursive update error (#21282) (cherry picked from commit aecdb03e0e64605d60f03d9b76f99c1136677dff) --- .../channel/ServiceUnitStateChannelImpl.java | 90 +++++++++++++--------- .../channel/ServiceUnitStateChannelTest.java | 10 +-- 2 files changed, 59 insertions(+), 41 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index d71513652e9..f7e09a2bec5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -54,6 +54,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; @@ -67,6 +68,7 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.commons.lang3.mutable.MutableObject; import org.apache.pulsar.PulsarClusterMetadataSetup; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -97,7 +99,6 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.topics.TopicCompactionStrategy; import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.NotificationType; import org.apache.pulsar.metadata.api.extended.SessionEvent; @@ -125,9 +126,9 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { private final PulsarService pulsar; private final ServiceConfiguration config; private final Schema<ServiceUnitStateData> schema; - private final ConcurrentOpenHashMap<String, CompletableFuture<String>> getOwnerRequests; + private final Map<String, CompletableFuture<String>> getOwnerRequests; private final String lookupServiceAddress; - private final ConcurrentOpenHashMap<String, CompletableFuture<Void>> cleanupJobs; + private final Map<String, CompletableFuture<Void>> cleanupJobs; private final StateChangeListeners stateChangeListeners; private ExtensibleLoadManagerImpl loadManager; private BrokerRegistry brokerRegistry; @@ -204,9 +205,8 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { this.config = pulsar.getConfig(); this.lookupServiceAddress = pulsar.getLookupServiceAddress(); this.schema = Schema.JSON(ServiceUnitStateData.class); - this.getOwnerRequests = ConcurrentOpenHashMap.<String, - CompletableFuture<String>>newBuilder().build(); - this.cleanupJobs = ConcurrentOpenHashMap.<String, CompletableFuture<Void>>newBuilder().build(); + this.getOwnerRequests = new ConcurrentHashMap<>(); + this.cleanupJobs = new ConcurrentHashMap<>(); this.stateChangeListeners = new StateChangeListeners(); this.semiTerminalStateWaitingTimeInMillis = config.getLoadBalancerServiceUnitStateTombstoneDelayTimeInSeconds() * 1000; @@ -826,20 +826,28 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { } private CompletableFuture<String> deferGetOwnerRequest(String serviceUnit) { - return getOwnerRequests - .computeIfAbsent(serviceUnit, k -> { - CompletableFuture<String> future = new CompletableFuture<>(); - future.orTimeout(inFlightStateWaitingTimeInMillis, TimeUnit.MILLISECONDS) - .whenComplete((v, e) -> { - if (e != null) { - getOwnerRequests.remove(serviceUnit, future); - log.warn("Failed to getOwner for serviceUnit:{}", - serviceUnit, e); - } + var requested = new MutableObject<CompletableFuture<String>>(); + try { + return getOwnerRequests + .computeIfAbsent(serviceUnit, k -> { + CompletableFuture<String> future = new CompletableFuture<>(); + requested.setValue(future); + return future; + }); + } finally { + var future = requested.getValue(); + if (future != null) { + future.orTimeout(inFlightStateWaitingTimeInMillis, TimeUnit.MILLISECONDS) + .whenComplete((v, e) -> { + if (e != null) { + getOwnerRequests.remove(serviceUnit, future); + log.warn("Failed to getOwner for serviceUnit:{}", + serviceUnit, e); } - ); - return future; - }); + } + ); + } + } } private CompletableFuture<Integer> closeServiceUnit(String serviceUnit) { @@ -1114,24 +1122,34 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { } private void scheduleCleanup(String broker, long delayInSecs) { - cleanupJobs.computeIfAbsent(broker, k -> { - Executor delayed = CompletableFuture - .delayedExecutor(delayInSecs, TimeUnit.SECONDS, pulsar.getLoadManagerExecutor()); - totalInactiveBrokerCleanupScheduledCnt++; - return CompletableFuture - .runAsync(() -> { - try { - doCleanup(broker); - } catch (Throwable e) { - log.error("Failed to run the cleanup job for the broker {}, " - + "totalCleanupErrorCnt:{}.", - broker, totalCleanupErrorCnt.incrementAndGet(), e); - } finally { - cleanupJobs.remove(broker); + var scheduled = new MutableObject<CompletableFuture<Void>>(); + try { + cleanupJobs.computeIfAbsent(broker, k -> { + Executor delayed = CompletableFuture + .delayedExecutor(delayInSecs, TimeUnit.SECONDS, pulsar.getLoadManagerExecutor()); + totalInactiveBrokerCleanupScheduledCnt++; + var future = CompletableFuture + .runAsync(() -> { + try { + doCleanup(broker); + } catch (Throwable e) { + log.error("Failed to run the cleanup job for the broker {}, " + + "totalCleanupErrorCnt:{}.", + broker, totalCleanupErrorCnt.incrementAndGet(), e); + } } - } - , delayed); - }); + , delayed); + scheduled.setValue(future); + return future; + }); + } finally { + var future = scheduled.getValue(); + if (future != null) { + future.whenComplete((v, ex) -> { + cleanupJobs.remove(broker); + }); + } + } log.info("Scheduled ownership cleanup for broker:{} with delay:{} secs. Pending clean jobs:{}.", broker, delayInSecs, cleanupJobs.size()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index a226df53e12..f9893ea3f63 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -60,6 +60,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -88,7 +89,6 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.client.impl.TableViewImpl; import org.apache.pulsar.common.policies.data.TopicType; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.NotificationType; import org.apache.pulsar.metadata.api.coordination.LeaderElectionState; @@ -1558,9 +1558,9 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest { } - private static ConcurrentOpenHashMap<String, CompletableFuture<Optional<String>>> getOwnerRequests( + private static ConcurrentHashMap<String, CompletableFuture<Optional<String>>> getOwnerRequests( ServiceUnitStateChannel channel) throws IllegalAccessException { - return (ConcurrentOpenHashMap<String, CompletableFuture<Optional<String>>>) + return (ConcurrentHashMap<String, CompletableFuture<Optional<String>>>) FieldUtils.readDeclaredField(channel, "getOwnerRequests", true); } @@ -1577,9 +1577,9 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest { FieldUtils.readField(channel, "lastMetadataSessionEventTimestamp", true); } - private static ConcurrentOpenHashMap<String, CompletableFuture<Void>> getCleanupJobs( + private static ConcurrentHashMap<String, CompletableFuture<Void>> getCleanupJobs( ServiceUnitStateChannel channel) throws IllegalAccessException { - return (ConcurrentOpenHashMap<String, CompletableFuture<Void>>) + return (ConcurrentHashMap<String, CompletableFuture<Void>>) FieldUtils.readField(channel, "cleanupJobs", true); }
