This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new acc463f4b2f [cleanup][broker] delete ConcurrentSortedLongPairSet.
(#23202)
acc463f4b2f is described below
commit acc463f4b2f0648f5e3c9c146124a9223ec909f7
Author: Wenzhi Feng <[email protected]>
AuthorDate: Tue Aug 20 18:31:35 2024 +0800
[cleanup][broker] delete ConcurrentSortedLongPairSet. (#23202)
---
.../collections/ConcurrentSortedLongPairSet.java | 215 ---------------
.../ConcurrentSortedLongPairSetTest.java | 291 ---------------------
2 files changed, 506 deletions(-)
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java
deleted file mode 100644
index 0718a4f81a6..00000000000
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.common.util.collections;
-
-import java.util.NavigableMap;
-import java.util.NavigableSet;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.commons.lang.mutable.MutableInt;
-import org.apache.commons.lang.mutable.MutableLong;
-import
org.apache.pulsar.common.util.collections.ConcurrentLongPairSet.LongPair;
-import
org.apache.pulsar.common.util.collections.ConcurrentLongPairSet.LongPairConsumer;
-
-/**
- * Sorted concurrent {@link LongPairSet} which is not fully accurate in
sorting.
- *
- * {@link ConcurrentSortedLongPairSet} creates separate {@link
ConcurrentLongPairSet} for unique first-key of
- * inserted item. So, it can iterate over all items by sorting on item's first
key. However, item's second key will not
- * be sorted. eg:
- *
- * <pre>
- * insert: (1,2), (1,4), (2,1), (1,5), (2,6)
- * while iterating set will first read all the entries for items whose
first-key=1 and then first-key=2.
- * output: (1,4), (1,5), (1,2), (2,6), (2,1)
- * </pre>
- *
- * <p>This map can be expensive and not recommended if set has to store large
number of unique item.first's key
- * because set has to create that many {@link ConcurrentLongPairSet} objects.
- */
-public class ConcurrentSortedLongPairSet implements LongPairSet {
-
- protected final NavigableMap<Long, ConcurrentLongPairSet> longPairSets =
new ConcurrentSkipListMap<>();
- private final int expectedItems;
- private final int concurrencyLevel;
- /**
- * If {@link #longPairSets} adds and removes the item-set frequently then
it allocates and removes
- * {@link ConcurrentLongPairSet} for the same item multiple times which
can lead to gc-puases. To avoid such
- * situation, avoid removing empty LogPairSet until it reaches max limit.
- */
- private final int maxAllowedSetOnRemove;
- private final boolean autoShrink;
- private static final int DEFAULT_MAX_ALLOWED_SET_ON_REMOVE = 10;
-
- public ConcurrentSortedLongPairSet() {
- this(16, 1, DEFAULT_MAX_ALLOWED_SET_ON_REMOVE);
- }
-
- public ConcurrentSortedLongPairSet(int expectedItems) {
- this(expectedItems, 1, DEFAULT_MAX_ALLOWED_SET_ON_REMOVE);
- }
-
- public ConcurrentSortedLongPairSet(int expectedItems, int
concurrencyLevel) {
- this(expectedItems, concurrencyLevel,
DEFAULT_MAX_ALLOWED_SET_ON_REMOVE);
- }
-
- public ConcurrentSortedLongPairSet(int expectedItems, int
concurrencyLevel, boolean autoShrink) {
- this(expectedItems, concurrencyLevel,
DEFAULT_MAX_ALLOWED_SET_ON_REMOVE, autoShrink);
- }
-
- public ConcurrentSortedLongPairSet(int expectedItems, int
concurrencyLevel, int maxAllowedSetOnRemove) {
- this(expectedItems, concurrencyLevel, maxAllowedSetOnRemove, false);
- }
-
- public ConcurrentSortedLongPairSet(int expectedItems, int
concurrencyLevel, int maxAllowedSetOnRemove,
- boolean autoShrink) {
- this.expectedItems = expectedItems;
- this.concurrencyLevel = concurrencyLevel;
- this.maxAllowedSetOnRemove = maxAllowedSetOnRemove;
- this.autoShrink = autoShrink;
- }
-
- @Override
- public boolean add(long item1, long item2) {
- ConcurrentLongPairSet messagesToReplay =
longPairSets.computeIfAbsent(item1,
- (key) -> ConcurrentLongPairSet.newBuilder()
- .expectedItems(expectedItems)
- .concurrencyLevel(concurrencyLevel)
- .autoShrink(autoShrink)
- .build());
- return messagesToReplay.add(item1, item2);
- }
-
- @Override
- public boolean remove(long item1, long item2) {
- ConcurrentLongPairSet messagesToReplay = longPairSets.get(item1);
- if (messagesToReplay != null) {
- boolean removed = messagesToReplay.remove(item1, item2);
- if (messagesToReplay.isEmpty() && longPairSets.size() >
maxAllowedSetOnRemove) {
- longPairSets.remove(item1, messagesToReplay);
- }
- return removed;
- }
- return false;
- }
-
- @Override
- public int removeIf(LongPairPredicate filter) {
- MutableInt removedValues = new MutableInt(0);
- longPairSets.forEach((item1, longPairSet) -> {
- removedValues.add(longPairSet.removeIf(filter));
- if (longPairSet.isEmpty() && longPairSets.size() >
maxAllowedSetOnRemove) {
- longPairSets.remove(item1, longPairSet);
- }
- });
- return removedValues.intValue();
- }
-
- @Override
- public Set<LongPair> items() {
- return items((int) this.size());
- }
-
- @Override
- public void forEach(LongPairConsumer processor) {
- longPairSets.forEach((__, longPairSet) ->
longPairSet.forEach(processor));
- }
-
- @Override
- public Set<LongPair> items(int numberOfItems) {
- return items(numberOfItems, (item1, item2) -> new LongPair(item1,
item2));
- }
-
- @Override
- public <T> Set<T> items(int numberOfItems, LongPairFunction<T>
longPairConverter) {
- NavigableSet<T> items = new TreeSet<>();
- forEach((i1, i2) -> {
- items.add(longPairConverter.apply(i1, i2));
- if (items.size() > numberOfItems) {
- items.pollLast();
- }
- });
- return items;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append('{');
- final AtomicBoolean first = new AtomicBoolean(true);
- longPairSets.forEach((key, longPairSet) -> {
- longPairSet.forEach((item1, item2) -> {
- if (!first.getAndSet(false)) {
- sb.append(", ");
- }
- sb.append('[');
- sb.append(item1);
- sb.append(':');
- sb.append(item2);
- sb.append(']');
- });
- });
- sb.append('}');
- return sb.toString();
- }
-
- @Override
- public boolean isEmpty() {
- if (longPairSets.isEmpty()) {
- return true;
- }
- for (ConcurrentLongPairSet subSet : longPairSets.values()) {
- if (!subSet.isEmpty()) {
- return false;
- }
- }
- return true;
- }
-
- @Override
- public void clear() {
- longPairSets.clear();
- }
-
- @Override
- public long size() {
- MutableLong size = new MutableLong(0);
- longPairSets.forEach((__, longPairSet) ->
size.add(longPairSet.size()));
- return size.longValue();
- }
-
- @Override
- public long capacity() {
- MutableLong capacity = new MutableLong(0);
- longPairSets.forEach((__, longPairSet) ->
capacity.add(longPairSet.capacity()));
- return capacity.longValue();
- }
-
- @Override
- public boolean contains(long item1, long item2) {
- ConcurrentLongPairSet longPairSet = longPairSets.get(item1);
- if (longPairSet != null) {
- return longPairSet.contains(item1, item2);
- }
- return false;
- }
-
-}
\ No newline at end of file
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSetTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSetTest.java
deleted file mode 100644
index eff49883215..00000000000
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSetTest.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.common.util.collections;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.assertTrue;
-import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.Lists;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import lombok.Cleanup;
-import
org.apache.pulsar.common.util.collections.ConcurrentLongPairSet.LongPair;
-import org.testng.annotations.Test;
-
-public class ConcurrentSortedLongPairSetTest {
-
- @Test
- public void simpleInsertions() {
- LongPairSet set = new ConcurrentSortedLongPairSet(16);
-
- assertTrue(set.isEmpty());
- assertTrue(set.add(1, 1));
- assertFalse(set.isEmpty());
-
- assertTrue(set.add(2, 2));
- assertTrue(set.add(3, 3));
-
- assertEquals(set.size(), 3);
-
- assertTrue(set.contains(1, 1));
- assertEquals(set.size(), 3);
-
- assertTrue(set.remove(1, 1));
- assertEquals(set.size(), 2);
- assertFalse(set.contains(1, 1));
- assertFalse(set.contains(5, 5));
- assertEquals(set.size(), 2);
-
- assertTrue(set.add(1, 1));
- assertEquals(set.size(), 3);
- assertFalse(set.add(1, 1));
- assertEquals(set.size(), 3);
- }
-
- @Test
- public void testRemove() {
- LongPairSet set = new ConcurrentSortedLongPairSet(16);
-
- assertTrue(set.isEmpty());
- assertTrue(set.add(1, 1));
- assertFalse(set.isEmpty());
-
- assertFalse(set.remove(1, 0));
- assertFalse(set.isEmpty());
- assertTrue(set.remove(1, 1));
- assertTrue(set.isEmpty());
- }
-
- @Test
- public void concurrentInsertions() throws Throwable {
- LongPairSet set = new ConcurrentSortedLongPairSet(16);
- @Cleanup("shutdownNow")
- ExecutorService executor = Executors.newCachedThreadPool();
-
- final int nThreads = 8;
- final int N = 1000;
-
- List<Future<?>> futures = new ArrayList<>();
- for (int i = 0; i < nThreads; i++) {
- final int threadIdx = i;
-
- futures.add(executor.submit(() -> {
- Random random = new Random();
-
- for (int j = 0; j < N; j++) {
- long key = random.nextLong();
- // Ensure keys are unique
- key -= key % (threadIdx + 1);
- key = Math.abs(key);
- set.add(key, key);
- }
- }));
- }
-
- for (Future<?> future : futures) {
- future.get();
- }
-
- assertEquals(set.size(), N * nThreads);
- }
-
- @Test
- public void testIteration() {
- LongPairSet set = new ConcurrentSortedLongPairSet(16);
-
- for (int i = 0; i < 10; i++) {
- for (int j = 0; j < 10; j++) {
- set.add(i, j);
- }
- }
-
- for (int i = 0; i < 10; i++) {
- final int firstKey = i;
- Set<LongPair> longSetResult = set.items(10);
- assertEquals(longSetResult.size(), 10);
- longSetResult.forEach(longPair -> {
- assertEquals(firstKey, longPair.first);
- });
- set.removeIf((item1, item2) -> item1 == firstKey);
- }
-
- }
-
- @Test
- public void testRemoval() {
- LongPairSet set = new ConcurrentSortedLongPairSet(16);
-
- set.add(0, 0);
- set.add(1, 1);
- set.add(3, 3);
- set.add(6, 6);
- set.add(7, 7);
-
- List<LongPair> values = new ArrayList<>(set.items());
- values.sort(null);
- assertEquals(values, Lists.newArrayList(new LongPair(0, 0), new
LongPair(1, 1), new LongPair(3, 3),
- new LongPair(6, 6), new LongPair(7, 7)));
-
- set.forEach((first, second) -> {
- if (first < 5) {
- set.remove(first, second);
- }
- });
- assertEquals(set.size(), values.size() - 3);
- values = new ArrayList<>(set.items());
- values.sort(null);
- assertEquals(values, Lists.newArrayList(new LongPair(6, 6), new
LongPair(7, 7)));
- }
-
- @Test
- public void testIfRemoval() {
- LongPairSet set = new ConcurrentSortedLongPairSet(16, 1, 1);
-
- set.add(0, 0);
- set.add(1, 1);
- set.add(3, 3);
- set.add(6, 6);
- set.add(7, 7);
-
- List<LongPair> values = new ArrayList<>(set.items());
- values.sort(null);
- assertEquals(values, Lists.newArrayList(new LongPair(0, 0), new
LongPair(1, 1), new LongPair(3, 3),
- new LongPair(6, 6), new LongPair(7, 7)));
-
- int removeItems = set.removeIf((first, second) -> first < 5);
-
- assertEquals(3, removeItems);
- assertEquals(set.size(), values.size() - 3);
- values = new ArrayList<>(set.items());
- values.sort(null);
- assertEquals(values, Lists.newArrayList(new LongPair(6, 6), new
LongPair(7, 7)));
-
- set = new ConcurrentSortedLongPairSet(128, 2, true);
- set.add(2, 2);
- set.add(1, 3);
- set.add(3, 1);
- set.add(2, 1);
- set.add(3, 2);
- set.add(1, 2);
- set.add(1, 1);
- removeItems = set.removeIf((ledgerId, entryId) -> {
- return ComparisonChain.start().compare(ledgerId,
1).compare(entryId, 3)
- .result() <= 0;
- });
- assertEquals(removeItems, 3);
- }
-
- @Test
- public void testItems() {
- LongPairSet set = new ConcurrentSortedLongPairSet(16);
-
- int n = 100;
- int limit = 10;
- for (int i = 0; i < n; i++) {
- set.add(i, i);
- }
-
- Set<LongPair> items = set.items();
- Set<LongPair> limitItems = set.items(limit);
- assertEquals(items.size(), n);
- assertEquals(limitItems.size(), limit);
-
- int totalRemovedItems = set.removeIf((first, second) ->
limitItems.contains((new LongPair(first, second))));
- assertEquals(limitItems.size(), totalRemovedItems);
- assertEquals(set.size(), n - limit);
- }
-
- @Test
- public void testEqualsObjects() {
-
- LongPairSet set = new ConcurrentSortedLongPairSet(16);
-
- long t1 = 1;
- long t2 = 2;
- long t1_b = 1;
- assertEquals(t1, t1_b);
- assertNotEquals(t2, t1);
- assertNotEquals(t2, t1_b);
-
- set.add(t1, t1);
- assertTrue(set.contains(t1, t1));
- assertTrue(set.contains(t1_b, t1_b));
- assertFalse(set.contains(t2, t2));
-
- assertTrue(set.remove(t1_b, t1_b));
- assertFalse(set.contains(t1, t1));
- assertFalse(set.contains(t1_b, t1_b));
- }
-
- @Test
- public void testToString() {
-
- LongPairSet set = new ConcurrentSortedLongPairSet(16);
-
- set.add(0, 0);
- set.add(1, 1);
- set.add(3, 3);
- final String toString = "{[0:0], [1:1], [3:3]}";
- System.out.println(set.toString());
- assertEquals(set.toString(), toString);
- }
-
- @Test
- public void testIsEmpty() {
- LongPairSet set = new ConcurrentSortedLongPairSet();
- assertTrue(set.isEmpty());
- set.add(1, 1);
- assertFalse(set.isEmpty());
- }
-
- @Test
- public void testShrink() {
- LongPairSet set = new ConcurrentSortedLongPairSet(2, 1, true);
- set.add(0, 0);
- assertTrue(set.capacity() == 4);
- set.add(0, 1);
- assertTrue(set.capacity() == 4);
- set.add(1, 1);
- assertTrue(set.capacity() == 8);
- set.add(1, 2);
- assertTrue(set.capacity() == 8);
- set.add(1, 3);
- set.add(1, 4);
- set.add(1, 5);
- assertTrue(set.capacity() == 12);
- set.remove(1, 5);
- // not shrink
- assertTrue(set.capacity() == 12);
- set.remove(1, 4);
- // the internal map does not keep shrinking at every remove() operation
- assertTrue(set.capacity() == 12);
- set.remove(1, 3);
- set.remove(1, 2);
- set.remove(1, 1);
- // shrink
- assertTrue(set.capacity() == 8);
- }
-}