This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new acc463f4b2f [cleanup][broker] delete ConcurrentSortedLongPairSet. (#23202)
acc463f4b2f is described below

commit acc463f4b2f0648f5e3c9c146124a9223ec909f7
Author: Wenzhi Feng <[email protected]>
AuthorDate: Tue Aug 20 18:31:35 2024 +0800

    [cleanup][broker] delete ConcurrentSortedLongPairSet. (#23202)
---
 .../collections/ConcurrentSortedLongPairSet.java   | 215 ---------------
 .../ConcurrentSortedLongPairSetTest.java           | 291 ---------------------
 2 files changed, 506 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java
deleted file mode 100644
index 0718a4f81a6..00000000000
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.common.util.collections;
-
-import java.util.NavigableMap;
-import java.util.NavigableSet;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.commons.lang.mutable.MutableInt;
-import org.apache.commons.lang.mutable.MutableLong;
-import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet.LongPair;
-import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet.LongPairConsumer;
-
-/**
- * Sorted concurrent {@link LongPairSet} which is not fully accurate in sorting.
- *
- * {@link ConcurrentSortedLongPairSet} creates separate {@link ConcurrentLongPairSet} for unique first-key of
- * inserted item. So, it can iterate over all items by sorting on item's first key. However, item's second key will not
- * be sorted. eg:
- *
- * <pre>
- *  insert: (1,2), (1,4), (2,1), (1,5), (2,6)
- *  while iterating set will first read all the entries for items whose first-key=1 and then first-key=2.
- *  output: (1,4), (1,5), (1,2), (2,6), (2,1)
- * </pre>
- *
- * <p>This map can be expensive and not recommended if set has to store large number of unique item.first's key
- * because set has to create that many {@link ConcurrentLongPairSet} objects.
- */
-public class ConcurrentSortedLongPairSet implements LongPairSet {
-
-    protected final NavigableMap<Long, ConcurrentLongPairSet> longPairSets = new ConcurrentSkipListMap<>();
-    private final int expectedItems;
-    private final int concurrencyLevel;
-    /**
-     * If {@link #longPairSets} adds and removes the item-set frequently then it allocates and removes
-     * {@link ConcurrentLongPairSet} for the same item multiple times which can lead to gc-puases. To avoid such
-     * situation, avoid removing empty LogPairSet until it reaches max limit.
-     */
-    private final int maxAllowedSetOnRemove;
-    private final boolean autoShrink;
-    private static final int DEFAULT_MAX_ALLOWED_SET_ON_REMOVE = 10;
-
-    public ConcurrentSortedLongPairSet() {
-        this(16, 1, DEFAULT_MAX_ALLOWED_SET_ON_REMOVE);
-    }
-
-    public ConcurrentSortedLongPairSet(int expectedItems) {
-        this(expectedItems, 1, DEFAULT_MAX_ALLOWED_SET_ON_REMOVE);
-    }
-
-    public ConcurrentSortedLongPairSet(int expectedItems, int concurrencyLevel) {
-        this(expectedItems, concurrencyLevel, DEFAULT_MAX_ALLOWED_SET_ON_REMOVE);
-    }
-
-    public ConcurrentSortedLongPairSet(int expectedItems, int concurrencyLevel, boolean autoShrink) {
-        this(expectedItems, concurrencyLevel, DEFAULT_MAX_ALLOWED_SET_ON_REMOVE, autoShrink);
-    }
-
-    public ConcurrentSortedLongPairSet(int expectedItems, int concurrencyLevel, int maxAllowedSetOnRemove) {
-        this(expectedItems, concurrencyLevel, maxAllowedSetOnRemove, false);
-    }
-
-    public ConcurrentSortedLongPairSet(int expectedItems, int concurrencyLevel, int maxAllowedSetOnRemove,
-                                       boolean autoShrink) {
-        this.expectedItems = expectedItems;
-        this.concurrencyLevel = concurrencyLevel;
-        this.maxAllowedSetOnRemove = maxAllowedSetOnRemove;
-        this.autoShrink = autoShrink;
-    }
-
-    @Override
-    public boolean add(long item1, long item2) {
-        ConcurrentLongPairSet messagesToReplay = longPairSets.computeIfAbsent(item1,
-                (key) -> ConcurrentLongPairSet.newBuilder()
-                        .expectedItems(expectedItems)
-                        .concurrencyLevel(concurrencyLevel)
-                        .autoShrink(autoShrink)
-                        .build());
-        return messagesToReplay.add(item1, item2);
-    }
-
-    @Override
-    public boolean remove(long item1, long item2) {
-        ConcurrentLongPairSet messagesToReplay = longPairSets.get(item1);
-        if (messagesToReplay != null) {
-            boolean removed = messagesToReplay.remove(item1, item2);
-            if (messagesToReplay.isEmpty() && longPairSets.size() > maxAllowedSetOnRemove) {
-                longPairSets.remove(item1, messagesToReplay);
-            }
-            return removed;
-        }
-        return false;
-    }
-
-    @Override
-    public int removeIf(LongPairPredicate filter) {
-        MutableInt removedValues = new MutableInt(0);
-        longPairSets.forEach((item1, longPairSet) -> {
-            removedValues.add(longPairSet.removeIf(filter));
-            if (longPairSet.isEmpty() && longPairSets.size() > maxAllowedSetOnRemove) {
-                longPairSets.remove(item1, longPairSet);
-            }
-        });
-        return removedValues.intValue();
-    }
-
-    @Override
-    public Set<LongPair> items() {
-        return items((int) this.size());
-    }
-
-    @Override
-    public void forEach(LongPairConsumer processor) {
-        longPairSets.forEach((__, longPairSet) -> longPairSet.forEach(processor));
-    }
-
-    @Override
-    public Set<LongPair> items(int numberOfItems) {
-        return items(numberOfItems, (item1, item2) -> new LongPair(item1, item2));
-    }
-
-    @Override
-    public <T> Set<T> items(int numberOfItems, LongPairFunction<T> longPairConverter) {
-        NavigableSet<T> items = new TreeSet<>();
-        forEach((i1, i2) -> {
-            items.add(longPairConverter.apply(i1, i2));
-            if (items.size() > numberOfItems) {
-                items.pollLast();
-            }
-        });
-        return items;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append('{');
-        final AtomicBoolean first = new AtomicBoolean(true);
-        longPairSets.forEach((key, longPairSet) -> {
-            longPairSet.forEach((item1, item2) -> {
-                if (!first.getAndSet(false)) {
-                    sb.append(", ");
-                }
-                sb.append('[');
-                sb.append(item1);
-                sb.append(':');
-                sb.append(item2);
-                sb.append(']');
-            });
-        });
-        sb.append('}');
-        return sb.toString();
-    }
-
-    @Override
-    public boolean isEmpty() {
-        if (longPairSets.isEmpty()) {
-            return true;
-        }
-        for (ConcurrentLongPairSet subSet : longPairSets.values()) {
-            if (!subSet.isEmpty()) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    @Override
-    public void clear() {
-        longPairSets.clear();
-    }
-
-    @Override
-    public long size() {
-        MutableLong size = new MutableLong(0);
-        longPairSets.forEach((__, longPairSet) -> size.add(longPairSet.size()));
-        return size.longValue();
-    }
-
-    @Override
-    public long capacity() {
-        MutableLong capacity = new MutableLong(0);
-        longPairSets.forEach((__, longPairSet) -> capacity.add(longPairSet.capacity()));
-        return capacity.longValue();
-    }
-
-    @Override
-    public boolean contains(long item1, long item2) {
-        ConcurrentLongPairSet longPairSet = longPairSets.get(item1);
-        if (longPairSet != null) {
-            return longPairSet.contains(item1, item2);
-        }
-        return false;
-    }
-
-}
\ No newline at end of file
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSetTest.java
deleted file mode 100644
index eff49883215..00000000000
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSetTest.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.common.util.collections;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.assertTrue;
-import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.Lists;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import lombok.Cleanup;
-import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet.LongPair;
-import org.testng.annotations.Test;
-
-public class ConcurrentSortedLongPairSetTest {
-
-    @Test
-    public void simpleInsertions() {
-        LongPairSet set = new ConcurrentSortedLongPairSet(16);
-
-        assertTrue(set.isEmpty());
-        assertTrue(set.add(1, 1));
-        assertFalse(set.isEmpty());
-
-        assertTrue(set.add(2, 2));
-        assertTrue(set.add(3, 3));
-
-        assertEquals(set.size(), 3);
-
-        assertTrue(set.contains(1, 1));
-        assertEquals(set.size(), 3);
-
-        assertTrue(set.remove(1, 1));
-        assertEquals(set.size(), 2);
-        assertFalse(set.contains(1, 1));
-        assertFalse(set.contains(5, 5));
-        assertEquals(set.size(), 2);
-
-        assertTrue(set.add(1, 1));
-        assertEquals(set.size(), 3);
-        assertFalse(set.add(1, 1));
-        assertEquals(set.size(), 3);
-    }
-
-    @Test
-    public void testRemove() {
-        LongPairSet set = new ConcurrentSortedLongPairSet(16);
-
-        assertTrue(set.isEmpty());
-        assertTrue(set.add(1, 1));
-        assertFalse(set.isEmpty());
-
-        assertFalse(set.remove(1, 0));
-        assertFalse(set.isEmpty());
-        assertTrue(set.remove(1, 1));
-        assertTrue(set.isEmpty());
-    }
-
-    @Test
-    public void concurrentInsertions() throws Throwable {
-        LongPairSet set = new ConcurrentSortedLongPairSet(16);
-        @Cleanup("shutdownNow")
-        ExecutorService executor = Executors.newCachedThreadPool();
-
-        final int nThreads = 8;
-        final int N = 1000;
-
-        List<Future<?>> futures = new ArrayList<>();
-        for (int i = 0; i < nThreads; i++) {
-            final int threadIdx = i;
-
-            futures.add(executor.submit(() -> {
-                Random random = new Random();
-
-                for (int j = 0; j < N; j++) {
-                    long key = random.nextLong();
-                    // Ensure keys are unique
-                    key -= key % (threadIdx + 1);
-                    key = Math.abs(key);
-                    set.add(key, key);
-                }
-            }));
-        }
-
-        for (Future<?> future : futures) {
-            future.get();
-        }
-
-        assertEquals(set.size(), N * nThreads);
-    }
-
-    @Test
-    public void testIteration() {
-        LongPairSet set = new ConcurrentSortedLongPairSet(16);
-
-        for (int i = 0; i < 10; i++) {
-            for (int j = 0; j < 10; j++) {
-                set.add(i, j);
-            }
-        }
-
-        for (int i = 0; i < 10; i++) {
-            final int firstKey = i;
-            Set<LongPair> longSetResult = set.items(10);
-            assertEquals(longSetResult.size(), 10);
-            longSetResult.forEach(longPair -> {
-                assertEquals(firstKey, longPair.first);
-            });
-            set.removeIf((item1, item2) -> item1 == firstKey);
-        }
-
-    }
-
-    @Test
-    public void testRemoval() {
-        LongPairSet set = new ConcurrentSortedLongPairSet(16);
-
-        set.add(0, 0);
-        set.add(1, 1);
-        set.add(3, 3);
-        set.add(6, 6);
-        set.add(7, 7);
-
-        List<LongPair> values = new ArrayList<>(set.items());
-        values.sort(null);
-        assertEquals(values, Lists.newArrayList(new LongPair(0, 0), new LongPair(1, 1), new LongPair(3, 3),
-                new LongPair(6, 6), new LongPair(7, 7)));
-
-        set.forEach((first, second) -> {
-            if (first < 5) {
-                set.remove(first, second);
-            }
-        });
-        assertEquals(set.size(), values.size() - 3);
-        values = new ArrayList<>(set.items());
-        values.sort(null);
-        assertEquals(values, Lists.newArrayList(new LongPair(6, 6), new LongPair(7, 7)));
-    }
-
-    @Test
-    public void testIfRemoval() {
-        LongPairSet set = new ConcurrentSortedLongPairSet(16, 1, 1);
-
-        set.add(0, 0);
-        set.add(1, 1);
-        set.add(3, 3);
-        set.add(6, 6);
-        set.add(7, 7);
-
-        List<LongPair> values = new ArrayList<>(set.items());
-        values.sort(null);
-        assertEquals(values, Lists.newArrayList(new LongPair(0, 0), new LongPair(1, 1), new LongPair(3, 3),
-                new LongPair(6, 6), new LongPair(7, 7)));
-
-        int removeItems = set.removeIf((first, second) -> first < 5);
-
-        assertEquals(3, removeItems);
-        assertEquals(set.size(), values.size() - 3);
-        values = new ArrayList<>(set.items());
-        values.sort(null);
-        assertEquals(values, Lists.newArrayList(new LongPair(6, 6), new LongPair(7, 7)));
-
-        set = new ConcurrentSortedLongPairSet(128, 2, true);
-        set.add(2, 2);
-        set.add(1, 3);
-        set.add(3, 1);
-        set.add(2, 1);
-        set.add(3, 2);
-        set.add(1, 2);
-        set.add(1, 1);
-        removeItems = set.removeIf((ledgerId, entryId) -> {
-            return ComparisonChain.start().compare(ledgerId, 1).compare(entryId, 3)
-                    .result() <= 0;
-        });
-        assertEquals(removeItems, 3);
-    }
-
-    @Test
-    public void testItems() {
-        LongPairSet set = new ConcurrentSortedLongPairSet(16);
-
-        int n = 100;
-        int limit = 10;
-        for (int i = 0; i < n; i++) {
-            set.add(i, i);
-        }
-
-        Set<LongPair> items = set.items();
-        Set<LongPair> limitItems = set.items(limit);
-        assertEquals(items.size(), n);
-        assertEquals(limitItems.size(), limit);
-
-        int totalRemovedItems = set.removeIf((first, second) -> limitItems.contains((new LongPair(first, second))));
-        assertEquals(limitItems.size(), totalRemovedItems);
-        assertEquals(set.size(), n - limit);
-    }
-
-    @Test
-    public void testEqualsObjects() {
-
-        LongPairSet set = new ConcurrentSortedLongPairSet(16);
-
-        long t1 = 1;
-        long t2 = 2;
-        long t1_b = 1;
-        assertEquals(t1, t1_b);
-        assertNotEquals(t2, t1);
-        assertNotEquals(t2, t1_b);
-
-        set.add(t1, t1);
-        assertTrue(set.contains(t1, t1));
-        assertTrue(set.contains(t1_b, t1_b));
-        assertFalse(set.contains(t2, t2));
-
-        assertTrue(set.remove(t1_b, t1_b));
-        assertFalse(set.contains(t1, t1));
-        assertFalse(set.contains(t1_b, t1_b));
-    }
-
-    @Test
-    public void testToString() {
-
-        LongPairSet set = new ConcurrentSortedLongPairSet(16);
-
-        set.add(0, 0);
-        set.add(1, 1);
-        set.add(3, 3);
-        final String toString = "{[0:0], [1:1], [3:3]}";
-        System.out.println(set.toString());
-        assertEquals(set.toString(), toString);
-    }
-
-    @Test
-    public void testIsEmpty() {
-        LongPairSet set = new ConcurrentSortedLongPairSet();
-        assertTrue(set.isEmpty());
-        set.add(1, 1);
-        assertFalse(set.isEmpty());
-    }
-
-    @Test
-    public void testShrink() {
-        LongPairSet set = new ConcurrentSortedLongPairSet(2, 1, true);
-        set.add(0, 0);
-        assertTrue(set.capacity() == 4);
-        set.add(0, 1);
-        assertTrue(set.capacity() == 4);
-        set.add(1, 1);
-        assertTrue(set.capacity() == 8);
-        set.add(1, 2);
-        assertTrue(set.capacity() == 8);
-        set.add(1, 3);
-        set.add(1, 4);
-        set.add(1, 5);
-        assertTrue(set.capacity() == 12);
-        set.remove(1, 5);
-        // not shrink
-        assertTrue(set.capacity() == 12);
-        set.remove(1, 4);
-        // the internal map does not keep shrinking at every remove() operation
-        assertTrue(set.capacity() == 12);
-        set.remove(1, 3);
-        set.remove(1, 2);
-        set.remove(1, 1);
-        // shrink
-        assertTrue(set.capacity() == 8);
-    }
-}

Reply via email to