This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
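The patch below replaces direct construction of ConcurrentOpenHashMap, ConcurrentOpenHashSet and ConcurrentLongPairSet with a builder API and adds an optional auto-shrink mode. As a rough usage sketch (not part of the patch; the example class, type parameters and tuning values are illustrative, while the builder methods are the ones introduced in the diff):

    import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
    import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;

    public class BuilderExample {   // hypothetical example class, not in the patch
        public static void main(String[] args) {
            // Equivalent of the now-deprecated new ConcurrentOpenHashMap<>(16, 1)
            ConcurrentOpenHashMap<String, Long> map =
                    ConcurrentOpenHashMap.<String, Long>newBuilder()
                            .expectedItems(16)      // sizing hint, same meaning as before
                            .concurrencyLevel(1)    // number of internal sections
                            .build();

            // Opt in to shrinking; the factor values shown are the defaults added by this patch
            ConcurrentOpenHashSet<String> set =
                    ConcurrentOpenHashSet.<String>newBuilder()
                            .autoShrink(true)       // allow sections to rehash to a smaller capacity
                            .mapFillFactor(0.66f)   // expand once usedBuckets > capacity * fillFactor
                            .mapIdleFactor(0.15f)   // consider shrinking once size < capacity * idleFactor
                            .expandFactor(2)        // capacity grows by about this factor on expand
                            .shrinkFactor(2)        // capacity is divided by about this factor on shrink
                            .build();

            map.put("ledger-1", 1L);
            set.add("bundle-0x00000000_0x40000000");
        }
    }

With autoShrink left at its default of false, the behavior matches the old constructors: sections only ever grow. When it is enabled, remove() may shrink a section by shrinkFactor once its size drops below the idle threshold, and clear() rehashes a section back to its initial capacity, as exercised by the new testExpandAndShrink and testClear cases in the diff.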
commit 14d9b8492d6dc4781bc83f817ca7898966b4214d
Author: LinChen <[email protected]>
AuthorDate: Mon Mar 14 23:23:47 2022 +0800

    support shrink for map or set (#14663)

    * support shrink for map or set

    * check style

    * check style

    (cherry picked from commit 1d10dff757ac7b9a203c14d2085a480495fb141b)
---
 .../mledger/impl/ManagedLedgerOfflineBacklog.java  |   3 +-
 .../broker/loadbalance/impl/LoadManagerShared.java |  20 ++-
 .../loadbalance/impl/ModularLoadManagerImpl.java   |  18 ++-
 .../loadbalance/impl/SimpleLoadManagerImpl.java    |  18 ++-
 .../pulsar/broker/namespace/NamespaceService.java  |  13 +-
 .../org/apache/pulsar/broker/rest/TopicsBase.java  |   3 +-
 .../pulsar/broker/service/BrokerService.java       |  42 ++++--
 .../service/nonpersistent/NonPersistentTopic.java  |  12 +-
 .../service/persistent/MessageDeduplication.java   |  12 +-
 .../broker/service/persistent/PersistentTopic.java |  20 ++-
 .../broker/stats/ClusterReplicationMetrics.java    |   3 +-
 .../AntiAffinityNamespaceGroupTest.java            |  15 ++-
 .../loadbalance/impl/LoadManagerSharedTest.java    |  13 +-
 .../pulsar/broker/service/PersistentTopicTest.java |  24 +++-
 .../apache/pulsar/client/impl/ConsumerBase.java    |   3 +-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   3 +-
 .../client/impl/PartitionedProducerImpl.java       |   3 +-
 .../apache/pulsar/client/impl/ProducerBase.java    |   3 +-
 .../impl/AcknowledgementsGroupingTrackerTest.java  |   3 +-
 .../util/collections/ConcurrentLongPairSet.java    | 148 +++++++++++++++++++--
 .../util/collections/ConcurrentOpenHashMap.java    | 140 +++++++++++++++++--
 .../util/collections/ConcurrentOpenHashSet.java    | 140 +++++++++++++++++--
 .../collections/ConcurrentSortedLongPairSet.java   |   5 +-
 .../collections/ConcurrentLongPairSetTest.java     | 111 +++++++++++++---
 .../collections/ConcurrentOpenHashMapTest.java     | 125 ++++++++++++++---
 .../collections/ConcurrentOpenHashSetTest.java     |  73 +++++++++-
 .../pulsar/sql/presto/PulsarRecordCursor.java      |   3 +-
 .../apache/pulsar/websocket/WebSocketService.java  |  23 +++-
 .../apache/pulsar/websocket/stats/ProxyStats.java  |   4 +-
 29 files changed, 860 insertions(+), 143 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java
index 99cc6c8842e..e0362205c6d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java
@@ -220,7 +220,8 @@ public class ManagedLedgerOfflineBacklog {
         BookKeeper bk = factory.getBookKeeper();
         final CountDownLatch allCursorsCounter = new CountDownLatch(1);
         final long errorInReadingCursor = -1;
-        ConcurrentOpenHashMap<String, Long> ledgerRetryMap = new ConcurrentOpenHashMap<>();
+        ConcurrentOpenHashMap<String, Long> ledgerRetryMap =
+                ConcurrentOpenHashMap.<String, Long>newBuilder().build();
         final MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = ledgers.lastEntry().getValue();
         final PositionImpl lastLedgerPosition =
                 new PositionImpl(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
index 0c8f8a00d1c..c0ee0d2f986 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java @@ -190,7 +190,9 @@ public class LoadManagerShared { bundles.forEach(bundleName -> { final String namespaceName = getNamespaceNameFromBundleName(bundleName); final String bundleRange = getBundleRangeFromBundleName(bundleName); - target.computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>()).add(bundleRange); + target.computeIfAbsent(namespaceName, + k -> ConcurrentOpenHashSet.<String>newBuilder().build()) + .add(bundleRange); }); } @@ -263,8 +265,12 @@ public class LoadManagerShared { for (final String broker : candidates) { int bundles = (int) brokerToNamespaceToBundleRange - .computeIfAbsent(broker, k -> new ConcurrentOpenHashMap<>()) - .computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>()).size(); + .computeIfAbsent(broker, + k -> ConcurrentOpenHashMap.<String, + ConcurrentOpenHashSet<String>>newBuilder().build()) + .computeIfAbsent(namespaceName, + k -> ConcurrentOpenHashSet.<String>newBuilder().build()) + .size(); leastBundles = Math.min(leastBundles, bundles); if (leastBundles == 0) { break; @@ -276,8 +282,12 @@ public class LoadManagerShared { final int finalLeastBundles = leastBundles; candidates.removeIf( - broker -> brokerToNamespaceToBundleRange.computeIfAbsent(broker, k -> new ConcurrentOpenHashMap<>()) - .computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>()).size() > finalLeastBundles); + broker -> brokerToNamespaceToBundleRange.computeIfAbsent(broker, + k -> ConcurrentOpenHashMap.<String, + ConcurrentOpenHashSet<String>>newBuilder().build()) + .computeIfAbsent(namespaceName, + k -> ConcurrentOpenHashSet.<String>newBuilder().build()) + .size() > finalLeastBundles); } /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 326f6af4375..08620340497 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -204,7 +204,10 @@ public class ModularLoadManagerImpl implements ModularLoadManager { */ public ModularLoadManagerImpl() { brokerCandidateCache = new HashSet<>(); - brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>(); + brokerToNamespaceToBundleRange = + ConcurrentOpenHashMap.<String, + ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder() + .build(); defaultStats = new NamespaceBundleStats(); filterPipeline = new ArrayList<>(); loadData = new LoadData(); @@ -567,7 +570,10 @@ public class ModularLoadManagerImpl implements ModularLoadManager { brokerData.getTimeAverageData().reset(statsMap.keySet(), bundleData, defaultStats); final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange = brokerToNamespaceToBundleRange - .computeIfAbsent(broker, k -> new ConcurrentOpenHashMap<>()); + .computeIfAbsent(broker, k -> + ConcurrentOpenHashMap.<String, + ConcurrentOpenHashSet<String>>newBuilder() + .build()); synchronized (namespaceToBundleRange) { namespaceToBundleRange.clear(); LoadManagerShared.fillNamespaceToBundlesMap(statsMap.keySet(), namespaceToBundleRange); @@ -850,9 +856,13 @@ public class ModularLoadManagerImpl implements ModularLoadManager { final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle); final ConcurrentOpenHashMap<String, 
ConcurrentOpenHashSet<String>> namespaceToBundleRange = brokerToNamespaceToBundleRange - .computeIfAbsent(broker.get(), k -> new ConcurrentOpenHashMap<>()); + .computeIfAbsent(broker.get(), + k -> ConcurrentOpenHashMap.<String, + ConcurrentOpenHashSet<String>>newBuilder() + .build()); synchronized (namespaceToBundleRange) { - namespaceToBundleRange.computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>()) + namespaceToBundleRange.computeIfAbsent(namespaceName, + k -> ConcurrentOpenHashSet.<String>newBuilder().build()) .add(bundleRange); } return broker; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java index 092fe2c852d..4f7e37ad344 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java @@ -202,7 +202,10 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification bundleLossesCache = new HashSet<>(); brokerCandidateCache = new HashSet<>(); availableBrokersCache = new HashSet<>(); - brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>(); + brokerToNamespaceToBundleRange = + ConcurrentOpenHashMap.<String, + ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder() + .build(); this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() { @Override public boolean isEnablePersistentTopics(String brokerUrl) { @@ -851,8 +854,12 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification // same broker. brokerToNamespaceToBundleRange .computeIfAbsent(selectedRU.getResourceId().replace("http://", ""), - k -> new ConcurrentOpenHashMap<>()) - .computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>()).add(bundleRange); + k -> ConcurrentOpenHashMap.<String, + ConcurrentOpenHashSet<String>>newBuilder() + .build()) + .computeIfAbsent(namespaceName, k -> + ConcurrentOpenHashSet.<String>newBuilder().build()) + .add(bundleRange); ranking.addPreAllocatedServiceUnit(serviceUnitId, quota); resourceUnitRankings.put(selectedRU, ranking); } @@ -1271,7 +1278,10 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification final Set<String> preallocatedBundles = resourceUnitRankings.get(resourceUnit).getPreAllocatedBundles(); final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange = brokerToNamespaceToBundleRange - .computeIfAbsent(broker.replace("http://", ""), k -> new ConcurrentOpenHashMap<>()); + .computeIfAbsent(broker.replace("http://", ""), + k -> ConcurrentOpenHashMap.<String, + ConcurrentOpenHashSet<String>>newBuilder() + .build()); namespaceToBundleRange.clear(); LoadManagerShared.fillNamespaceToBundlesMap(loadedBundles, namespaceToBundleRange); LoadManagerShared.fillNamespaceToBundlesMap(preallocatedBundles, namespaceToBundleRange); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 5ae88adbaa1..98e65dc3e56 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -165,7 +165,8 @@ public class NamespaceService implements AutoCloseable { this.loadManager = 
pulsar.getLoadManager(); this.bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32()); this.ownershipCache = new OwnershipCache(pulsar, bundleFactory, this); - this.namespaceClients = new ConcurrentOpenHashMap<>(); + this.namespaceClients = + ConcurrentOpenHashMap.<ClusterDataImpl, PulsarClientImpl>newBuilder().build(); this.bundleOwnershipListeners = new CopyOnWriteArrayList<>(); this.localBrokerDataCache = pulsar.getLocalMetadataStore().getMetadataCache(LocalBrokerData.class); this.localPoliciesCache = pulsar.getLocalMetadataStore().getMetadataCache(LocalPolicies.class); @@ -355,9 +356,15 @@ public class NamespaceService implements AutoCloseable { } private final ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>> - findingBundlesAuthoritative = new ConcurrentOpenHashMap<>(); + findingBundlesAuthoritative = + ConcurrentOpenHashMap.<NamespaceBundle, + CompletableFuture<Optional<LookupResult>>>newBuilder() + .build(); private final ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>> - findingBundlesNotAuthoritative = new ConcurrentOpenHashMap<>(); + findingBundlesNotAuthoritative = + ConcurrentOpenHashMap.<NamespaceBundle, + CompletableFuture<Optional<LookupResult>>>newBuilder() + .build(); /** * Main internal method to lookup and setup ownership of service unit to a broker. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java index f89abf9bea3..770d77794d5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java @@ -431,7 +431,8 @@ public class TopicsBase extends PersistentTopicsBase { partitionedTopicName, result.getLookupData()); } pulsar().getBrokerService().getOwningTopics().computeIfAbsent(partitionedTopicName - .getPartitionedTopicName(), (key) -> new ConcurrentOpenHashSet<Integer>()) + .getPartitionedTopicName(), + (key) -> ConcurrentOpenHashSet.<Integer>newBuilder().build()) .add(partitionedTopicName.getPartitionIndex()); completeLookup(Pair.of(Collections.emptyList(), false), redirectAddresses, future); } else { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index a388f13f2dc..b7931be9b4b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -284,17 +284,28 @@ public class BrokerService implements Closeable { this.preciseTopicPublishRateLimitingEnable = pulsar.getConfiguration().isPreciseTopicPublishRateLimiterEnable(); this.managedLedgerFactory = pulsar.getManagedLedgerFactory(); - this.topics = new ConcurrentOpenHashMap<>(); - this.replicationClients = new ConcurrentOpenHashMap<>(); - this.clusterAdmins = new ConcurrentOpenHashMap<>(); + this.topics = + ConcurrentOpenHashMap.<String, CompletableFuture<Optional<Topic>>>newBuilder() + .build(); + this.replicationClients = + ConcurrentOpenHashMap.<String, PulsarClient>newBuilder().build(); + this.clusterAdmins = + ConcurrentOpenHashMap.<String, PulsarAdmin>newBuilder().build(); this.keepAliveIntervalSeconds = pulsar.getConfiguration().getKeepAliveIntervalSeconds(); - this.configRegisteredListeners = new ConcurrentOpenHashMap<>(); + this.configRegisteredListeners = + 
ConcurrentOpenHashMap.<String, Consumer<?>>newBuilder().build(); this.pendingTopicLoadingQueue = Queues.newConcurrentLinkedQueue(); - this.multiLayerTopicsMap = new ConcurrentOpenHashMap<>(); - this.owningTopics = new ConcurrentOpenHashMap<>(); + this.multiLayerTopicsMap = ConcurrentOpenHashMap.<String, + ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>>>newBuilder() + .build(); + this.owningTopics = ConcurrentOpenHashMap.<String, + ConcurrentOpenHashSet<Integer>>newBuilder() + .build(); this.pulsarStats = new PulsarStats(pulsar); - this.offlineTopicStatCache = new ConcurrentOpenHashMap<>(); + this.offlineTopicStatCache = + ConcurrentOpenHashMap.<TopicName, + PersistentOfflineTopicStats>newBuilder().build(); this.topicOrderedExecutor = OrderedScheduler.newSchedulerBuilder() .numThreads(pulsar.getConfiguration().getNumWorkerThreadsForNonPersistentTopic()) @@ -329,7 +340,8 @@ public class BrokerService implements Closeable { this.backlogQuotaChecker = Executors .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-backlog-quota-checker")); this.authenticationService = new AuthenticationService(pulsar.getConfiguration()); - this.blockedDispatchers = new ConcurrentOpenHashSet<>(); + this.blockedDispatchers = + ConcurrentOpenHashSet.<PersistentDispatcherMultipleConsumers>newBuilder().build(); // update dynamic configuration and register-listener updateConfigurationAndRegisterListeners(); this.lookupRequestSemaphore = new AtomicReference<Semaphore>( @@ -1595,8 +1607,12 @@ public class BrokerService implements Closeable { synchronized (multiLayerTopicsMap) { String serviceUnit = namespaceBundle.toString(); multiLayerTopicsMap // - .computeIfAbsent(topicName.getNamespace(), k -> new ConcurrentOpenHashMap<>()) // - .computeIfAbsent(serviceUnit, k -> new ConcurrentOpenHashMap<>()) // + .computeIfAbsent(topicName.getNamespace(), + k -> ConcurrentOpenHashMap.<String, + ConcurrentOpenHashMap<String, Topic>>newBuilder() + .build()) // + .computeIfAbsent(serviceUnit, + k -> ConcurrentOpenHashMap.<String, Topic>newBuilder().build()) // .put(topicName.toString(), topic); } } @@ -2413,7 +2429,8 @@ public class BrokerService implements Closeable { } private static ConcurrentOpenHashMap<String, ConfigField> prepareDynamicConfigurationMap() { - ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap = new ConcurrentOpenHashMap<>(); + ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap = + ConcurrentOpenHashMap.<String, ConfigField>newBuilder().build(); for (Field field : ServiceConfiguration.class.getDeclaredFields()) { if (field != null && field.isAnnotationPresent(FieldContext.class)) { field.setAccessible(true); @@ -2426,7 +2443,8 @@ public class BrokerService implements Closeable { } private ConcurrentOpenHashMap<String, Object> getRuntimeConfigurationMap() { - ConcurrentOpenHashMap<String, Object> runtimeConfigurationMap = new ConcurrentOpenHashMap<>(); + ConcurrentOpenHashMap<String, Object> runtimeConfigurationMap = + ConcurrentOpenHashMap.<String, Object>newBuilder().build(); for (Field field : ServiceConfiguration.class.getDeclaredFields()) { if (field != null && field.isAnnotationPresent(FieldContext.class)) { field.setAccessible(true); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index f58a4fd644f..3818b8abc2e 100644 --- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -143,8 +143,16 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol public NonPersistentTopic(String topic, BrokerService brokerService) { super(topic, brokerService); - this.subscriptions = new ConcurrentOpenHashMap<>(16, 1); - this.replicators = new ConcurrentOpenHashMap<>(16, 1); + this.subscriptions = + ConcurrentOpenHashMap.<String, NonPersistentSubscription>newBuilder() + .expectedItems(16) + .concurrencyLevel(1) + .build(); + this.replicators = + ConcurrentOpenHashMap.<String, NonPersistentReplicator>newBuilder() + .expectedItems(16) + .concurrencyLevel(1) + .build(); this.isFenced = false; registerTopicPolicyListener(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index 201e6129a48..90ee3b67e3c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -97,12 +97,20 @@ public class MessageDeduplication { // Map that contains the highest sequenceId that have been sent by each producers. The map will be updated before // the messages are persisted @VisibleForTesting - final ConcurrentOpenHashMap<String, Long> highestSequencedPushed = new ConcurrentOpenHashMap<>(16, 1); + final ConcurrentOpenHashMap<String, Long> highestSequencedPushed = + ConcurrentOpenHashMap.<String, Long>newBuilder() + .expectedItems(16) + .concurrencyLevel(1) + .build(); // Map that contains the highest sequenceId that have been persistent by each producers. 
The map will be updated // after the messages are persisted @VisibleForTesting - final ConcurrentOpenHashMap<String, Long> highestSequencedPersisted = new ConcurrentOpenHashMap<>(16, 1); + final ConcurrentOpenHashMap<String, Long> highestSequencedPersisted = + ConcurrentOpenHashMap.<String, Long>newBuilder() + .expectedItems(16) + .concurrencyLevel(1) + .build(); // Number of persisted entries after which to store a snapshot of the sequence ids map private final int snapshotInterval; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 3914278ae7f..5a970b48f58 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -254,8 +254,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService) { super(topic, brokerService); this.ledger = ledger; - this.subscriptions = new ConcurrentOpenHashMap<>(16, 1); - this.replicators = new ConcurrentOpenHashMap<>(16, 1); + this.subscriptions = ConcurrentOpenHashMap.<String, PersistentSubscription>newBuilder() + .expectedItems(16) + .concurrencyLevel(1) + .build(); + this.replicators = ConcurrentOpenHashMap.<String, Replicator>newBuilder() + .expectedItems(16) + .concurrencyLevel(1) + .build(); this.backloggedCursorThresholdEntries = brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold(); initializeRateLimiterIfNeeded(Optional.empty()); @@ -344,8 +350,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal super(topic, brokerService); this.ledger = ledger; this.messageDeduplication = messageDeduplication; - this.subscriptions = new ConcurrentOpenHashMap<>(16, 1); - this.replicators = new ConcurrentOpenHashMap<>(16, 1); + this.subscriptions = ConcurrentOpenHashMap.<String, PersistentSubscription>newBuilder() + .expectedItems(16) + .concurrencyLevel(1) + .build(); + this.replicators = ConcurrentOpenHashMap.<String, Replicator>newBuilder() + .expectedItems(16) + .concurrencyLevel(1) + .build(); this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient()); this.backloggedCursorThresholdEntries = brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/ClusterReplicationMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/ClusterReplicationMetrics.java index 1086563085b..6718f074c67 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/ClusterReplicationMetrics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/ClusterReplicationMetrics.java @@ -35,7 +35,8 @@ public class ClusterReplicationMetrics { public ClusterReplicationMetrics(String localCluster, boolean metricsEnabled) { metricsList = new ArrayList<>(); this.localCluster = localCluster; - metricsMap = new ConcurrentOpenHashMap<>(); + metricsMap = ConcurrentOpenHashMap.<String, ReplicationMetrics>newBuilder() + .build(); this.metricsEnabled = metricsEnabled; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java index 1429c7376f4..9e81a3e1db9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java @@ -234,7 +234,8 @@ public class AntiAffinityNamespaceGroupTest { brokerToDomainMap.put("brokerName-3", "domain-1"); Set<String> candidate = Sets.newHashSet(); - ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>(); + ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange = + ConcurrentOpenHashMap.<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder().build(); assertEquals(brokers.size(), totalBrokers); @@ -320,7 +321,8 @@ public class AntiAffinityNamespaceGroupTest { Set<String> brokers = Sets.newHashSet(); Set<String> candidate = Sets.newHashSet(); - ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>(); + ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange = + ConcurrentOpenHashMap.<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder().build(); brokers.add("broker-0"); brokers.add("broker-1"); brokers.add("broker-2"); @@ -367,9 +369,11 @@ public class AntiAffinityNamespaceGroupTest { private void selectBrokerForNamespace( ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange, String broker, String namespace, String assignedBundleName) { - ConcurrentOpenHashSet<String> bundleSet = new ConcurrentOpenHashSet<>(); + ConcurrentOpenHashSet<String> bundleSet = + ConcurrentOpenHashSet.<String>newBuilder().build(); bundleSet.add(assignedBundleName); - ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> nsToBundleMap = new ConcurrentOpenHashMap<>(); + ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> nsToBundleMap = + ConcurrentOpenHashMap.<String, ConcurrentOpenHashSet<String>>newBuilder().build(); nsToBundleMap.put(namespace, bundleSet); brokerToNamespaceToBundleRange.put(broker, nsToBundleMap); } @@ -469,7 +473,8 @@ public class AntiAffinityNamespaceGroupTest { Set<String> brokers = Sets.newHashSet(); Set<String> candidate = Sets.newHashSet(); - ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>(); + ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange = + ConcurrentOpenHashMap.<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder().build(); brokers.add("broker-0"); brokers.add("broker-1"); brokers.add("broker-2"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerSharedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerSharedTest.java index 716b9716425..d23772185f1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerSharedTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerSharedTest.java @@ -36,7 
+36,10 @@ public class LoadManagerSharedTest { String assignedBundle = namespace + "/0x00000000_0x40000000"; Set<String> candidates = Sets.newHashSet(); - ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> map = new ConcurrentOpenHashMap<>(); + ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> map = + ConcurrentOpenHashMap.<String, + ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder() + .build(); LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle, candidates, map); Assert.assertEquals(candidates.size(), 0); @@ -80,8 +83,12 @@ public class LoadManagerSharedTest { private static void fillBrokerToNamespaceToBundleMap( ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> map, String broker, String namespace, String bundle) { - map.computeIfAbsent(broker, k -> new ConcurrentOpenHashMap<>()) - .computeIfAbsent(namespace, k -> new ConcurrentOpenHashSet<>()).add(bundle); + map.computeIfAbsent(broker, + k -> ConcurrentOpenHashMap.<String, + ConcurrentOpenHashSet<String>>newBuilder().build()) + .computeIfAbsent(namespace, + k -> ConcurrentOpenHashSet.<String>newBuilder().build()) + .add(bundle); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 4e53819e44c..6c8b4d5f334 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -850,7 +850,11 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { addConsumerToSubscription.setAccessible(true); // for count consumers on topic - ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions = new ConcurrentOpenHashMap<>(16, 1); + ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions = + ConcurrentOpenHashMap.<String, PersistentSubscription>newBuilder() + .expectedItems(16) + .concurrencyLevel(1) + .build(); subscriptions.put("sub-1", sub); subscriptions.put("sub-2", sub2); Field field = topic.getClass().getDeclaredField("subscriptions"); @@ -954,7 +958,11 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { addConsumerToSubscription.setAccessible(true); // for count consumers on topic - ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions = new ConcurrentOpenHashMap<>(16, 1); + ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions = + ConcurrentOpenHashMap.<String, PersistentSubscription>newBuilder() + .expectedItems(16) + .concurrencyLevel(1) + .build(); subscriptions.put("sub-1", sub); subscriptions.put("sub-2", sub2); Field field = topic.getClass().getDeclaredField("subscriptions"); @@ -1081,7 +1089,11 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { addConsumerToSubscription.setAccessible(true); // for count consumers on topic - ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions = new ConcurrentOpenHashMap<>(16, 1); + ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions = + ConcurrentOpenHashMap.<String, PersistentSubscription>newBuilder() + .expectedItems(16) + .concurrencyLevel(1) + .build(); subscriptions.put("sub1", sub1); subscriptions.put("sub2", sub2); Field field = topic.getClass().getDeclaredField("subscriptions"); @@ -2071,7 +2083,11 @@ public class 
PersistentTopicTest extends MockedBookKeeperTestCase { public void testCheckInactiveSubscriptions() throws Exception { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); - ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions = new ConcurrentOpenHashMap<>(16, 1); + ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions = + ConcurrentOpenHashMap.<String, PersistentSubscription>newBuilder() + .expectedItems(16) + .concurrencyLevel(1) + .build(); // This subscription is connected by consumer. PersistentSubscription nonDeletableSubscription1 = spyWithClassAndConstructorArgs(PersistentSubscription.class, topic, "nonDeletableSubscription1", cursorMock, false); subscriptions.put(nonDeletableSubscription1.getName(), nonDeletableSubscription1); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 5b14a841b8c..689c4eb7405 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -110,7 +110,8 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T this.consumerEventListener = conf.getConsumerEventListener(); // Always use growable queue since items can exceed the advertised size this.incomingMessages = new GrowableArrayBlockingQueue<>(); - this.unAckedChunkedMessageIdSequenceMap = new ConcurrentOpenHashMap<>(); + this.unAckedChunkedMessageIdSequenceMap = + ConcurrentOpenHashMap.<MessageIdImpl, MessageIdImpl[]>newBuilder().build(); this.executorProvider = executorProvider; this.externalPinnedExecutor = (ScheduledExecutorService) executorProvider.getExecutor(); this.internalPinnedExecutor = (ScheduledExecutorService) client.getInternalExecutorService(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 0304a608f05..b4ee5a2e784 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -183,7 +183,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle protected volatile boolean paused; - protected ConcurrentOpenHashMap<String, ChunkedMessageCtx> chunkedMessagesMap = new ConcurrentOpenHashMap<>(); + protected ConcurrentOpenHashMap<String, ChunkedMessageCtx> chunkedMessagesMap = + ConcurrentOpenHashMap.<String, ChunkedMessageCtx>newBuilder().build(); private int pendingChunkedMessageCount = 0; protected long expireTimeOfIncompleteChunkedMessageMillis = 0; private boolean expireChunkMessageTaskScheduled = false; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java index e61e7c82166..4a84ba03ebe 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java @@ -76,7 +76,8 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> { int numPartitions, CompletableFuture<Producer<T>> producerCreatedFuture, Schema<T> schema, ProducerInterceptors interceptors) { super(client, topic, conf, producerCreatedFuture, schema, interceptors); - this.producers = 
new ConcurrentOpenHashMap<>(); + this.producers = + ConcurrentOpenHashMap.<Integer, ProducerImpl<T>>newBuilder().build(); this.topicMetadata = new TopicMetadataImpl(numPartitions); this.routerPolicy = getMessageRouter(); stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ProducerStatsRecorderImpl() : null; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java index 0117e651e6f..c7b9d24151f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java @@ -50,7 +50,8 @@ public abstract class ProducerBase<T> extends HandlerState implements Producer<T this.conf = conf; this.schema = schema; this.interceptors = interceptors; - this.schemaCache = new ConcurrentOpenHashMap<>(); + this.schemaCache = + ConcurrentOpenHashMap.<SchemaHash, byte[]>newBuilder().build(); if (!conf.isMultiSchema()) { multiSchemaMode = MultiSchemaMode.Disabled; } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java index c0b952a281a..d577f48357c 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java @@ -60,7 +60,8 @@ public class AcknowledgementsGroupingTrackerTest { public void setup() throws NoSuchFieldException, IllegalAccessException { eventLoopGroup = new NioEventLoopGroup(1); consumer = mock(ConsumerImpl.class); - consumer.unAckedChunkedMessageIdSequenceMap = new ConcurrentOpenHashMap<>(); + consumer.unAckedChunkedMessageIdSequenceMap = + ConcurrentOpenHashMap.<MessageIdImpl, MessageIdImpl[]>newBuilder().build(); cnx = spy(new ClientCnxTest(new ClientConfigurationData(), new NioEventLoopGroup())); PulsarClientImpl client = mock(PulsarClientImpl.class); doReturn(client).when(consumer).getClient(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java index f1806c511e2..abbe11576a9 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java @@ -45,8 +45,74 @@ public class ConcurrentLongPairSet implements LongPairSet { private static final int DefaultExpectedItems = 256; private static final int DefaultConcurrencyLevel = 16; + private static final float DefaultMapFillFactor = 0.66f; + private static final float DefaultMapIdleFactor = 0.15f; + + private static final float DefaultExpandFactor = 2; + private static final float DefaultShrinkFactor = 2; + + private static final boolean DefaultAutoShrink = false; + private final Section[] sections; + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder of ConcurrentLongPairSet. 
+ */ + public static class Builder { + int expectedItems = DefaultExpectedItems; + int concurrencyLevel = DefaultConcurrencyLevel; + float mapFillFactor = DefaultMapFillFactor; + float mapIdleFactor = DefaultMapIdleFactor; + float expandFactor = DefaultExpandFactor; + float shrinkFactor = DefaultShrinkFactor; + boolean autoShrink = DefaultAutoShrink; + + public Builder expectedItems(int expectedItems) { + this.expectedItems = expectedItems; + return this; + } + + public Builder concurrencyLevel(int concurrencyLevel) { + this.concurrencyLevel = concurrencyLevel; + return this; + } + + public Builder mapFillFactor(float mapFillFactor) { + this.mapFillFactor = mapFillFactor; + return this; + } + + public Builder mapIdleFactor(float mapIdleFactor) { + this.mapIdleFactor = mapIdleFactor; + return this; + } + + public Builder expandFactor(float expandFactor) { + this.expandFactor = expandFactor; + return this; + } + + public Builder shrinkFactor(float shrinkFactor) { + this.shrinkFactor = shrinkFactor; + return this; + } + + public Builder autoShrink(boolean autoShrink) { + this.autoShrink = autoShrink; + return this; + } + + public ConcurrentLongPairSet build() { + return new ConcurrentLongPairSet(expectedItems, concurrencyLevel, + mapFillFactor, mapIdleFactor, autoShrink, expandFactor, shrinkFactor); + } + } + + /** * Represents a function that accepts an object of the {@code LongPair} type. */ @@ -61,18 +127,33 @@ public class ConcurrentLongPairSet implements LongPairSet { void accept(long v1, long v2); } + @Deprecated public ConcurrentLongPairSet() { this(DefaultExpectedItems); } + @Deprecated public ConcurrentLongPairSet(int expectedItems) { this(expectedItems, DefaultConcurrencyLevel); } + @Deprecated public ConcurrentLongPairSet(int expectedItems, int concurrencyLevel) { + this(expectedItems, concurrencyLevel, DefaultMapFillFactor, DefaultMapIdleFactor, + DefaultAutoShrink, DefaultExpandFactor, DefaultShrinkFactor); + } + + public ConcurrentLongPairSet(int expectedItems, int concurrencyLevel, + float mapFillFactor, float mapIdleFactor, + boolean autoShrink, float expandFactor, float shrinkFactor) { checkArgument(expectedItems > 0); checkArgument(concurrencyLevel > 0); checkArgument(expectedItems >= concurrencyLevel); + checkArgument(mapFillFactor > 0 && mapFillFactor < 1); + checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1); + checkArgument(mapFillFactor > mapIdleFactor); + checkArgument(expandFactor > 1); + checkArgument(shrinkFactor > 1); int numSections = concurrencyLevel; int perSectionExpectedItems = expectedItems / numSections; @@ -80,10 +161,12 @@ public class ConcurrentLongPairSet implements LongPairSet { this.sections = new Section[numSections]; for (int i = 0; i < numSections; i++) { - sections[i] = new Section(perSectionCapacity); + sections[i] = new Section(perSectionCapacity, mapFillFactor, mapIdleFactor, + autoShrink, expandFactor, shrinkFactor); } } + @Override public long size() { long size = 0; for (int i = 0; i < sections.length; i++) { @@ -214,18 +297,33 @@ public class ConcurrentLongPairSet implements LongPairSet { private volatile long[] table; private volatile int capacity; + private final int initCapacity; private static final AtomicIntegerFieldUpdater<Section> SIZE_UPDATER = AtomicIntegerFieldUpdater .newUpdater(Section.class, "size"); private volatile int size; private int usedBuckets; - private int resizeThreshold; - - Section(int capacity) { + private int resizeThresholdUp; + private int resizeThresholdBelow; + private final float mapFillFactor; + private 
final float mapIdleFactor; + private final float expandFactor; + private final float shrinkFactor; + private final boolean autoShrink; + + Section(int capacity, float mapFillFactor, float mapIdleFactor, boolean autoShrink, + float expandFactor, float shrinkFactor) { this.capacity = alignToPowerOfTwo(capacity); + this.initCapacity = this.capacity; this.table = new long[2 * this.capacity]; this.size = 0; this.usedBuckets = 0; - this.resizeThreshold = (int) (this.capacity * SetFillFactor); + this.autoShrink = autoShrink; + this.mapFillFactor = mapFillFactor; + this.mapIdleFactor = mapIdleFactor; + this.expandFactor = expandFactor; + this.shrinkFactor = shrinkFactor; + this.resizeThresholdUp = (int) (this.capacity * mapFillFactor); + this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor); Arrays.fill(table, EmptyItem); } @@ -314,9 +412,11 @@ public class ConcurrentLongPairSet implements LongPairSet { bucket = (bucket + 2) & (table.length - 1); } } finally { - if (usedBuckets > resizeThreshold) { + if (usedBuckets > resizeThresholdUp) { try { - rehash(); + // Expand the hashmap + int newCapacity = alignToPowerOfTwo((int) (capacity * expandFactor)); + rehash(newCapacity); } finally { unlockWrite(stamp); } @@ -347,7 +447,20 @@ public class ConcurrentLongPairSet implements LongPairSet { bucket = (bucket + 2) & (table.length - 1); } } finally { - unlockWrite(stamp); + if (autoShrink && size < resizeThresholdBelow) { + try { + int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor)); + int newResizeThresholdUp = (int) (newCapacity * mapFillFactor); + if (newCapacity < capacity && newResizeThresholdUp > size) { + // shrink the hashmap + rehash(newCapacity); + } + } finally { + unlockWrite(stamp); + } + } else { + unlockWrite(stamp); + } } } @@ -379,6 +492,16 @@ public class ConcurrentLongPairSet implements LongPairSet { table[bucket] = EmptyItem; table[bucket + 1] = EmptyItem; --usedBuckets; + + // Cleanup all the buckets that were in `DeletedKey` state, + // so that we can reduce unnecessary expansions + bucket = (bucket - 1) & (table.length - 1); + while (table[bucket] == DeletedItem) { + table[bucket] = EmptyItem; + --usedBuckets; + + bucket = (bucket - 1) & (table.length - 1); + } } else { table[bucket] = DeletedItem; table[bucket + 1] = DeletedItem; @@ -392,6 +515,9 @@ public class ConcurrentLongPairSet implements LongPairSet { Arrays.fill(table, EmptyItem); this.size = 0; this.usedBuckets = 0; + if (autoShrink) { + rehash(initCapacity); + } } finally { unlockWrite(stamp); } @@ -431,9 +557,8 @@ public class ConcurrentLongPairSet implements LongPairSet { } } - private void rehash() { + private void rehash(int newCapacity) { // Expand the hashmap - int newCapacity = capacity * 2; long[] newTable = new long[2 * newCapacity]; Arrays.fill(newTable, EmptyItem); @@ -451,7 +576,8 @@ public class ConcurrentLongPairSet implements LongPairSet { // Capacity needs to be updated after the values, so that we won't see // a capacity value bigger than the actual array size capacity = newCapacity; - resizeThreshold = (int) (capacity * SetFillFactor); + resizeThresholdUp = (int) (capacity * mapFillFactor); + resizeThresholdBelow = (int) (capacity * mapIdleFactor); } private static void insertKeyValueNoLock(long[] table, int capacity, long item1, long item2) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java index 
5966a95e5f3..1ccbeb3b6b5 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java @@ -64,33 +64,112 @@ public class ConcurrentOpenHashMap<K, V> { } }; - private static final float MapFillFactor = 0.66f; - private static final int DefaultExpectedItems = 256; private static final int DefaultConcurrencyLevel = 16; + private static final float DefaultMapFillFactor = 0.66f; + private static final float DefaultMapIdleFactor = 0.15f; + + private static final float DefaultExpandFactor = 2; + private static final float DefaultShrinkFactor = 2; + + private static final boolean DefaultAutoShrink = false; + private final Section<K, V>[] sections; + public static <K, V> Builder<K, V> newBuilder() { + return new Builder<>(); + } + + /** + * Builder of ConcurrentOpenHashMap. + */ + public static class Builder<K, V> { + int expectedItems = DefaultExpectedItems; + int concurrencyLevel = DefaultConcurrencyLevel; + float mapFillFactor = DefaultMapFillFactor; + float mapIdleFactor = DefaultMapIdleFactor; + float expandFactor = DefaultExpandFactor; + float shrinkFactor = DefaultShrinkFactor; + boolean autoShrink = DefaultAutoShrink; + + public Builder<K, V> expectedItems(int expectedItems) { + this.expectedItems = expectedItems; + return this; + } + + public Builder<K, V> concurrencyLevel(int concurrencyLevel) { + this.concurrencyLevel = concurrencyLevel; + return this; + } + + public Builder<K, V> mapFillFactor(float mapFillFactor) { + this.mapFillFactor = mapFillFactor; + return this; + } + + public Builder<K, V> mapIdleFactor(float mapIdleFactor) { + this.mapIdleFactor = mapIdleFactor; + return this; + } + + public Builder<K, V> expandFactor(float expandFactor) { + this.expandFactor = expandFactor; + return this; + } + + public Builder<K, V> shrinkFactor(float shrinkFactor) { + this.shrinkFactor = shrinkFactor; + return this; + } + + public Builder<K, V> autoShrink(boolean autoShrink) { + this.autoShrink = autoShrink; + return this; + } + + public ConcurrentOpenHashMap<K, V> build() { + return new ConcurrentOpenHashMap<>(expectedItems, concurrencyLevel, + mapFillFactor, mapIdleFactor, autoShrink, expandFactor, shrinkFactor); + } + } + + @Deprecated public ConcurrentOpenHashMap() { this(DefaultExpectedItems); } + @Deprecated public ConcurrentOpenHashMap(int expectedItems) { this(expectedItems, DefaultConcurrencyLevel); } + @Deprecated public ConcurrentOpenHashMap(int expectedItems, int concurrencyLevel) { + this(expectedItems, concurrencyLevel, DefaultMapFillFactor, DefaultMapIdleFactor, + DefaultAutoShrink, DefaultExpandFactor, DefaultShrinkFactor); + } + + public ConcurrentOpenHashMap(int expectedItems, int concurrencyLevel, + float mapFillFactor, float mapIdleFactor, + boolean autoShrink, float expandFactor, float shrinkFactor) { checkArgument(expectedItems > 0); checkArgument(concurrencyLevel > 0); checkArgument(expectedItems >= concurrencyLevel); + checkArgument(mapFillFactor > 0 && mapFillFactor < 1); + checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1); + checkArgument(mapFillFactor > mapIdleFactor); + checkArgument(expandFactor > 1); + checkArgument(shrinkFactor > 1); int numSections = concurrencyLevel; int perSectionExpectedItems = expectedItems / numSections; - int perSectionCapacity = (int) (perSectionExpectedItems / MapFillFactor); + int perSectionCapacity = (int) (perSectionExpectedItems / mapFillFactor); this.sections = (Section<K, V>[]) 
new Section[numSections]; for (int i = 0; i < numSections; i++) { - sections[i] = new Section<>(perSectionCapacity); + sections[i] = new Section<>(perSectionCapacity, mapFillFactor, mapIdleFactor, + autoShrink, expandFactor, shrinkFactor); } } @@ -208,18 +287,33 @@ public class ConcurrentOpenHashMap<K, V> { private volatile Object[] table; private volatile int capacity; + private final int initCapacity; private static final AtomicIntegerFieldUpdater<Section> SIZE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(Section.class, "size"); private volatile int size; private int usedBuckets; - private int resizeThreshold; - - Section(int capacity) { + private int resizeThresholdUp; + private int resizeThresholdBelow; + private final float mapFillFactor; + private final float mapIdleFactor; + private final float expandFactor; + private final float shrinkFactor; + private final boolean autoShrink; + + Section(int capacity, float mapFillFactor, float mapIdleFactor, boolean autoShrink, + float expandFactor, float shrinkFactor) { this.capacity = alignToPowerOfTwo(capacity); + this.initCapacity = this.capacity; this.table = new Object[2 * this.capacity]; this.size = 0; this.usedBuckets = 0; - this.resizeThreshold = (int) (this.capacity * MapFillFactor); + this.autoShrink = autoShrink; + this.mapFillFactor = mapFillFactor; + this.mapIdleFactor = mapIdleFactor; + this.expandFactor = expandFactor; + this.shrinkFactor = shrinkFactor; + this.resizeThresholdUp = (int) (this.capacity * mapFillFactor); + this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor); } V get(K key, int keyHash) { @@ -316,9 +410,11 @@ public class ConcurrentOpenHashMap<K, V> { bucket = (bucket + 2) & (table.length - 1); } } finally { - if (usedBuckets > resizeThreshold) { + if (usedBuckets > resizeThresholdUp) { try { - rehash(); + // Expand the hashmap + int newCapacity = alignToPowerOfTwo((int) (capacity * expandFactor)); + rehash(newCapacity); } finally { unlockWrite(stamp); } @@ -363,7 +459,20 @@ public class ConcurrentOpenHashMap<K, V> { } } finally { - unlockWrite(stamp); + if (autoShrink && size < resizeThresholdBelow) { + try { + int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor)); + int newResizeThresholdUp = (int) (newCapacity * mapFillFactor); + if (newCapacity < capacity && newResizeThresholdUp > size) { + // shrink the hashmap + rehash(newCapacity); + } + } finally { + unlockWrite(stamp); + } + } else { + unlockWrite(stamp); + } } } @@ -374,6 +483,9 @@ public class ConcurrentOpenHashMap<K, V> { Arrays.fill(table, EmptyKey); this.size = 0; this.usedBuckets = 0; + if (autoShrink) { + rehash(initCapacity); + } } finally { unlockWrite(stamp); } @@ -415,9 +527,8 @@ public class ConcurrentOpenHashMap<K, V> { } } - private void rehash() { + private void rehash(int newCapacity) { // Expand the hashmap - int newCapacity = capacity * 2; Object[] newTable = new Object[2 * newCapacity]; // Re-hash table @@ -432,7 +543,8 @@ public class ConcurrentOpenHashMap<K, V> { table = newTable; capacity = newCapacity; usedBuckets = size; - resizeThreshold = (int) (capacity * MapFillFactor); + resizeThresholdUp = (int) (capacity * mapFillFactor); + resizeThresholdBelow = (int) (capacity * mapIdleFactor); } private static <K, V> void insertKeyValueNoLock(Object[] table, int capacity, K key, V value) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java index 
8e0e69d32df..28f0df0ff20 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java @@ -43,33 +43,112 @@ public class ConcurrentOpenHashSet<V> { private static final Object EmptyValue = null; private static final Object DeletedValue = new Object(); - private static final float MapFillFactor = 0.66f; - private static final int DefaultExpectedItems = 256; private static final int DefaultConcurrencyLevel = 16; + private static final float DefaultMapFillFactor = 0.66f; + private static final float DefaultMapIdleFactor = 0.15f; + + private static final float DefaultExpandFactor = 2; + private static final float DefaultShrinkFactor = 2; + + private static final boolean DefaultAutoShrink = false; + private final Section<V>[] sections; + public static <V> Builder<V> newBuilder() { + return new Builder<>(); + } + + /** + * Builder of ConcurrentOpenHashSet. + */ + public static class Builder<V> { + int expectedItems = DefaultExpectedItems; + int concurrencyLevel = DefaultConcurrencyLevel; + float mapFillFactor = DefaultMapFillFactor; + float mapIdleFactor = DefaultMapIdleFactor; + float expandFactor = DefaultExpandFactor; + float shrinkFactor = DefaultShrinkFactor; + boolean autoShrink = DefaultAutoShrink; + + public Builder<V> expectedItems(int expectedItems) { + this.expectedItems = expectedItems; + return this; + } + + public Builder<V> concurrencyLevel(int concurrencyLevel) { + this.concurrencyLevel = concurrencyLevel; + return this; + } + + public Builder<V> mapFillFactor(float mapFillFactor) { + this.mapFillFactor = mapFillFactor; + return this; + } + + public Builder<V> mapIdleFactor(float mapIdleFactor) { + this.mapIdleFactor = mapIdleFactor; + return this; + } + + public Builder<V> expandFactor(float expandFactor) { + this.expandFactor = expandFactor; + return this; + } + + public Builder<V> shrinkFactor(float shrinkFactor) { + this.shrinkFactor = shrinkFactor; + return this; + } + + public Builder<V> autoShrink(boolean autoShrink) { + this.autoShrink = autoShrink; + return this; + } + + public ConcurrentOpenHashSet<V> build() { + return new ConcurrentOpenHashSet<>(expectedItems, concurrencyLevel, + mapFillFactor, mapIdleFactor, autoShrink, expandFactor, shrinkFactor); + } + } + + @Deprecated public ConcurrentOpenHashSet() { this(DefaultExpectedItems); } + @Deprecated public ConcurrentOpenHashSet(int expectedItems) { this(expectedItems, DefaultConcurrencyLevel); } + @Deprecated public ConcurrentOpenHashSet(int expectedItems, int concurrencyLevel) { + this(expectedItems, concurrencyLevel, DefaultMapFillFactor, DefaultMapIdleFactor, + DefaultAutoShrink, DefaultExpandFactor, DefaultShrinkFactor); + } + + public ConcurrentOpenHashSet(int expectedItems, int concurrencyLevel, + float mapFillFactor, float mapIdleFactor, + boolean autoShrink, float expandFactor, float shrinkFactor) { checkArgument(expectedItems > 0); checkArgument(concurrencyLevel > 0); checkArgument(expectedItems >= concurrencyLevel); + checkArgument(mapFillFactor > 0 && mapFillFactor < 1); + checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1); + checkArgument(mapFillFactor > mapIdleFactor); + checkArgument(expandFactor > 1); + checkArgument(shrinkFactor > 1); int numSections = concurrencyLevel; int perSectionExpectedItems = expectedItems / numSections; - int perSectionCapacity = (int) (perSectionExpectedItems / MapFillFactor); + int perSectionCapacity = (int) 
(perSectionExpectedItems / mapFillFactor); this.sections = (Section<V>[]) new Section[numSections]; for (int i = 0; i < numSections; i++) { - sections[i] = new Section<>(perSectionCapacity); + sections[i] = new Section<>(perSectionCapacity, mapFillFactor, mapIdleFactor, + autoShrink, expandFactor, shrinkFactor); } } @@ -177,18 +256,33 @@ public class ConcurrentOpenHashSet<V> { private volatile V[] values; private volatile int capacity; + private final int initCapacity; private static final AtomicIntegerFieldUpdater<Section> SIZE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(Section.class, "size"); private volatile int size; private int usedBuckets; - private int resizeThreshold; - - Section(int capacity) { + private int resizeThresholdUp; + private int resizeThresholdBelow; + private final float mapFillFactor; + private final float mapIdleFactor; + private final float expandFactor; + private final float shrinkFactor; + private final boolean autoShrink; + + Section(int capacity, float mapFillFactor, float mapIdleFactor, boolean autoShrink, + float expandFactor, float shrinkFactor) { this.capacity = alignToPowerOfTwo(capacity); + this.initCapacity = this.capacity; this.values = (V[]) new Object[this.capacity]; this.size = 0; this.usedBuckets = 0; - this.resizeThreshold = (int) (this.capacity * MapFillFactor); + this.autoShrink = autoShrink; + this.mapFillFactor = mapFillFactor; + this.mapIdleFactor = mapIdleFactor; + this.expandFactor = expandFactor; + this.shrinkFactor = shrinkFactor; + this.resizeThresholdUp = (int) (this.capacity * mapFillFactor); + this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor); } boolean contains(V value, int keyHash) { @@ -284,9 +378,11 @@ public class ConcurrentOpenHashSet<V> { ++bucket; } } finally { - if (usedBuckets > resizeThreshold) { + if (usedBuckets > resizeThresholdUp) { try { - rehash(); + // Expand the hashmap + int newCapacity = alignToPowerOfTwo((int) (capacity * expandFactor)); + rehash(newCapacity); } finally { unlockWrite(stamp); } @@ -319,7 +415,20 @@ public class ConcurrentOpenHashSet<V> { } } finally { - unlockWrite(stamp); + if (autoShrink && size < resizeThresholdBelow) { + try { + int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor)); + int newResizeThresholdUp = (int) (newCapacity * mapFillFactor); + if (newCapacity < capacity && newResizeThresholdUp > size) { + // shrink the hashmap + rehash(newCapacity); + } + } finally { + unlockWrite(stamp); + } + } else { + unlockWrite(stamp); + } } } @@ -330,6 +439,9 @@ public class ConcurrentOpenHashSet<V> { Arrays.fill(values, EmptyValue); this.size = 0; this.usedBuckets = 0; + if (autoShrink) { + rehash(initCapacity); + } } finally { unlockWrite(stamp); } @@ -402,9 +514,8 @@ public class ConcurrentOpenHashSet<V> { } } - private void rehash() { + private void rehash(int newCapacity) { // Expand the hashmap - int newCapacity = capacity * 2; V[] newValues = (V[]) new Object[newCapacity]; // Re-hash table @@ -418,7 +529,8 @@ public class ConcurrentOpenHashSet<V> { values = newValues; capacity = newCapacity; usedBuckets = size; - resizeThreshold = (int) (capacity * MapFillFactor); + resizeThresholdUp = (int) (capacity * mapFillFactor); + resizeThresholdBelow = (int) (capacity * mapIdleFactor); } private static <V> void insertValueNoLock(V[] values, V value) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java index 95e2302dcb7..e4cb668fc92 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java @@ -79,7 +79,10 @@ public class ConcurrentSortedLongPairSet implements LongPairSet { @Override public boolean add(long item1, long item2) { ConcurrentLongPairSet messagesToReplay = longPairSets.computeIfAbsent(item1, - (key) -> new ConcurrentLongPairSet(expectedItems, concurrencyLevel)); + (key) -> ConcurrentLongPairSet.newBuilder() + .expectedItems(expectedItems) + .concurrencyLevel(concurrencyLevel) + .build()); return messagesToReplay.add(item1, item2); } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java index 82cac712975..a8d3e1d0603 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java @@ -45,21 +45,29 @@ public class ConcurrentLongPairSetTest { @Test public void testConstructor() { try { - new ConcurrentLongPairSet(0); + ConcurrentLongPairSet.newBuilder() + .expectedItems(0) + .build(); fail("should have thrown exception"); } catch (IllegalArgumentException e) { // ok } try { - new ConcurrentLongPairSet(16, 0); + ConcurrentLongPairSet.newBuilder() + .expectedItems(16) + .concurrencyLevel(0) + .build(); fail("should have thrown exception"); } catch (IllegalArgumentException e) { // ok } try { - new ConcurrentLongPairSet(4, 8); + ConcurrentLongPairSet.newBuilder() + .expectedItems(4) + .concurrencyLevel(8) + .build(); fail("should have thrown exception"); } catch (IllegalArgumentException e) { // ok @@ -68,7 +76,9 @@ public class ConcurrentLongPairSetTest { @Test public void simpleInsertions() { - ConcurrentLongPairSet set = new ConcurrentLongPairSet(16); + ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder() + .expectedItems(16) + .build(); assertTrue(set.isEmpty()); assertTrue(set.add(1, 1)); @@ -94,9 +104,64 @@ public class ConcurrentLongPairSetTest { assertEquals(set.size(), 3); } + @Test + public void testClear() { + ConcurrentLongPairSet map = ConcurrentLongPairSet.newBuilder() + .expectedItems(2) + .concurrencyLevel(1) + .autoShrink(true) + .mapIdleFactor(0.25f) + .build(); + assertTrue(map.capacity() == 4); + + assertTrue(map.add(1, 1)); + assertTrue(map.add(2, 2)); + assertTrue(map.add(3, 3)); + + assertTrue(map.capacity() == 8); + map.clear(); + assertTrue(map.capacity() == 4); + } + + @Test + public void testExpandAndShrink() { + ConcurrentLongPairSet map = ConcurrentLongPairSet.newBuilder() + .expectedItems(2) + .concurrencyLevel(1) + .autoShrink(true) + .mapIdleFactor(0.25f) + .build(); + assertTrue(map.capacity() == 4); + + assertTrue(map.add(1, 1)); + assertTrue(map.add(2, 2)); + assertTrue(map.add(3, 3)); + + // expand hashmap + assertTrue(map.capacity() == 8); + + assertTrue(map.remove(1, 1)); + // not shrink + assertTrue(map.capacity() == 8); + assertTrue(map.remove(2, 2)); + // shrink hashmap + assertTrue(map.capacity() == 4); + + // expand hashmap + assertTrue(map.add(4, 4)); + assertTrue(map.add(5, 5)); + assertTrue(map.capacity() == 8); + + //verify that the map does not 
keep shrinking at every remove() operation + assertTrue(map.add(6, 6)); + assertTrue(map.remove(6, 6)); + assertTrue(map.capacity() == 8); + } + + @Test public void testRemove() { - ConcurrentLongPairSet set = new ConcurrentLongPairSet(); + ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder().build(); assertTrue(set.isEmpty()); assertTrue(set.add(1, 1)); @@ -111,7 +176,10 @@ public class ConcurrentLongPairSetTest { @Test public void testRehashing() { int n = 16; - ConcurrentLongPairSet set = new ConcurrentLongPairSet(n / 2, 1); + ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder() + .expectedItems(n / 2) + .concurrencyLevel(1) + .build(); assertEquals(set.capacity(), n); assertEquals(set.size(), 0); @@ -126,7 +194,10 @@ public class ConcurrentLongPairSetTest { @Test public void testRehashingRemoval() { int n = 16; - ConcurrentLongPairSet set = new ConcurrentLongPairSet(n / 2, 1); + ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder() + .expectedItems(n / 2) + .concurrencyLevel(1) + .build(); assertEquals(set.capacity(), n); assertEquals(set.size(), 0); @@ -152,7 +223,10 @@ public class ConcurrentLongPairSetTest { @Test public void testRehashingWithDeletes() { int n = 16; - ConcurrentLongPairSet set = new ConcurrentLongPairSet(n / 2, 1); + ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder() + .expectedItems(n / 2) + .concurrencyLevel(1) + .build(); assertEquals(set.capacity(), n); assertEquals(set.size(), 0); @@ -177,7 +251,7 @@ public class ConcurrentLongPairSetTest { @Test public void concurrentInsertions() throws Throwable { - ConcurrentLongPairSet set = new ConcurrentLongPairSet(); + ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder().build(); @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); @@ -210,7 +284,7 @@ public class ConcurrentLongPairSetTest { @Test public void concurrentInsertionsAndReads() throws Throwable { - ConcurrentLongPairSet map = new ConcurrentLongPairSet(); + ConcurrentLongPairSet map = ConcurrentLongPairSet.newBuilder().build(); @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); @@ -243,7 +317,7 @@ public class ConcurrentLongPairSetTest { @Test public void testIteration() { - ConcurrentLongPairSet set = new ConcurrentLongPairSet(); + ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder().build(); assertEquals(set.items(), Collections.emptyList()); @@ -269,7 +343,7 @@ public class ConcurrentLongPairSetTest { @Test public void testRemoval() { - ConcurrentLongPairSet set = new ConcurrentLongPairSet(); + ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder().build(); set.add(0, 0); set.add(1, 1); @@ -295,7 +369,7 @@ public class ConcurrentLongPairSetTest { @Test public void testIfRemoval() { - ConcurrentLongPairSet set = new ConcurrentLongPairSet(); + ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder().build(); set.add(0, 0); set.add(1, 1); @@ -319,7 +393,7 @@ public class ConcurrentLongPairSetTest { @Test public void testItems() { - ConcurrentLongPairSet set = new ConcurrentLongPairSet(); + ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder().build(); int n = 100; int limit = 10; @@ -340,7 +414,10 @@ public class ConcurrentLongPairSetTest { @Test public void testHashConflictWithDeletion() { final int Buckets = 16; - ConcurrentLongPairSet set = new ConcurrentLongPairSet(Buckets, 1); + ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder() + .expectedItems(Buckets) + .concurrencyLevel(1) + .build(); // Pick 2 
keys that fall into the same bucket long key1 = 1; @@ -375,7 +452,7 @@ public class ConcurrentLongPairSetTest { @Test public void testEqualsObjects() { - ConcurrentLongPairSet set = new ConcurrentLongPairSet(); + ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder().build(); long t1 = 1; long t2 = 2; @@ -397,7 +474,7 @@ public class ConcurrentLongPairSetTest { @Test public void testToString() { - ConcurrentLongPairSet set = new ConcurrentLongPairSet(); + ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder().build(); set.add(0, 0); set.add(1, 1); diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java index 254be51f292..7919485d9b6 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java @@ -49,21 +49,29 @@ public class ConcurrentOpenHashMapTest { @Test public void testConstructor() { try { - new ConcurrentOpenHashMap<String, String>(0); + ConcurrentOpenHashMap.<String, String>newBuilder() + .expectedItems(0) + .build(); fail("should have thrown exception"); } catch (IllegalArgumentException e) { // ok } try { - new ConcurrentOpenHashMap<String, String>(16, 0); + ConcurrentOpenHashMap.<String, String>newBuilder() + .expectedItems(16) + .concurrencyLevel(0) + .build(); fail("should have thrown exception"); } catch (IllegalArgumentException e) { // ok } try { - new ConcurrentOpenHashMap<String, String>(4, 8); + ConcurrentOpenHashMap.<String, String>newBuilder() + .expectedItems(4) + .concurrencyLevel(8) + .build(); fail("should have thrown exception"); } catch (IllegalArgumentException e) { // ok @@ -72,7 +80,10 @@ public class ConcurrentOpenHashMapTest { @Test public void simpleInsertions() { - ConcurrentOpenHashMap<String, String> map = new ConcurrentOpenHashMap<>(16); + ConcurrentOpenHashMap<String, String> map = + ConcurrentOpenHashMap.<String, String>newBuilder() + .expectedItems(16) + .build(); assertTrue(map.isEmpty()); assertNull(map.put("1", "one")); @@ -98,9 +109,64 @@ public class ConcurrentOpenHashMapTest { assertEquals(map.size(), 3); } + @Test + public void testClear() { + ConcurrentOpenHashMap<String, String> map = ConcurrentOpenHashMap.<String, String>newBuilder() + .expectedItems(2) + .concurrencyLevel(1) + .autoShrink(true) + .mapIdleFactor(0.25f) + .build(); + assertTrue(map.capacity() == 4); + + assertNull(map.put("k1", "v1")); + assertNull(map.put("k2", "v2")); + assertNull(map.put("k3", "v3")); + + assertTrue(map.capacity() == 8); + map.clear(); + assertTrue(map.capacity() == 4); + } + + @Test + public void testExpandAndShrink() { + ConcurrentOpenHashMap<String, String> map = ConcurrentOpenHashMap.<String, String>newBuilder() + .expectedItems(2) + .concurrencyLevel(1) + .autoShrink(true) + .mapIdleFactor(0.25f) + .build(); + assertTrue(map.capacity() == 4); + + assertNull(map.put("k1", "v1")); + assertNull(map.put("k2", "v2")); + assertNull(map.put("k3", "v3")); + + // expand hashmap + assertTrue(map.capacity() == 8); + + assertTrue(map.remove("k1", "v1")); + // not shrink + assertTrue(map.capacity() == 8); + assertTrue(map.remove("k2", "v2")); + // shrink hashmap + assertTrue(map.capacity() == 4); + + // expand hashmap + assertNull(map.put("k4", "v4")); + assertNull(map.put("k5", "v5")); + assertTrue(map.capacity() == 8); + + 
//verify that the map does not keep shrinking at every remove() operation + assertNull(map.put("k6", "v6")); + assertTrue(map.remove("k6", "v6")); + assertTrue(map.capacity() == 8); + } + @Test public void testRemove() { - ConcurrentOpenHashMap<String, String> map = new ConcurrentOpenHashMap<>(); + ConcurrentOpenHashMap<String, String> map = + ConcurrentOpenHashMap.<String, String>newBuilder().build(); assertTrue(map.isEmpty()); assertNull(map.put("1", "one")); @@ -117,7 +183,10 @@ public class ConcurrentOpenHashMapTest { @Test public void testRehashing() { int n = 16; - ConcurrentOpenHashMap<String, Integer> map = new ConcurrentOpenHashMap<>(n / 2, 1); + ConcurrentOpenHashMap<String, Integer> map = ConcurrentOpenHashMap.<String, Integer>newBuilder() + .expectedItems(n / 2) + .concurrencyLevel(1) + .build(); assertEquals(map.capacity(), n); assertEquals(map.size(), 0); @@ -132,7 +201,11 @@ public class ConcurrentOpenHashMapTest { @Test public void testRehashingWithDeletes() { int n = 16; - ConcurrentOpenHashMap<Integer, Integer> map = new ConcurrentOpenHashMap<>(n / 2, 1); + ConcurrentOpenHashMap<Integer, Integer> map = + ConcurrentOpenHashMap.<Integer, Integer>newBuilder() + .expectedItems(n / 2) + .concurrencyLevel(1) + .build(); assertEquals(map.capacity(), n); assertEquals(map.size(), 0); @@ -154,7 +227,10 @@ public class ConcurrentOpenHashMapTest { @Test public void concurrentInsertions() throws Throwable { - ConcurrentOpenHashMap<Long, String> map = new ConcurrentOpenHashMap<>(16, 1); + ConcurrentOpenHashMap<Long, String> map = ConcurrentOpenHashMap.<Long, String>newBuilder() + .expectedItems(16) + .concurrencyLevel(1) + .build(); @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); @@ -188,7 +264,8 @@ public class ConcurrentOpenHashMapTest { @Test public void concurrentInsertionsAndReads() throws Throwable { - ConcurrentOpenHashMap<Long, String> map = new ConcurrentOpenHashMap<>(); + ConcurrentOpenHashMap<Long, String> map = + ConcurrentOpenHashMap.<Long, String>newBuilder().build(); @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); @@ -222,7 +299,8 @@ public class ConcurrentOpenHashMapTest { @Test public void testIteration() { - ConcurrentOpenHashMap<Long, String> map = new ConcurrentOpenHashMap<>(); + ConcurrentOpenHashMap<Long, String> map = + ConcurrentOpenHashMap.<Long, String>newBuilder().build(); assertEquals(map.keys(), Collections.emptyList()); assertEquals(map.values(), Collections.emptyList()); @@ -266,7 +344,10 @@ public class ConcurrentOpenHashMapTest { @Test public void testHashConflictWithDeletion() { final int Buckets = 16; - ConcurrentOpenHashMap<Long, String> map = new ConcurrentOpenHashMap<>(Buckets, 1); + ConcurrentOpenHashMap<Long, String> map = ConcurrentOpenHashMap.<Long, String>newBuilder() + .expectedItems(Buckets) + .concurrencyLevel(1) + .build(); // Pick 2 keys that fall into the same bucket long key1 = 1; @@ -299,7 +380,8 @@ public class ConcurrentOpenHashMapTest { @Test public void testPutIfAbsent() { - ConcurrentOpenHashMap<Long, String> map = new ConcurrentOpenHashMap<>(); + ConcurrentOpenHashMap<Long, String> map = + ConcurrentOpenHashMap.<Long, String>newBuilder().build(); assertNull(map.putIfAbsent(1l, "one")); assertEquals(map.get(1l), "one"); @@ -309,7 +391,10 @@ public class ConcurrentOpenHashMapTest { @Test public void testComputeIfAbsent() { - ConcurrentOpenHashMap<Integer, Integer> map = new ConcurrentOpenHashMap<>(16, 1); + ConcurrentOpenHashMap<Integer, Integer> map = 
ConcurrentOpenHashMap.<Integer, Integer>newBuilder() + .expectedItems(16) + .concurrencyLevel(1) + .build(); AtomicInteger counter = new AtomicInteger(); Function<Integer, Integer> provider = key -> counter.getAndIncrement(); @@ -350,7 +435,8 @@ public class ConcurrentOpenHashMapTest { } } - ConcurrentOpenHashMap<T, String> map = new ConcurrentOpenHashMap<>(); + ConcurrentOpenHashMap<T, String> map = + ConcurrentOpenHashMap.<T, String>newBuilder().build(); T t1 = new T(1); T t1_b = new T(1); @@ -372,7 +458,11 @@ public class ConcurrentOpenHashMapTest { @Test public void testNullValue() { - ConcurrentOpenHashMap<String, String> map = new ConcurrentOpenHashMap<>(16, 1); + ConcurrentOpenHashMap<String, String> map = + ConcurrentOpenHashMap.<String, String>newBuilder() + .expectedItems(16) + .concurrencyLevel(1) + .build(); String key = "a"; assertThrows(NullPointerException.class, () -> map.put(key, null)); @@ -406,7 +496,10 @@ public class ConcurrentOpenHashMapTest { static final int N = 1_000_000; public void benchConcurrentOpenHashMap() throws Exception { - ConcurrentOpenHashMap<Long, String> map = new ConcurrentOpenHashMap<>(N, 1); + ConcurrentOpenHashMap<Long, String> map = ConcurrentOpenHashMap.<Long, String>newBuilder() + .expectedItems(N) + .concurrencyLevel(1) + .build(); for (long i = 0; i < Iterations; i++) { for (int j = 0; j < N; j++) { diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java index 3c1d99668d7..af62948b64a 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java @@ -91,9 +91,66 @@ public class ConcurrentOpenHashSetTest { assertEquals(set.size(), 3); } + @Test + public void testClear() { + ConcurrentOpenHashSet<String> map = + ConcurrentOpenHashSet.<String>newBuilder() + .expectedItems(2) + .concurrencyLevel(1) + .autoShrink(true) + .mapIdleFactor(0.25f) + .build(); + assertTrue(map.capacity() == 4); + + assertTrue(map.add("k1")); + assertTrue(map.add("k2")); + assertTrue(map.add("k3")); + + assertTrue(map.capacity() == 8); + map.clear(); + assertTrue(map.capacity() == 4); + } + + @Test + public void testExpandAndShrink() { + ConcurrentOpenHashSet<String> map = + ConcurrentOpenHashSet.<String>newBuilder() + .expectedItems(2) + .concurrencyLevel(1) + .autoShrink(true) + .mapIdleFactor(0.25f) + .build(); + assertTrue(map.capacity() == 4); + + assertTrue(map.add("k1")); + assertTrue(map.add("k2")); + assertTrue(map.add("k3")); + + // expand hashmap + assertTrue(map.capacity() == 8); + + assertTrue(map.remove("k1")); + // not shrink + assertTrue(map.capacity() == 8); + assertTrue(map.remove("k2")); + // shrink hashmap + assertTrue(map.capacity() == 4); + + // expand hashmap + assertTrue(map.add("k4")); + assertTrue(map.add("k5")); + assertTrue(map.capacity() == 8); + + //verify that the map does not keep shrinking at every remove() operation + assertTrue(map.add("k6")); + assertTrue(map.remove("k6")); + assertTrue(map.capacity() == 8); + } + @Test public void testRemove() { - ConcurrentOpenHashSet<String> set = new ConcurrentOpenHashSet<>(); + ConcurrentOpenHashSet<String> set = + ConcurrentOpenHashSet.<String>newBuilder().build(); assertTrue(set.isEmpty()); assertTrue(set.add("1")); @@ -145,7 +202,8 @@ public class ConcurrentOpenHashSetTest { @Test 
public void concurrentInsertions() throws Throwable { - ConcurrentOpenHashSet<Long> set = new ConcurrentOpenHashSet<>(); + ConcurrentOpenHashSet<Long> set = + ConcurrentOpenHashSet.<Long>newBuilder().build(); @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); @@ -178,7 +236,8 @@ public class ConcurrentOpenHashSetTest { @Test public void concurrentInsertionsAndReads() throws Throwable { - ConcurrentOpenHashSet<Long> map = new ConcurrentOpenHashSet<>(); + ConcurrentOpenHashSet<Long> map = + ConcurrentOpenHashSet.<Long>newBuilder().build(); @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); @@ -211,7 +270,7 @@ public class ConcurrentOpenHashSetTest { @Test public void testIteration() { - ConcurrentOpenHashSet<Long> set = new ConcurrentOpenHashSet<>(); + ConcurrentOpenHashSet<Long> set = ConcurrentOpenHashSet.<Long>newBuilder().build(); assertEquals(set.values(), Collections.emptyList()); @@ -237,7 +296,8 @@ public class ConcurrentOpenHashSetTest { @Test public void testRemoval() { - ConcurrentOpenHashSet<Integer> set = new ConcurrentOpenHashSet<>(); + ConcurrentOpenHashSet<Integer> set = + ConcurrentOpenHashSet.<Integer>newBuilder().build(); set.add(0); set.add(1); @@ -315,7 +375,8 @@ public class ConcurrentOpenHashSetTest { } } - ConcurrentOpenHashSet<T> set = new ConcurrentOpenHashSet<>(); + ConcurrentOpenHashSet<T> set = + ConcurrentOpenHashSet.<T>newBuilder().build(); T t1 = new T(1); T t1_b = new T(1); diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java index 8e85618f3cc..1ea232203d3 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java @@ -120,7 +120,8 @@ public class PulsarRecordCursor implements RecordCursor { PulsarDispatchingRowDecoderFactory decoderFactory; - protected ConcurrentOpenHashMap<String, ChunkedMessageCtx> chunkedMessagesMap = new ConcurrentOpenHashMap<>(); + protected ConcurrentOpenHashMap<String, ChunkedMessageCtx> chunkedMessagesMap = + ConcurrentOpenHashMap.<String, ChunkedMessageCtx>newBuilder().build(); private static final Logger log = Logger.get(PulsarRecordCursor.class); diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java index ee607687b15..5a81d9f21a2 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java @@ -81,9 +81,17 @@ public class WebSocketService implements Closeable { public WebSocketService(ClusterData localCluster, ServiceConfiguration config) { this.config = config; this.localCluster = localCluster; - this.topicProducerMap = new ConcurrentOpenHashMap<>(); - this.topicConsumerMap = new ConcurrentOpenHashMap<>(); - this.topicReaderMap = new ConcurrentOpenHashMap<>(); + this.topicProducerMap = + ConcurrentOpenHashMap.<String, + ConcurrentOpenHashSet<ProducerHandler>>newBuilder() + .build(); + this.topicConsumerMap = + ConcurrentOpenHashMap.<String, + ConcurrentOpenHashSet<ConsumerHandler>>newBuilder() + .build(); + this.topicReaderMap = + ConcurrentOpenHashMap.<String, ConcurrentOpenHashSet<ReaderHandler>>newBuilder() + .build(); this.proxyStats = new 
ProxyStats(this); } @@ -249,7 +257,8 @@ public class WebSocketService implements Closeable { public boolean addProducer(ProducerHandler producer) { return topicProducerMap - .computeIfAbsent(producer.getProducer().getTopic(), topic -> new ConcurrentOpenHashSet<>()) + .computeIfAbsent(producer.getProducer().getTopic(), + topic -> ConcurrentOpenHashSet.<ProducerHandler>newBuilder().build()) .add(producer); } @@ -267,7 +276,8 @@ public class WebSocketService implements Closeable { public boolean addConsumer(ConsumerHandler consumer) { return topicConsumerMap - .computeIfAbsent(consumer.getConsumer().getTopic(), topic -> new ConcurrentOpenHashSet<>()) + .computeIfAbsent(consumer.getConsumer().getTopic(), topic -> + ConcurrentOpenHashSet.<ConsumerHandler>newBuilder().build()) .add(consumer); } @@ -284,7 +294,8 @@ public class WebSocketService implements Closeable { } public boolean addReader(ReaderHandler reader) { - return topicReaderMap.computeIfAbsent(reader.getConsumer().getTopic(), topic -> new ConcurrentOpenHashSet<>()) + return topicReaderMap.computeIfAbsent(reader.getConsumer().getTopic(), topic -> + ConcurrentOpenHashSet.<ReaderHandler>newBuilder().build()) .add(reader); } diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/ProxyStats.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/ProxyStats.java index cc327f57191..7fd75c9b3d9 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/ProxyStats.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/ProxyStats.java @@ -48,7 +48,9 @@ public class ProxyStats { super(); this.service = service; this.jvmMetrics = new JvmMetrics(service); - this.topicStats = new ConcurrentOpenHashMap<>(); + this.topicStats = + ConcurrentOpenHashMap.<String, ProxyNamespaceStats>newBuilder() + .build(); this.metricsCollection = new ArrayList<>(); this.tempMetricsCollection = new ArrayList<>(); // schedule stat generation task every 1 minute
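
Usage note (editor's sketch, not part of the patch): the snippet below shows how call sites migrate from the now-deprecated constructors to the builder introduced by this change. The builder methods and defaults (fill factor 0.66, idle factor 0.15, expand and shrink factors of 2, autoShrink off) are taken from the diff above; the wrapper class name and the literal values passed here are illustrative assumptions only.

    import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
    import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;

    public class BuilderMigrationSketch {
        public static void main(String[] args) {
            // Before this patch: new ConcurrentOpenHashMap<>(expectedItems, concurrencyLevel)
            // After: the builder keeps the same two parameters and adds the shrink knobs.
            ConcurrentOpenHashMap<String, Long> map =
                    ConcurrentOpenHashMap.<String, Long>newBuilder()
                            .expectedItems(256)    // pre-size for the expected number of entries
                            .concurrencyLevel(16)  // number of independent sections
                            .autoShrink(true)      // opt in; defaults to false, preserving old behavior
                            .mapIdleFactor(0.25f)  // shrink once size falls below capacity * idleFactor
                            .shrinkFactor(2)       // halve the capacity on each shrink
                            .build();

            // Sets use the same builder shape; leaving autoShrink unset keeps the pre-patch behavior.
            ConcurrentOpenHashSet<String> set =
                    ConcurrentOpenHashSet.<String>newBuilder()
                            .expectedItems(16)
                            .build();

            map.put("persistent://tenant/ns/topic", 1L);
            set.add("persistent://tenant/ns/topic");
        }
    }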

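A second sketch mirrors the testExpandAndShrink and testClear cases above and walks through the capacity transitions of a single-section set. The capacities in the comments (4 -> 8 -> 4 -> 8 -> 4) follow from the default fill factor of 0.66 and the idle factor of 0.25f, exactly as asserted by the new tests; the wrapper class and the println calls are illustrative only.

    import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet;

    public class ShrinkWalkthrough {
        public static void main(String[] args) {
            ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder()
                    .expectedItems(2)
                    .concurrencyLevel(1)
                    .autoShrink(true)
                    .mapIdleFactor(0.25f)
                    .build();
            System.out.println(set.capacity()); // 4: (int) (2 / 0.66) = 3, aligned up to the next power of two

            set.add(1, 1);
            set.add(2, 2);
            set.add(3, 3);
            System.out.println(set.capacity()); // 8: the third insert pushed usedBuckets past (int) (4 * 0.66) = 2

            set.remove(1, 1);
            System.out.println(set.capacity()); // 8: size (2) is not yet below the idle threshold (int) (8 * 0.25) = 2
            set.remove(2, 2);
            System.out.println(set.capacity()); // 4: size (1) dropped below the threshold, so the section shrank

            set.add(4, 4);
            set.add(5, 5);
            System.out.println(set.capacity()); // 8: expanded again after two more inserts

            set.clear();
            System.out.println(set.capacity()); // 4: with autoShrink, clear() rehashes back to the initial capacity
        }
    }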