This is an automated email from the ASF dual-hosted git repository.
psalagnac pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new e364f397110 SOLR-17754 Fix race condition in overseer main loop.
(#3350)
e364f397110 is described below
commit e364f3971100e323b14df8770473179475a2eb3e
Author: Pierre Salagnac <[email protected]>
AuthorDate: Fri Jun 6 17:43:18 2025 +0200
SOLR-17754 Fix race condition in overseer main loop. (#3350)
This fixes the overseer main loop so we never submit more than 100
concurrent tasks to the thread pool. Instead of manually tracking when a task
is complete, we check the status using a standard Future.
---
solr/CHANGES.txt | 9 +-
.../apache/solr/cloud/OverseerTaskProcessor.java | 176 +++++++++------------
.../org/apache/solr/cloud/OverseerTaskQueue.java | 48 ++++--
.../OverseerCollectionConfigSetProcessorTest.java | 153 ++++++++++++++----
.../apache/solr/cloud/OverseerTaskQueueTest.java | 2 +-
5 files changed, 237 insertions(+), 151 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index bb8330eef54..0208b164396 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -56,7 +56,7 @@ Optimizations
Bug Fixes
---------------------
-* SOLR-17772: Tests for CLI examples were failing on Windows due to a legacy
bug uncovered by fix in SOLR-7962.
+* SOLR-17772: Tests for CLI examples were failing on Windows due to a legacy
bug uncovered by fix in SOLR-7962.
Additionally achieves simplification of CLI tests (Rahul Goswami via Eric
Pugh)
Deprecation Removals
@@ -195,7 +195,7 @@ Other Changes
New Features
---------------------
* SOLR-17582: The CLUSTERSTATUS API will now stream each collection's status
to the response,
- fetching and computing it on the fly. To avoid a backwards compatibilty
concern, this won't work
+ fetching and computing it on the fly. To avoid a backwards compatibility
concern, this won't work
for wt=javabin. (Matthew Biscocho, David Smiley)
* SOLR-17626: Add RawTFSimilarityFactory class. (Christine Poerschke)
@@ -296,9 +296,12 @@ Bug Fixes
* SOLR-17758: The NumFieldLimitingUpdateRequestProcessor's "warnOnly" mode has
been fixed, and now processes documents even
when the limit has been exceeded. (Jason Gerlowski, Rahul Goswami)
-* SOLR-7962: Passing additional arguments to solr.cmd using "--jvm-opts"
(formerly "-a") in conjunction with "-e" (examples like 'techproducts')
+* SOLR-7962: Passing additional arguments to solr.cmd using "--jvm-opts"
(formerly "-a") in conjunction with "-e" (examples like 'techproducts')
wouldn't reflect on Windows (Rahul Goswami via Eric Pugh)
+* SOLR-17754: Fix rare bug in overseer main loop in case of high load, that
may cause the overseer to be fully stuck until
+ server restart. (Pierre Salagnac)
+
Dependency Upgrades
---------------------
* SOLR-17471: Upgrade Lucene to 9.12.1. (Pierre Salagnac, Christine Poerschke)
diff --git
a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
index b56ed04f0f9..3751d850f09 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
@@ -24,13 +24,13 @@ import java.io.Closeable;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Iterator;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
@@ -70,35 +70,27 @@ public class OverseerTaskProcessor implements Runnable,
Closeable {
private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private OverseerTaskQueue workQueue;
- private DistributedMap runningMap;
- private DistributedMap completedMap;
- private DistributedMap failureMap;
+ private final OverseerTaskQueue workQueue;
+ private final DistributedMap runningMap;
+ private final DistributedMap completedMap;
+ private final DistributedMap failureMap;
/**
- * Set that maintains a list of all the tasks that are running. This is
keyed on zk id of the
- * task.
+ * All the tasks that have been submitted to the runner thread pool. This is
keyed on zk id of the
+ * task. It may contain tasks that have completed execution, have been
entered into the
+ * completed/failed map in zk but not deleted from the work-queue as that is
a batched operation.
+ * This collection is accessed only in the main loop and does not need to be
synchronized.
*/
- private final Set<String> runningTasks;
+ private final Map<String, Future<?>> runningTasks;
- /** List of completed tasks. This is used to clean up workQueue in zk. */
- private final ConcurrentHashMap<String, QueueEvent> completedTasks;
+ private final String myId;
- private volatile String myId;
-
- private volatile ZkStateReader zkStateReader;
+ private final ZkStateReader zkStateReader;
private boolean isClosed;
- private volatile Stats stats;
- private SolrMetricsContext overseerTaskProcessorMetricsContext;
-
- /**
- * Set of tasks that have been picked up for processing but not cleaned up
from zk work-queue. It
- * may contain tasks that have completed execution, have been entered into
the completed/failed
- * map in zk but not deleted from the work-queue as that is a batched
operation.
- */
- private final Set<String> runningZKTasks;
+ private final Stats stats;
+ private final SolrMetricsContext overseerTaskProcessorMetricsContext;
/**
* This map may contain tasks which are read from work queue but could not
be executed because
@@ -116,7 +108,7 @@ public class OverseerTaskProcessor implements Runnable,
Closeable {
new Predicate<>() {
@Override
public boolean test(String s) {
- return runningTasks.contains(s) || blockedTasks.containsKey(s);
+ return runningTasks.containsKey(s) || blockedTasks.containsKey(s);
}
@Override
@@ -129,9 +121,9 @@ public class OverseerTaskProcessor implements Runnable,
Closeable {
protected OverseerMessageHandlerSelector selector;
- private OverseerNodePrioritizer prioritizer;
+ private final OverseerNodePrioritizer prioritizer;
- private String thisNode;
+ private final String thisNode;
public OverseerTaskProcessor(
ZkStateReader zkStateReader,
@@ -153,9 +145,7 @@ public class OverseerTaskProcessor implements Runnable,
Closeable {
this.runningMap = runningMap;
this.completedMap = completedMap;
this.failureMap = failureMap;
- this.runningZKTasks = ConcurrentHashMap.newKeySet();
- this.runningTasks = ConcurrentHashMap.newKeySet();
- this.completedTasks = new ConcurrentHashMap<>();
+ this.runningTasks = new HashMap<>();
thisNode = MDCLoggingContext.getNodeName();
overseerTaskProcessorMetricsContext =
solrMetricsContext.getChildContext(this);
@@ -235,13 +225,7 @@ public class OverseerTaskProcessor implements Runnable,
Closeable {
continue; // not a no, not a yes, try asking again
}
- if (log.isDebugEnabled()) {
- log.debug(
- "Cleaning up work-queue. #Running tasks: {} #Completed tasks:
{}",
- runningTasks.size(),
- completedTasks.size());
- }
- cleanUpWorkQueue();
+ cleanUpRunningTasks();
printTrackingMaps();
@@ -254,7 +238,9 @@ public class OverseerTaskProcessor implements Runnable,
Closeable {
waited = true;
}
- if (waited) cleanUpWorkQueue();
+ if (waited) {
+ cleanUpRunningTasks();
+ }
ArrayList<QueueEvent> heads = new ArrayList<>(blockedTasks.size() +
MAX_PARALLEL_TASKS);
heads.addAll(blockedTasks.values());
@@ -309,17 +295,16 @@ public class OverseerTaskProcessor implements Runnable,
Closeable {
// on the OverseerCollectionMessageHandler
batchSessionId++;
- boolean tooManyTasks = false;
- for (QueueEvent head : heads) {
- if (!tooManyTasks) {
- tooManyTasks = runningTasks.size() >= MAX_PARALLEL_TASKS;
- }
- if (tooManyTasks) {
+ for (int i = 0; i < heads.size(); i++) {
+ QueueEvent head = heads.get(i);
+
+ if (runningTasks.size() >= MAX_PARALLEL_TASKS) {
// Too many tasks are running, just shove the rest into the
"blocked" queue.
- blockedTasks.put(head.getId(), head);
- continue;
+ heads.subList(i, heads.size()).forEach(h ->
blockedTasks.put(h.getId(), h));
+ break;
}
- if (runningZKTasks.contains(head.getId())) continue;
+
+ if (runningTasks.containsKey(head.getId())) continue;
final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
final String asyncId = message.getStr(ASYNC);
if (hasLeftOverItems) {
@@ -329,14 +314,14 @@ public class OverseerTaskProcessor implements Runnable,
Closeable {
log.debug(
"Found already processed task in workQueue, cleaning up.
AsyncId [{}]",
asyncId);
- workQueue.remove(head);
+ workQueue.remove(head, false);
continue;
}
}
String operation = message.getStr(Overseer.QUEUE_OPERATION);
if (operation == null) {
log.error("Msg does not have required {} : {}",
Overseer.QUEUE_OPERATION, message);
- workQueue.remove(head);
+ workQueue.remove(head, asyncId == null);
continue;
}
OverseerMessageHandler messageHandler =
selector.selectOverseerMessageHandler(message);
@@ -349,7 +334,11 @@ public class OverseerTaskProcessor implements Runnable,
Closeable {
continue;
}
try {
- markTaskAsRunning(head, asyncId);
+ if (asyncId != null) {
+ // Store the async ID in ZK distributed map before trying to
execute the task
+ runningMap.put(asyncId, null);
+ }
+
if (log.isDebugEnabled()) {
log.debug("Marked task [{}] as running", head.getId());
}
@@ -372,8 +361,9 @@ public class OverseerTaskProcessor implements Runnable,
Closeable {
head.getId(),
message);
}
- Runner runner = new Runner(messageHandler, message, operation,
head, lock);
- tpe.execute(runner);
+ Runner runner = createRunner(messageHandler, message, operation,
head, lock);
+ Future<?> future = tpe.submit(runner);
+ runningTasks.put(head.getId(), future);
}
} catch (KeeperException e) {
@@ -405,14 +395,12 @@ public class OverseerTaskProcessor implements Runnable,
Closeable {
}
}
- private void cleanUpWorkQueue() throws KeeperException, InterruptedException
{
- Iterator<Map.Entry<String, QueueEvent>> it =
completedTasks.entrySet().iterator();
- while (it.hasNext()) {
- Map.Entry<String, QueueEvent> entry = it.next();
- workQueue.remove(entry.getValue());
- runningZKTasks.remove(entry.getKey());
- it.remove();
+ /** Remove all entries from {@link #runningTasks} that are done. */
+ private void cleanUpRunningTasks() {
+ if (log.isDebugEnabled()) {
+ log.debug("Cleaning up running tasks. #Running tasks: {}",
runningTasks.size());
}
+ runningTasks.entrySet().removeIf(e -> e.getValue().isDone());
}
@Override
@@ -429,14 +417,9 @@ public class OverseerTaskProcessor implements Runnable,
Closeable {
public static List<String> getSortedOverseerNodeNames(SolrZkClient zk)
throws KeeperException, InterruptedException {
- List<String> children = null;
- try {
- children = zk.getChildren(Overseer.OVERSEER_ELECT +
LeaderElector.ELECTION_NODE, null, true);
- } catch (Exception e) {
- log.warn("error ", e);
- return new ArrayList<>();
- }
- LeaderElector.sortSeqs(children);
+ List<String> children =
+ getSortedElectionNodes(zk, Overseer.OVERSEER_ELECT +
LeaderElector.ELECTION_NODE);
+
ArrayList<String> nodeNames = new ArrayList<>(children.size());
for (String c : children) nodeNames.add(LeaderElector.getNodeName(c));
return nodeNames;
@@ -444,14 +427,9 @@ public class OverseerTaskProcessor implements Runnable,
Closeable {
public static List<String> getSortedElectionNodes(SolrZkClient zk, String
path)
throws KeeperException, InterruptedException {
- List<String> children = null;
- try {
- children = zk.getChildren(path, null, true);
- LeaderElector.sortSeqs(children);
- return children;
- } catch (Exception e) {
- throw e;
- }
+ List<String> children = zk.getChildren(path, null, true);
+ LeaderElector.sortSeqs(children);
+ return children;
}
public static String getLeaderNode(SolrZkClient zkClient)
@@ -518,12 +496,17 @@ public class OverseerTaskProcessor implements Runnable,
Closeable {
return isClosed;
}
- private void markTaskAsRunning(QueueEvent head, String asyncId)
- throws KeeperException, InterruptedException {
- runningZKTasks.add(head.getId());
- runningTasks.add(head.getId());
-
- if (asyncId != null) runningMap.put(asyncId, null);
+ /**
+ * Create a runner instance to execute a single task. This method mostly
exists to provide an
+ * extension point for tests.
+ */
+ protected Runner createRunner(
+ OverseerMessageHandler messageHandler,
+ ZkNodeProps message,
+ String operation,
+ QueueEvent head,
+ OverseerMessageHandler.Lock lock) {
+ return new Runner(messageHandler, message, operation, head, lock);
}
protected class Runner implements Runnable {
@@ -589,7 +572,7 @@ public class OverseerTaskProcessor implements Runnable,
Closeable {
}
}
- markTaskComplete(head.getId(), asyncId);
+ markTaskComplete(asyncId);
if (log.isDebugEnabled()) {
log.debug("Marked task [{}] as completed.", head.getId());
}
@@ -609,7 +592,7 @@ public class OverseerTaskProcessor implements Runnable,
Closeable {
log.error("KeeperException", e);
} catch (InterruptedException e) {
// Reset task from tracking data structures so that it can be retried.
- resetTaskWithException(messageHandler, head.getId(), asyncId, taskKey,
message);
+ resetTaskWithException(head.getId(), asyncId, taskKey);
log.warn("Resetting task {} as the thread was interrupted.",
head.getId());
Thread.currentThread().interrupt();
} finally {
@@ -617,7 +600,7 @@ public class OverseerTaskProcessor implements Runnable,
Closeable {
if (!success) {
// Reset task from tracking data structures so that it can be
retried.
try {
- resetTaskWithException(messageHandler, head.getId(), asyncId,
taskKey, message);
+ resetTaskWithException(head.getId(), asyncId, taskKey);
} catch (IllegalStateException ignore) {
}
@@ -628,10 +611,11 @@ public class OverseerTaskProcessor implements Runnable,
Closeable {
}
}
- private void markTaskComplete(String id, String asyncId)
- throws KeeperException, InterruptedException {
- completedTasks.put(id, head);
- runningTasks.remove(id);
+ /**
+ * Invoked by the runner thread once the task is successfully completed
(can be with error
+ * status). We remove the task from the work queue.
+ */
+ private void markTaskComplete(String asyncId) throws KeeperException,
InterruptedException {
if (asyncId != null) {
if (!runningMap.remove(asyncId)) {
@@ -639,15 +623,14 @@ public class OverseerTaskProcessor implements Runnable,
Closeable {
}
}
- workQueue.remove(head);
+ workQueue.remove(head, asyncId == null);
}
- private void resetTaskWithException(
- OverseerMessageHandler messageHandler,
- String id,
- String asyncId,
- String taskKey,
- ZkNodeProps message) {
+ /**
+ * Reset the task so it will be retried by a future iteration of the main
loop. We remove the
+ * async ID from the running map, but we keep the task in the work queue.
+ */
+ private void resetTaskWithException(String id, String asyncId, String
taskKey) {
log.warn("Resetting task: {}, requestid: {}, taskKey: {}", id, asyncId,
taskKey);
try {
if (asyncId != null) {
@@ -656,7 +639,6 @@ public class OverseerTaskProcessor implements Runnable,
Closeable {
}
}
- runningTasks.remove(id);
} catch (KeeperException e) {
log.error("KeeperException", e);
} catch (InterruptedException e) {
@@ -682,10 +664,8 @@ public class OverseerTaskProcessor implements Runnable,
Closeable {
private void printTrackingMaps() {
if (log.isDebugEnabled()) {
- log.debug("RunningTasks: {}", runningTasks);
+ log.debug("RunningTasks: {}", runningTasks.keySet());
log.debug("BlockedTasks: {}", blockedTasks.keySet()); // nowarn
- log.debug("CompletedTasks: {}", completedTasks.keySet()); // nowarn
- log.debug("RunningZKTasks: {}", runningZKTasks); // nowarn
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
index c79b1d40ff9..04da2d16ab1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
@@ -101,21 +101,39 @@ public class OverseerTaskQueue extends ZkDistributedQueue
{
return false;
}
- /** Remove the event and save the response into the other path. */
- public void remove(QueueEvent event) throws KeeperException,
InterruptedException {
+ /**
+ * Remove the event and save the response into the other path.
+ *
+ * <p>The response node should be updated only for a synchronous command,
where the request node
+ * was created with specialized {@link #offer(byte[], long)}, that also
created the response node.
+ * For an asynchronous command, the request node was created with standard
{@link #offer(byte[])}
+ * method, which does not create the response node. For such commands, the
result is stored using
+ * {@link DistributedMap} instances.
+ *
+ * @param setResult Whether we set data into the result node (true for
synchronous commands).
+ */
+ public void remove(QueueEvent event, boolean setResult)
+ throws KeeperException, InterruptedException {
Timer.Context time = stats.time(dir + "_remove_event");
try {
String path = event.getId();
- String responsePath = dir + "/" + RESPONSE_PREFIX +
path.substring(path.lastIndexOf('-') + 1);
- try {
- zookeeper.setData(responsePath, event.getBytes(), true);
- } catch (KeeperException.NoNodeException ignored) {
- // we must handle the race case where the node no longer exists
- log.info(
- "Response ZK path: {} doesn't exist. Requestor may have
disconnected from ZooKeeper",
- responsePath);
+ // Set response data in the response node
+ if (setResult) {
+ String responsePath =
+ dir + "/" + RESPONSE_PREFIX + path.substring(path.lastIndexOf('-')
+ 1);
+
+ try {
+ zookeeper.setData(responsePath, event.getBytes(), true);
+ } catch (KeeperException.NoNodeException ignored) {
+ // we must handle the race case where the node no longer exists
+ log.info(
+ "Response ZK path: {} doesn't exist. Requestor may have
disconnected from ZooKeeper",
+ responsePath);
+ }
}
+
+ // Remove the request node
try {
zookeeper.delete(path, -1, true);
} catch (KeeperException.NoNodeException ignored) {
@@ -190,7 +208,7 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
/**
* Inserts data into zookeeper.
*
- * @return true if data was successfully added
+ * @return The path of the created node.
*/
private String createData(String path, byte[] data, CreateMode mode)
throws KeeperException, InterruptedException {
@@ -322,8 +340,8 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
return Objects.equals(id, other.id);
}
- private WatchedEvent event = null;
- private String id;
+ private final WatchedEvent event;
+ private final String id;
private byte[] bytes;
QueueEvent(String id, byte[] bytes, WatchedEvent event) {
@@ -332,10 +350,6 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
this.event = event;
}
- public void setId(String id) {
- this.id = id;
- }
-
public String getId() {
return id;
}
diff --git
a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
index b38ad73f820..5c0451e0b01 100644
---
a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
+++
b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
@@ -43,9 +43,11 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
+import java.util.stream.Collectors;
import org.apache.http.client.HttpClient;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrResponse;
@@ -144,11 +146,17 @@ public class OverseerCollectionConfigSetProcessorTest
extends SolrTestCaseJ4 {
private OverseerCollectionConfigSetProcessorToBeTested underTest;
private Thread thread;
- private final Queue<QueueEvent> queue = new ArrayBlockingQueue<>(10);
+ private final Queue<QueueEvent> queue = new LinkedBlockingQueue<>();
private static class OverseerCollectionConfigSetProcessorToBeTested
extends OverseerCollectionConfigSetProcessor {
+ /**
+ * If non-null, all the threads will wait on this latch after task
execution. This is used to
+ * force thread contention in post-processing.
+ */
+ private final CountDownLatch postLatch;
+
public OverseerCollectionConfigSetProcessorToBeTested(
ZkStateReader zkStateReader,
String myId,
@@ -159,7 +167,8 @@ public class OverseerCollectionConfigSetProcessorTest
extends SolrTestCaseJ4 {
Overseer overseer,
DistributedMap completedMap,
DistributedMap failureMap,
- SolrMetricsContext solrMetricsContext) {
+ SolrMetricsContext solrMetricsContext,
+ CountDownLatch postLatch) {
super(
zkStateReader,
myId,
@@ -173,12 +182,53 @@ public class OverseerCollectionConfigSetProcessorTest
extends SolrTestCaseJ4 {
completedMap,
failureMap,
solrMetricsContext);
+
+ this.postLatch = postLatch;
}
@Override
protected LeaderStatus amILeader() {
return LeaderStatus.YES;
}
+
+ @Override
+ protected Runner createRunner(
+ OverseerMessageHandler messageHandler,
+ ZkNodeProps message,
+ String operation,
+ QueueEvent head,
+ OverseerMessageHandler.Lock lock) {
+ return new LatchRunner(messageHandler, message, operation, head, lock);
+ }
+
+ /** Override the default runner to wait on a latch once the task is
completed. */
+ private class LatchRunner extends Runner {
+
+ public LatchRunner(
+ OverseerMessageHandler messageHandler,
+ ZkNodeProps message,
+ String operation,
+ QueueEvent head,
+ OverseerMessageHandler.Lock lock) {
+ super(messageHandler, message, operation, head, lock);
+ }
+
+ @Override
+ public void run() {
+ super.run();
+
+ if (postLatch != null) {
+ try {
+ boolean success = postLatch.await(MAX_WAIT_MS,
TimeUnit.MILLISECONDS);
+ if (!success) {
+ throw new RuntimeException("Timed out waiting for postLatch");
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
}
@BeforeClass
@@ -278,7 +328,7 @@ public class OverseerCollectionConfigSetProcessorTest
extends SolrTestCaseJ4 {
@Override
@After
public void tearDown() throws Exception {
- stopComponentUnderTest();
+ stopProcessor();
super.tearDown();
}
@@ -289,15 +339,18 @@ public class OverseerCollectionConfigSetProcessorTest
extends SolrTestCaseJ4 {
when(workQueueMock.peekTopN(anyInt(), any(), anyLong()))
.thenAnswer(
invocation -> {
- Object result;
- int count = 0;
- while ((result = queue.peek()) == null) {
+ int n = invocation.getArgument(0);
+
+ int retries = 0;
+ while (retries < 2) {
+ List<Object> results =
queue.stream().limit(n).collect(Collectors.toList());
+ if (!results.isEmpty()) {
+ return results;
+ }
Thread.sleep(1000);
- count++;
- if (count > 1) return null;
+ retries++;
}
-
- return List.of(result);
+ return Collections.emptyList();
});
when(workQueueMock.getTailId())
@@ -327,7 +380,7 @@ public class OverseerCollectionConfigSetProcessorTest
extends SolrTestCaseJ4 {
return null;
})
.when(workQueueMock)
- .remove(any(QueueEvent.class));
+ .remove(any(QueueEvent.class), anyBoolean());
when(workQueueMock.poll())
.thenAnswer(
@@ -700,12 +753,26 @@ public class OverseerCollectionConfigSetProcessorTest
extends SolrTestCaseJ4 {
}
}
- protected void startComponentUnderTest() {
+ protected void createAndStartProcessor(CountDownLatch postLatch) {
+ underTest =
+ new OverseerCollectionConfigSetProcessorToBeTested(
+ zkStateReaderMock,
+ "1234",
+ shardHandlerFactoryMock,
+ ADMIN_PATH,
+ workQueueMock,
+ runningMapMock,
+ overseerMock,
+ completedMapMock,
+ failureMapMock,
+ solrMetricsContextMock,
+ postLatch);
+
thread = new Thread(underTest);
thread.start();
}
- protected void stopComponentUnderTest() throws Exception {
+ protected void stopProcessor() throws Exception {
if (null != underTest) {
underTest.close();
underTest = null;
@@ -756,6 +823,20 @@ public class OverseerCollectionConfigSetProcessorTest
extends SolrTestCaseJ4 {
queue.add(qe);
}
+ /** Submit a dumb job to the overseer that does nothing. */
+ private void issueMockJob(String id) {
+ Map<String, Object> propMap =
+ Map.of(
+ Overseer.QUEUE_OPERATION,
+ CollectionParams.CollectionAction.MOCK_COLL_TASK.toLower(),
+ "name",
+ id);
+
+ ZkNodeProps props = new ZkNodeProps(propMap);
+ QueueEvent qe = new QueueEvent(id, Utils.toJSON(props), null);
+ queue.add(qe);
+ }
+
protected void verifySubmitCaptures(
Integer numberOfSlices, Integer numberOfReplica, Collection<String>
createNodes) {
List<String> coreNames = new ArrayList<>();
@@ -957,24 +1038,7 @@ public class OverseerCollectionConfigSetProcessorTest
extends SolrTestCaseJ4 {
if (random().nextBoolean()) Collections.shuffle(createNodeList, random());
- underTest =
- new OverseerCollectionConfigSetProcessorToBeTested(
- zkStateReaderMock,
- "1234",
- shardHandlerFactoryMock,
- ADMIN_PATH,
- workQueueMock,
- runningMapMock,
- overseerMock,
- completedMapMock,
- failureMapMock,
- solrMetricsContextMock);
-
- if (log.isInfoEnabled()) {
- log.info("clusterstate {}", clusterStateMock.hashCode());
- }
-
- startComponentUnderTest();
+ createAndStartProcessor(null);
final List<String> createNodeListToSend =
((createNodeListOption != CreateNodeListOptions.SEND_NULL) ?
createNodeList : null);
@@ -1350,4 +1414,29 @@ public class OverseerCollectionConfigSetProcessorTest
extends SolrTestCaseJ4 {
false,
distributedClusterStateUpdates);
}
+
+ /** Check no overseer task is rejected when the queue is flooded. */
+ @Test
+ public void testFloodQueue() throws Exception {
+
+ commonMocks(2, false);
+
+ // Set a latch, so all threads will be waiting once the task processing
completes, but before
+ // the threads are returned to the pool.
+ // This validates that the main thread (that reads tasks from the
distributed queue) does not
+ // submit more tasks than the pool can handle, even if the previous tasks
were already
+ // completed.
+ CountDownLatch postLatch = new CountDownLatch(1);
+ createAndStartProcessor(postLatch);
+
+ for (int i = 0; i < OverseerTaskProcessor.MAX_PARALLEL_TASKS + 10; i++) {
+ issueMockJob(Integer.toString(i));
+ }
+
+ Thread.sleep(1000);
+ underTest.postLatch.countDown();
+
+ waitForEmptyQueue();
+ stopProcessor();
+ }
}
diff --git
a/solr/core/src/test/org/apache/solr/cloud/OverseerTaskQueueTest.java
b/solr/core/src/test/org/apache/solr/cloud/OverseerTaskQueueTest.java
index 0b748a0a25f..9d8229a5af5 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTaskQueueTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTaskQueueTest.java
@@ -92,7 +92,7 @@ public class OverseerTaskQueueTest extends
DistributedQueueTest {
}
assertNotNull("Didn't find event with requestid " + requestId2,
requestId2Event);
requestId2Event.setBytes("foo bar".getBytes(StandardCharsets.UTF_8));
- tq.remove(requestId2Event);
+ tq.remove(requestId2Event, true);
// Make sure this call to check if requestId exists doesn't barf with Json
parse exception
assertTrue(