This is an automated email from the ASF dual-hosted git repository. noble pushed a commit to branch jira/solr16175 in repository https://gitbox.apache.org/repos/asf/solr.git
commit c1ae998e7ad3650229449ff7b2a55ef222ec8b8c
Author: Noble Paul <[email protected]>
AuthorDate: Wed May 4 13:55:24 2022 +1000

    race condition SizeLimitedDistributedMap
---
 .../solr/cloud/SizeLimitedDistributedMap.java      | 11 ++--
 .../solr/cloud/TestSizeLimitedDistributedMap.java  | 61 ++++++++++++++++++++--
 2 files changed, 65 insertions(+), 7 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java b/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java
index db57ea4980b..2e021a54c53 100644
--- a/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java
+++ b/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java
@@ -91,9 +91,14 @@ public class SizeLimitedDistributedMap extends DistributedMap {
       for (String child : children) {
         Long id = childToModificationZxid.get(child);
         if (id != null && id <= topElementMzxId) {
-          zookeeper.delete(dir + "/" + child, -1, true);
-          if (onOverflowObserver != null)
-            onOverflowObserver.onChildDelete(child.substring(PREFIX.length()));
+          try {
+            zookeeper.delete(dir + "/" + child, -1, true);
+            if (onOverflowObserver != null) {
+              onOverflowObserver.onChildDelete(child.substring(PREFIX.length()));
+            }
+          } catch (KeeperException.NoNodeException e) {
+            //Ignore. It's already deleted by another thread
+          }
         }
       }
     }
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestSizeLimitedDistributedMap.java b/solr/core/src/test/org/apache/solr/cloud/TestSizeLimitedDistributedMap.java
index a086656d13c..5b8f3c3d5cb 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestSizeLimitedDistributedMap.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestSizeLimitedDistributedMap.java
@@ -17,10 +17,13 @@
 
 package org.apache.solr.cloud;
 
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.common.Callable;
 import org.apache.solr.common.cloud.SolrZkClient;
 
 public class TestSizeLimitedDistributedMap extends TestDistributedMap {
@@ -67,6 +70,56 @@ public class TestSizeLimitedDistributedMap extends TestDistributedMap {
     }
   }
 
+  public void testConcurrentCleanup() throws Exception {
+    final Set<String> expectedKeys = new HashSet<>();
+    final List<String> deletedItems = new LinkedList<>();
+    int numResponsesToStore = TEST_NIGHTLY ? Overseer.NUM_RESPONSES_TO_STORE : 100;
+
+    try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), 10000)) {
+      String path = getAndMakeInitialPath(zkClient);
+      final DistributedMap map = new SizeLimitedDistributedMap(zkClient, path, numResponsesToStore, deletedItems::add);
+      //fill the map to limit first
+      for (int i = 0; i < numResponsesToStore; i++) {
+        map.put("xyz_" + i, new byte[0]);
+      }
+
+      // add more elements concurrently to trigger cleanup
+      final int THREAD_COUNT = Math.min(100, numResponsesToStore);
+      List<Callable> callables = new ArrayList<>();
+      for (int i = 0; i < THREAD_COUNT; i++) {
+        final String key = "xyz_" + (numResponsesToStore + 1);
+        expectedKeys.add(key);
+        callables.add(new Callable() {
+          @Override
+          public void call(Object o) {
+            try {
+              map.put(key, new byte[0]);
+            } catch (Exception e) {
+              throw new RuntimeException(e);
+            }
+          }
+        });
+      }
+      ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
+      List<Future<Object>> futures = new ArrayList<>();
+      for (Callable callable : callables) {
+        futures.add(executorService.submit(callable));
+      }
+      try {
+        for (Future<Object> future : futures) {
+          future.get(); //none of them should throw exception
+        }
+        for (String expectedKey : expectedKeys) {
+          assertTrue(map.contains(expectedKey));
+        }
+        //there's no guarantees on exactly how many elements will be removed, but it should at least NOT throw exception
+        assertFalse(deletedItems.isEmpty());
+      } finally {
+        executorService.shutdown();
+        executorService.awaitTermination(10, TimeUnit.SECONDS);
+      }
+    }
+  }
   protected DistributedMap createMap(SolrZkClient zkClient, String path) {
     return new SizeLimitedDistributedMap(zkClient, path, Overseer.NUM_RESPONSES_TO_STORE, null);
   }
