This is an automated email from the ASF dual-hosted git repository.

psalagnac pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/main by this push:
     new e364f397110 SOLR-17754 Fix race condition in overseer main loop. (#3350)
e364f397110 is described below

commit e364f3971100e323b14df8770473179475a2eb3e
Author: Pierre Salagnac <[email protected]>
AuthorDate: Fri Jun 6 17:43:18 2025 +0200

    SOLR-17754 Fix race condition in overseer main loop. (#3350)
    
    This fixes the overseer main loop so we never submit more than 100 concurrent
    tasks to the thread pool. Instead of manually tracking when a task is complete,
    we check the status using a standard Future.
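
For context, the sketch below illustrates the Future-based bookkeeping described above. It is a minimal, hypothetical example (the BoundedSubmitter class, loopOnce method, and fixed thread pool are illustrative only, not part of this commit): a single-threaded loop caps in-flight work by evicting entries whose Future reports completion before submitting more.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of the pattern; not Solr code.
class BoundedSubmitter {
  private static final int MAX_PARALLEL_TASKS = 100;

  // Keyed by task id; touched only by the single loop thread, so no synchronization is needed.
  private final Map<String, Future<?>> runningTasks = new HashMap<>();
  private final ExecutorService pool = Executors.newFixedThreadPool(MAX_PARALLEL_TASKS);

  /** One loop iteration: evict finished tasks, then submit new ones up to the cap. */
  void loopOnce(Map<String, Runnable> pendingTasks) {
    // The Future itself reports completion; no separate "completed" set to maintain by hand.
    runningTasks.entrySet().removeIf(e -> e.getValue().isDone());

    for (Map.Entry<String, Runnable> pending : pendingTasks.entrySet()) {
      if (runningTasks.size() >= MAX_PARALLEL_TASKS) {
        break; // leave the rest for the next iteration instead of over-submitting
      }
      if (runningTasks.containsKey(pending.getKey())) {
        continue; // already submitted and still running
      }
      runningTasks.put(pending.getKey(), pool.submit(pending.getValue()));
    }
  }
}

The patch below applies the same idea in OverseerTaskProcessor: a Map<String, Future<?>> replaces the runningTasks/runningZKTasks/completedTasks collections, and the main loop calls removeIf(e -> e.getValue().isDone()) in cleanUpRunningTasks().
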
---
 solr/CHANGES.txt                                   |   9 +-
 .../apache/solr/cloud/OverseerTaskProcessor.java   | 176 +++++++++------------
 .../org/apache/solr/cloud/OverseerTaskQueue.java   |  48 ++++--
 .../OverseerCollectionConfigSetProcessorTest.java  | 153 ++++++++++++++----
 .../apache/solr/cloud/OverseerTaskQueueTest.java   |   2 +-
 5 files changed, 237 insertions(+), 151 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index bb8330eef54..0208b164396 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -56,7 +56,7 @@ Optimizations
 
 Bug Fixes
 ---------------------
-* SOLR-17772: Tests for CLI examples were failing on Windows due to a legacy bug uncovered by fix in SOLR-7962. 
+* SOLR-17772: Tests for CLI examples were failing on Windows due to a legacy bug uncovered by fix in SOLR-7962.
   Additionally achieves simplification of CLI tests (Rahul Goswami via Eric Pugh)
 
 Deprecation Removals
@@ -195,7 +195,7 @@ Other Changes
 New Features
 ---------------------
 * SOLR-17582: The CLUSTERSTATUS API will now stream each collection's status to the response,
-  fetching and computing it on the fly.  To avoid a backwards compatibilty concern, this won't work
+  fetching and computing it on the fly.  To avoid a backwards compatibility concern, this won't work
   for wt=javabin.  (Matthew Biscocho, David Smiley)
 
 * SOLR-17626: Add RawTFSimilarityFactory class. (Christine Poerschke)
@@ -296,9 +296,12 @@ Bug Fixes
 * SOLR-17758: The NumFieldLimitingUpdateRequestProcessor's "warnOnly" mode has been fixed, and now processes documents even
   when the limit has been exceeded. (Jason Gerlowski, Rahul Goswami)
 
-* SOLR-7962: Passing additional arguments to solr.cmd using "--jvm-opts" (formerly "-a") in conjunction with "-e" (examples like 'techproducts') 
+* SOLR-7962: Passing additional arguments to solr.cmd using "--jvm-opts" (formerly "-a") in conjunction with "-e" (examples like 'techproducts')
   wouldn't reflect on Windows (Rahul Goswami via Eric Pugh)
 
+* SOLR-17754: Fix a rare bug in the overseer main loop under high load that may cause the overseer to be fully stuck until
+  server restart. (Pierre Salagnac)
+
 Dependency Upgrades
 ---------------------
 * SOLR-17471: Upgrade Lucene to 9.12.1. (Pierre Salagnac, Christine Poerschke)
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
index b56ed04f0f9..3751d850f09 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
@@ -24,13 +24,13 @@ import java.io.Closeable;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
@@ -70,35 +70,27 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private OverseerTaskQueue workQueue;
-  private DistributedMap runningMap;
-  private DistributedMap completedMap;
-  private DistributedMap failureMap;
+  private final OverseerTaskQueue workQueue;
+  private final DistributedMap runningMap;
+  private final DistributedMap completedMap;
+  private final DistributedMap failureMap;
 
   /**
-   * Set that maintains a list of all the tasks that are running. This is keyed on zk id of the
-   * task.
+   * All the tasks that have been submitted to the runner thread pool. This is keyed on zk id of the
+   * task. It may contain tasks that have completed execution, have been entered into the
+   * completed/failed map in zk but not deleted from the work-queue as that is a batched operation.
+   * This collection is accessed only in the main loop and does not need to be synchronized.
    */
-  private final Set<String> runningTasks;
+  private final Map<String, Future<?>> runningTasks;
 
-  /** List of completed tasks. This is used to clean up workQueue in zk. */
-  private final ConcurrentHashMap<String, QueueEvent> completedTasks;
+  private final String myId;
 
-  private volatile String myId;
-
-  private volatile ZkStateReader zkStateReader;
+  private final ZkStateReader zkStateReader;
 
   private boolean isClosed;
 
-  private volatile Stats stats;
-  private SolrMetricsContext overseerTaskProcessorMetricsContext;
-
-  /**
-   * Set of tasks that have been picked up for processing but not cleaned up from zk work-queue. It
-   * may contain tasks that have completed execution, have been entered into the completed/failed
-   * map in zk but not deleted from the work-queue as that is a batched operation.
-   */
-  private final Set<String> runningZKTasks;
+  private final Stats stats;
+  private final SolrMetricsContext overseerTaskProcessorMetricsContext;
 
   /**
    * This map may contain tasks which are read from work queue but could not be executed because
@@ -116,7 +108,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
       new Predicate<>() {
         @Override
         public boolean test(String s) {
-          return runningTasks.contains(s) || blockedTasks.containsKey(s);
+          return runningTasks.containsKey(s) || blockedTasks.containsKey(s);
         }
 
         @Override
@@ -129,9 +121,9 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
 
   protected OverseerMessageHandlerSelector selector;
 
-  private OverseerNodePrioritizer prioritizer;
+  private final OverseerNodePrioritizer prioritizer;
 
-  private String thisNode;
+  private final String thisNode;
 
   public OverseerTaskProcessor(
       ZkStateReader zkStateReader,
@@ -153,9 +145,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
     this.runningMap = runningMap;
     this.completedMap = completedMap;
     this.failureMap = failureMap;
-    this.runningZKTasks = ConcurrentHashMap.newKeySet();
-    this.runningTasks = ConcurrentHashMap.newKeySet();
-    this.completedTasks = new ConcurrentHashMap<>();
+    this.runningTasks = new HashMap<>();
     thisNode = MDCLoggingContext.getNodeName();
 
     overseerTaskProcessorMetricsContext = solrMetricsContext.getChildContext(this);
@@ -235,13 +225,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
             continue; // not a no, not a yes, try asking again
           }
 
-          if (log.isDebugEnabled()) {
-            log.debug(
-                "Cleaning up work-queue. #Running tasks: {} #Completed tasks: 
{}",
-                runningTasks.size(),
-                completedTasks.size());
-          }
-          cleanUpWorkQueue();
+          cleanUpRunningTasks();
 
           printTrackingMaps();
 
@@ -254,7 +238,9 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
             waited = true;
           }
 
-          if (waited) cleanUpWorkQueue();
+          if (waited) {
+            cleanUpRunningTasks();
+          }
 
           ArrayList<QueueEvent> heads = new ArrayList<>(blockedTasks.size() + MAX_PARALLEL_TASKS);
           heads.addAll(blockedTasks.values());
@@ -309,17 +295,16 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
           // on the OverseerCollectionMessageHandler
           batchSessionId++;
 
-          boolean tooManyTasks = false;
-          for (QueueEvent head : heads) {
-            if (!tooManyTasks) {
-              tooManyTasks = runningTasks.size() >= MAX_PARALLEL_TASKS;
-            }
-            if (tooManyTasks) {
+          for (int i = 0; i < heads.size(); i++) {
+            QueueEvent head = heads.get(i);
+
+            if (runningTasks.size() >= MAX_PARALLEL_TASKS) {
               // Too many tasks are running, just shove the rest into the "blocked" queue.
-              blockedTasks.put(head.getId(), head);
-              continue;
+              heads.subList(i, heads.size()).forEach(h -> blockedTasks.put(h.getId(), h));
+              break;
             }
-            if (runningZKTasks.contains(head.getId())) continue;
+
+            if (runningTasks.containsKey(head.getId())) continue;
             final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
             final String asyncId = message.getStr(ASYNC);
             if (hasLeftOverItems) {
@@ -329,14 +314,14 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
                 log.debug(
                     "Found already processed task in workQueue, cleaning up. 
AsyncId [{}]",
                     asyncId);
-                workQueue.remove(head);
+                workQueue.remove(head, false);
                 continue;
               }
             }
             String operation = message.getStr(Overseer.QUEUE_OPERATION);
             if (operation == null) {
               log.error("Msg does not have required {} : {}", 
Overseer.QUEUE_OPERATION, message);
-              workQueue.remove(head);
+              workQueue.remove(head, asyncId == null);
               continue;
             }
             OverseerMessageHandler messageHandler = selector.selectOverseerMessageHandler(message);
@@ -349,7 +334,11 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
               continue;
             }
             try {
-              markTaskAsRunning(head, asyncId);
+              if (asyncId != null) {
+                // Store the async ID in ZK distributed map before trying to execute the task
+                runningMap.put(asyncId, null);
+              }
+
               if (log.isDebugEnabled()) {
                 log.debug("Marked task [{}] as running", head.getId());
               }
@@ -372,8 +361,9 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
                   head.getId(),
                   message);
             }
-            Runner runner = new Runner(messageHandler, message, operation, head, lock);
-            tpe.execute(runner);
+            Runner runner = createRunner(messageHandler, message, operation, head, lock);
+            Future<?> future = tpe.submit(runner);
+            runningTasks.put(head.getId(), future);
           }
 
         } catch (KeeperException e) {
@@ -405,14 +395,12 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
     }
   }
 
-  private void cleanUpWorkQueue() throws KeeperException, InterruptedException {
-    Iterator<Map.Entry<String, QueueEvent>> it = completedTasks.entrySet().iterator();
-    while (it.hasNext()) {
-      Map.Entry<String, QueueEvent> entry = it.next();
-      workQueue.remove(entry.getValue());
-      runningZKTasks.remove(entry.getKey());
-      it.remove();
+  /** Remove all entries from {@link #runningTasks} that are done. */
+  private void cleanUpRunningTasks() {
+    if (log.isDebugEnabled()) {
+      log.debug("Cleaning up running tasks. #Running tasks: {}", 
runningTasks.size());
     }
+    runningTasks.entrySet().removeIf(e -> e.getValue().isDone());
   }
 
   @Override
@@ -429,14 +417,9 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
 
   public static List<String> getSortedOverseerNodeNames(SolrZkClient zk)
       throws KeeperException, InterruptedException {
-    List<String> children = null;
-    try {
-      children = zk.getChildren(Overseer.OVERSEER_ELECT + LeaderElector.ELECTION_NODE, null, true);
-    } catch (Exception e) {
-      log.warn("error ", e);
-      return new ArrayList<>();
-    }
-    LeaderElector.sortSeqs(children);
+    List<String> children =
+        getSortedElectionNodes(zk, Overseer.OVERSEER_ELECT + LeaderElector.ELECTION_NODE);
+
     ArrayList<String> nodeNames = new ArrayList<>(children.size());
     for (String c : children) nodeNames.add(LeaderElector.getNodeName(c));
     return nodeNames;
@@ -444,14 +427,9 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
 
   public static List<String> getSortedElectionNodes(SolrZkClient zk, String path)
       throws KeeperException, InterruptedException {
-    List<String> children = null;
-    try {
-      children = zk.getChildren(path, null, true);
-      LeaderElector.sortSeqs(children);
-      return children;
-    } catch (Exception e) {
-      throw e;
-    }
+    List<String> children = zk.getChildren(path, null, true);
+    LeaderElector.sortSeqs(children);
+    return children;
   }
 
   public static String getLeaderNode(SolrZkClient zkClient)
@@ -518,12 +496,17 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
     return isClosed;
   }
 
-  private void markTaskAsRunning(QueueEvent head, String asyncId)
-      throws KeeperException, InterruptedException {
-    runningZKTasks.add(head.getId());
-    runningTasks.add(head.getId());
-
-    if (asyncId != null) runningMap.put(asyncId, null);
+  /**
+   * Create a runner instance to execute a single task. This method mostly exists to provide an
+   * extension point for tests.
+   */
+  protected Runner createRunner(
+      OverseerMessageHandler messageHandler,
+      ZkNodeProps message,
+      String operation,
+      QueueEvent head,
+      OverseerMessageHandler.Lock lock) {
+    return new Runner(messageHandler, message, operation, head, lock);
   }
 
   protected class Runner implements Runnable {
@@ -589,7 +572,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
           }
         }
 
-        markTaskComplete(head.getId(), asyncId);
+        markTaskComplete(asyncId);
         if (log.isDebugEnabled()) {
           log.debug("Marked task [{}] as completed.", head.getId());
         }
@@ -609,7 +592,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
         log.error("KeeperException", e);
       } catch (InterruptedException e) {
         // Reset task from tracking data structures so that it can be retried.
-        resetTaskWithException(messageHandler, head.getId(), asyncId, taskKey, message);
+        resetTaskWithException(head.getId(), asyncId, taskKey);
         log.warn("Resetting task {} as the thread was interrupted.", 
head.getId());
         Thread.currentThread().interrupt();
       } finally {
@@ -617,7 +600,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
         if (!success) {
           // Reset task from tracking data structures so that it can be retried.
           try {
-            resetTaskWithException(messageHandler, head.getId(), asyncId, taskKey, message);
+            resetTaskWithException(head.getId(), asyncId, taskKey);
           } catch (IllegalStateException ignore) {
 
           }
@@ -628,10 +611,11 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
       }
     }
 
-    private void markTaskComplete(String id, String asyncId)
-        throws KeeperException, InterruptedException {
-      completedTasks.put(id, head);
-      runningTasks.remove(id);
+    /**
+     * Invoked by the runner thread once the task is successfully completed (can be with error
+     * status). We remove the task from the work queue.
+     */
+    private void markTaskComplete(String asyncId) throws KeeperException, InterruptedException {
 
       if (asyncId != null) {
         if (!runningMap.remove(asyncId)) {
@@ -639,15 +623,14 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
         }
       }
 
-      workQueue.remove(head);
+      workQueue.remove(head, asyncId == null);
     }
 
-    private void resetTaskWithException(
-        OverseerMessageHandler messageHandler,
-        String id,
-        String asyncId,
-        String taskKey,
-        ZkNodeProps message) {
+    /**
+     * Reset the task so it will be retried by a future iteration of the main loop. We remove the
+     * async ID from the running map, but we keep the task in the work queue.
+     */
+    private void resetTaskWithException(String id, String asyncId, String taskKey) {
       log.warn("Resetting task: {}, requestid: {}, taskKey: {}", id, asyncId, 
taskKey);
       try {
         if (asyncId != null) {
@@ -656,7 +639,6 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
           }
         }
 
-        runningTasks.remove(id);
       } catch (KeeperException e) {
         log.error("KeeperException", e);
       } catch (InterruptedException e) {
@@ -682,10 +664,8 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
 
   private void printTrackingMaps() {
     if (log.isDebugEnabled()) {
-      log.debug("RunningTasks: {}", runningTasks);
+      log.debug("RunningTasks: {}", runningTasks.keySet());
       log.debug("BlockedTasks: {}", blockedTasks.keySet()); // nowarn
-      log.debug("CompletedTasks: {}", completedTasks.keySet()); // nowarn
-      log.debug("RunningZKTasks: {}", runningZKTasks); // nowarn
     }
   }
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
index c79b1d40ff9..04da2d16ab1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
@@ -101,21 +101,39 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
     return false;
   }
 
-  /** Remove the event and save the response into the other path. */
-  public void remove(QueueEvent event) throws KeeperException, InterruptedException {
+  /**
+   * Remove the event and save the response into the other path.
+   *
+   * <p>The response node should be updated only for a synchronous command, where the request node
+   * was created with specialized {@link #offer(byte[], long)}, that also created the response node.
+   * For an asynchronous command, the request node was created with standard {@link #offer(byte[])}
+   * method, which does not create the response node. For such commands, the result is stored using
+   * {@link DistributedMap} instances.
+   *
+   * @param setResult Whether we set data into the result node (true for synchronous commands).
+   */
+  public void remove(QueueEvent event, boolean setResult)
+      throws KeeperException, InterruptedException {
     Timer.Context time = stats.time(dir + "_remove_event");
     try {
       String path = event.getId();
-      String responsePath = dir + "/" + RESPONSE_PREFIX + path.substring(path.lastIndexOf('-') + 1);
 
-      try {
-        zookeeper.setData(responsePath, event.getBytes(), true);
-      } catch (KeeperException.NoNodeException ignored) {
-        // we must handle the race case where the node no longer exists
-        log.info(
-            "Response ZK path: {} doesn't exist. Requestor may have 
disconnected from ZooKeeper",
-            responsePath);
+      // Set response data in the response node
+      if (setResult) {
+        String responsePath =
+            dir + "/" + RESPONSE_PREFIX + path.substring(path.lastIndexOf('-') 
+ 1);
+
+        try {
+          zookeeper.setData(responsePath, event.getBytes(), true);
+        } catch (KeeperException.NoNodeException ignored) {
+          // we must handle the race case where the node no longer exists
+          log.info(
+              "Response ZK path: {} doesn't exist. Requestor may have 
disconnected from ZooKeeper",
+              responsePath);
+        }
       }
+
+      // Remove the request node
       try {
         zookeeper.delete(path, -1, true);
       } catch (KeeperException.NoNodeException ignored) {
@@ -190,7 +208,7 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
   /**
    * Inserts data into zookeeper.
    *
-   * @return true if data was successfully added
+   * @return The path of the created node.
    */
   private String createData(String path, byte[] data, CreateMode mode)
       throws KeeperException, InterruptedException {
@@ -322,8 +340,8 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
       return Objects.equals(id, other.id);
     }
 
-    private WatchedEvent event = null;
-    private String id;
+    private final WatchedEvent event;
+    private final String id;
     private byte[] bytes;
 
     QueueEvent(String id, byte[] bytes, WatchedEvent event) {
@@ -332,10 +350,6 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
       this.event = event;
     }
 
-    public void setId(String id) {
-      this.id = id;
-    }
-
     public String getId() {
       return id;
     }
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
index b38ad73f820..5c0451e0b01 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
@@ -43,9 +43,11 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 import org.apache.http.client.HttpClient;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.client.solrj.SolrResponse;
@@ -144,11 +146,17 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
   private OverseerCollectionConfigSetProcessorToBeTested underTest;
 
   private Thread thread;
-  private final Queue<QueueEvent> queue = new ArrayBlockingQueue<>(10);
+  private final Queue<QueueEvent> queue = new LinkedBlockingQueue<>();
 
   private static class OverseerCollectionConfigSetProcessorToBeTested
       extends OverseerCollectionConfigSetProcessor {
 
+    /**
+     * If non-null, all the threads will wait on this latch after task execution. This is used to
+     * force thread contention in post-processing.
+     */
+    private final CountDownLatch postLatch;
+
     public OverseerCollectionConfigSetProcessorToBeTested(
         ZkStateReader zkStateReader,
         String myId,
@@ -159,7 +167,8 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
         Overseer overseer,
         DistributedMap completedMap,
         DistributedMap failureMap,
-        SolrMetricsContext solrMetricsContext) {
+        SolrMetricsContext solrMetricsContext,
+        CountDownLatch postLatch) {
       super(
           zkStateReader,
           myId,
@@ -173,12 +182,53 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
           completedMap,
           failureMap,
           solrMetricsContext);
+
+      this.postLatch = postLatch;
     }
 
     @Override
     protected LeaderStatus amILeader() {
       return LeaderStatus.YES;
     }
+
+    @Override
+    protected Runner createRunner(
+        OverseerMessageHandler messageHandler,
+        ZkNodeProps message,
+        String operation,
+        QueueEvent head,
+        OverseerMessageHandler.Lock lock) {
+      return new LatchRunner(messageHandler, message, operation, head, lock);
+    }
+
+    /** Override the default runner to wait on a latch once the task is completed. */
+    private class LatchRunner extends Runner {
+
+      public LatchRunner(
+          OverseerMessageHandler messageHandler,
+          ZkNodeProps message,
+          String operation,
+          QueueEvent head,
+          OverseerMessageHandler.Lock lock) {
+        super(messageHandler, message, operation, head, lock);
+      }
+
+      @Override
+      public void run() {
+        super.run();
+
+        if (postLatch != null) {
+          try {
+            boolean success = postLatch.await(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
+            if (!success) {
+              throw new RuntimeException("Timed out waiting for postLatch");
+            }
+          } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+    }
   }
 
   @BeforeClass
@@ -278,7 +328,7 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
   @Override
   @After
   public void tearDown() throws Exception {
-    stopComponentUnderTest();
+    stopProcessor();
     super.tearDown();
   }
 
@@ -289,15 +339,18 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
     when(workQueueMock.peekTopN(anyInt(), any(), anyLong()))
         .thenAnswer(
             invocation -> {
-              Object result;
-              int count = 0;
-              while ((result = queue.peek()) == null) {
+              int n = invocation.getArgument(0);
+
+              int retries = 0;
+              while (retries < 2) {
+                List<Object> results = queue.stream().limit(n).collect(Collectors.toList());
+                if (!results.isEmpty()) {
+                  return results;
+                }
                 Thread.sleep(1000);
-                count++;
-                if (count > 1) return null;
+                retries++;
               }
-
-              return List.of(result);
+              return Collections.emptyList();
             });
 
     when(workQueueMock.getTailId())
@@ -327,7 +380,7 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
               return null;
             })
         .when(workQueueMock)
-        .remove(any(QueueEvent.class));
+        .remove(any(QueueEvent.class), anyBoolean());
 
     when(workQueueMock.poll())
         .thenAnswer(
@@ -700,12 +753,26 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
     }
   }
 
-  protected void startComponentUnderTest() {
+  protected void createAndStartProcessor(CountDownLatch postLatch) {
+    underTest =
+        new OverseerCollectionConfigSetProcessorToBeTested(
+            zkStateReaderMock,
+            "1234",
+            shardHandlerFactoryMock,
+            ADMIN_PATH,
+            workQueueMock,
+            runningMapMock,
+            overseerMock,
+            completedMapMock,
+            failureMapMock,
+            solrMetricsContextMock,
+            postLatch);
+
     thread = new Thread(underTest);
     thread.start();
   }
 
-  protected void stopComponentUnderTest() throws Exception {
+  protected void stopProcessor() throws Exception {
     if (null != underTest) {
       underTest.close();
       underTest = null;
@@ -756,6 +823,20 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
     queue.add(qe);
   }
 
+  /** Submit a dumb job to the overseer that does nothing. */
+  private void issueMockJob(String id) {
+    Map<String, Object> propMap =
+        Map.of(
+            Overseer.QUEUE_OPERATION,
+            CollectionParams.CollectionAction.MOCK_COLL_TASK.toLower(),
+            "name",
+            id);
+
+    ZkNodeProps props = new ZkNodeProps(propMap);
+    QueueEvent qe = new QueueEvent(id, Utils.toJSON(props), null);
+    queue.add(qe);
+  }
+
   protected void verifySubmitCaptures(
       Integer numberOfSlices, Integer numberOfReplica, Collection<String> createNodes) {
     List<String> coreNames = new ArrayList<>();
@@ -957,24 +1038,7 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
 
     if (random().nextBoolean()) Collections.shuffle(createNodeList, random());
 
-    underTest =
-        new OverseerCollectionConfigSetProcessorToBeTested(
-            zkStateReaderMock,
-            "1234",
-            shardHandlerFactoryMock,
-            ADMIN_PATH,
-            workQueueMock,
-            runningMapMock,
-            overseerMock,
-            completedMapMock,
-            failureMapMock,
-            solrMetricsContextMock);
-
-    if (log.isInfoEnabled()) {
-      log.info("clusterstate {}", clusterStateMock.hashCode());
-    }
-
-    startComponentUnderTest();
+    createAndStartProcessor(null);
 
     final List<String> createNodeListToSend =
         ((createNodeListOption != CreateNodeListOptions.SEND_NULL) ? createNodeList : null);
@@ -1350,4 +1414,29 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
         false,
         distributedClusterStateUpdates);
   }
+
+  /** Check no overseer task is rejected when the queue is flooded. */
+  @Test
+  public void testFloodQueue() throws Exception {
+
+    commonMocks(2, false);
+
+    // Set a latch, so all threads will be waiting once the task processing completes, but before
+    // the threads are returned to the pool.
+    // This validates that the main thread (that reads tasks from the distributed queue) does not
+    // submit more tasks than the pool can handle, even if the previous tasks were already
+    // completed.
+    CountDownLatch postLatch = new CountDownLatch(1);
+    createAndStartProcessor(postLatch);
+
+    for (int i = 0; i < OverseerTaskProcessor.MAX_PARALLEL_TASKS + 10; i++) {
+      issueMockJob(Integer.toString(i));
+    }
+
+    Thread.sleep(1000);
+    underTest.postLatch.countDown();
+
+    waitForEmptyQueue();
+    stopProcessor();
+  }
 }
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTaskQueueTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTaskQueueTest.java
index 0b748a0a25f..9d8229a5af5 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTaskQueueTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTaskQueueTest.java
@@ -92,7 +92,7 @@ public class OverseerTaskQueueTest extends DistributedQueueTest {
     }
     assertNotNull("Didn't find event with requestid " + requestId2, 
requestId2Event);
     requestId2Event.setBytes("foo bar".getBytes(StandardCharsets.UTF_8));
-    tq.remove(requestId2Event);
+    tq.remove(requestId2Event, true);
 
     // Make sure this call to check if requestId exists doesn't barf with Json parse exception
     assertTrue(
