jpountz commented on code in PR #13124:
URL: https://github.com/apache/lucene/pull/13124#discussion_r1521095208


##########
lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java:
##########
@@ -910,4 +941,58 @@ public void setSuppressExceptions(ConcurrentMergeScheduler cms) {
           }
         });
   }
+
+  private class ScaledExecutor implements Executor {
+
+    private final AtomicInteger activeCount = new AtomicInteger(0);
+    private final ThreadPoolExecutor executor;
+
+    public ScaledExecutor() {
+      this.executor =
+          new ThreadPoolExecutor(
+              0, Math.max(1, maxThreadCount), 1L, TimeUnit.MINUTES, new SynchronousQueue<>());
+    }
+
+    void shutdown() {
+      executor.shutdown();
+    }
+
+    private void updatePoolSize() {
+      executor.setMaximumPoolSize(Math.max(1, maxThreadCount));
+    }

Review Comment:
   It feels like we don't actually need this, since we already have control over
the thread pool size via `maxThreadCount`. What about setting a high maximum
pool size in the ctor and never updating it (i.e. making it a "cached" executor
rather than a "scaled" executor)?



##########
lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java:
##########
@@ -52,6 +53,14 @@ public Directory wrapForMerge(OneMerge merge, Directory in) {
     return in;
   }
 
+  /**
+   * Provides an executor for parallelism during a single merge operation. By default, the method
+   * returns `null` indicating that there is no parallelism during a single merge operation.
+   */
+  public Executor getIntraMergeExecutor(OneMerge merge) {
+    return null;

Review Comment:
   What about returning `SameThreadExecutor` by default instead?
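
   A rough sketch of that default, assuming a same-thread executor is simply one
that runs each task on the calling thread. `MergeSchedulerSketch` and
`SAME_THREAD` are hypothetical stand-ins, and the `Object` parameter replaces
`OneMerge` only to keep the example self-contained.

```java
import java.util.concurrent.Executor;

// Hypothetical stand-in for the base scheduler class, not Lucene code.
class MergeSchedulerSketch {
  // Runs every task directly on the thread that calls execute().
  static final Executor SAME_THREAD = Runnable::run;

  // Stand-in for getIntraMergeExecutor(OneMerge merge): never returns null,
  // so callers do not need a null check before submitting work.
  public Executor getIntraMergeExecutor(Object merge) {
    return SAME_THREAD;
  }
}
```

   Callers can then always call `execute` on the result, and "no intra-merge
parallelism" becomes the behavior of the default executor rather than a special
`null` case.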



##########
lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java:
##########
@@ -910,4 +936,68 @@ public void setSuppressExceptions(ConcurrentMergeScheduler cms) {
           }
         });
   }
+
+  private class ScaledExecutor implements Executor {
+
+    private final AtomicInteger activeCount = new AtomicInteger(0);
+    private final ThreadPoolExecutor executor;
+
+    public ScaledExecutor() {
+      this.executor =
+          new ThreadPoolExecutor(
+              0, Math.max(1, maxThreadCount), 1L, TimeUnit.MINUTES, new SynchronousQueue<>());
+    }
+
+    void shutdown() {
+      executor.shutdown();
+    }
+
+    private void updatePoolSize() {
+      executor.setMaximumPoolSize(Math.max(1, maxThreadCount));
+    }
+
+    @Override
+    public void execute(Runnable command) {
+      assert mergeThreads.contains(Thread.currentThread()) : "caller is not a merge thread";
+      Thread currentThread = Thread.currentThread();
+      if (currentThread instanceof MergeThread mergeThread) {
+        execute(mergeThread, command);
+      } else {
+        command.run();
+      }
+    }
+
+    private void execute(MergeThread mergeThread, Runnable command) {
+      // don't do multithreaded merges for small merges
+      if (mergeThread.merge.estimatedMergeBytes < MIN_BIG_MERGE_MB * 1024 * 1024) {
+        command.run();
+      } else {
+        final boolean isThreadAvailable;
+        // we need to check if a thread is available before submitting the task to the executor
+        // synchronize on CMS to get an accurate count of current threads
+        synchronized (ConcurrentMergeScheduler.this) {
+          int max = maxThreadCount - mergeThreads.size();
+          int value = activeCount.get();
+          if (value < max) {
+            activeCount.incrementAndGet();
+            isThreadAvailable = true;
+          } else {
+            isThreadAvailable = false;
+          }
+        }

Review Comment:
   @benwtrent ++ Otherwise there is a risk of letting bigger merges starve
smaller merges of threads, and I'm not sure how we could fix it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

