This is an automated email from the ASF dual-hosted git repository.

kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 512b70c4687 Make MSQ tests run faster (#17718)
512b70c4687 is described below

commit 512b70c4687fd0da1845c98693b8024bf829992a
Author: Zoltan Haindrich <[email protected]>
AuthorDate: Thu Feb 13 08:35:21 2025 +0100

    Make MSQ tests run faster (#17718)
---
 .../msq/indexing/IndexerControllerContext.java     |  4 +-
 .../druid/msq/indexing/MSQWorkerTaskLauncher.java  | 28 +++++++++-----
 .../org/apache/druid/msq/exec/MSQTasksTest.java    |  4 +-
 .../msq/indexing/MSQWorkerTaskLauncherTest.java    |  4 +-
 .../druid/msq/test/CalciteMSQTestsHelper.java      | 44 +++++++++++++++++++---
 .../druid/msq/test/MSQTestControllerContext.java   | 13 +++++--
 6 files changed, 76 insertions(+), 21 deletions(-)

diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
index a4d7778a841..e74ec7f2a66 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
@@ -41,6 +41,7 @@ import org.apache.druid.msq.exec.WorkerClient;
 import org.apache.druid.msq.exec.WorkerFailureListener;
 import org.apache.druid.msq.exec.WorkerManager;
 import org.apache.druid.msq.guice.MultiStageQuery;
+import 
org.apache.druid.msq.indexing.MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig;
 import org.apache.druid.msq.indexing.client.ControllerChatHandler;
 import org.apache.druid.msq.indexing.client.IndexerWorkerClient;
 import org.apache.druid.msq.indexing.error.MSQException;
@@ -216,7 +217,8 @@ public class IndexerControllerContext implements 
ControllerContext
         workerFailureListener,
         makeTaskContext(querySpec, queryKernelConfig, task.getContext()),
         // 10 minutes +- 2 minutes jitter
-        TimeUnit.SECONDS.toMillis(600 + 
ThreadLocalRandom.current().nextInt(-4, 5) * 30L)
+        TimeUnit.SECONDS.toMillis(600 + 
ThreadLocalRandom.current().nextInt(-4, 5) * 30L),
+        new MSQWorkerTaskLauncherConfig()
     );
   }
 
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
index ed32b81f44e..5d19c899b10 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
@@ -72,12 +72,17 @@ import java.util.stream.Collectors;
 public class MSQWorkerTaskLauncher implements RetryCapableWorkerManager
 {
   private static final Logger log = new Logger(MSQWorkerTaskLauncher.class);
-  private static final long HIGH_FREQUENCY_CHECK_MILLIS = 100;
-  private static final long LOW_FREQUENCY_CHECK_MILLIS = 2000;
-  private static final long SWITCH_TO_LOW_FREQUENCY_CHECK_AFTER_MILLIS = 10000;
-  private static final long SHUTDOWN_TIMEOUT_MILLIS = 
Duration.ofMinutes(1).toMillis();
+
   private int currentRelaunchCount = 0;
 
+  public static class MSQWorkerTaskLauncherConfig
+  {
+    public long highFrequencyCheckMillis = 100;
+    public long lowFrequencyCheckMillis = 2000;
+    public long switchToLowFrequencyCheckAfterMillis = 10000;
+    public long shutdownTimeoutMillis = Duration.ofMinutes(1).toMillis();
+  }
+
   // States for "state" variable.
   private enum State
   {
@@ -91,6 +96,7 @@ public class MSQWorkerTaskLauncher implements 
RetryCapableWorkerManager
   private final OverlordClient overlordClient;
   private final ExecutorService exec;
   private final long maxTaskStartDelayMillis;
+  private final MSQWorkerTaskLauncherConfig config;
 
   // Mutable state meant to be accessible by threads outside the main loop.
   private final SettableFuture<?> stopFuture = SettableFuture.create();
@@ -150,7 +156,8 @@ public class MSQWorkerTaskLauncher implements 
RetryCapableWorkerManager
       final OverlordClient overlordClient,
       final WorkerFailureListener workerFailureListener,
       final Map<String, Object> taskContextOverrides,
-      final long maxTaskStartDelayMillis
+      final long maxTaskStartDelayMillis,
+      final MSQWorkerTaskLauncherConfig config
   )
   {
     this.controllerTaskId = controllerTaskId;
@@ -162,6 +169,7 @@ public class MSQWorkerTaskLauncher implements 
RetryCapableWorkerManager
     );
     this.workerFailureListener = workerFailureListener;
     this.maxTaskStartDelayMillis = maxTaskStartDelayMillis;
+    this.config = config;
   }
 
   @Override
@@ -396,11 +404,11 @@ public class MSQWorkerTaskLauncher implements 
RetryCapableWorkerManager
         // Sleep for a bit, maybe.
         final long now = System.currentTimeMillis();
 
-        if (now > stopStartTime + SHUTDOWN_TIMEOUT_MILLIS) {
+        if (now > stopStartTime + config.shutdownTimeoutMillis) {
           if (caught != null) {
             throw caught;
           } else {
-            throw new ISE("Task shutdown timed out (limit = %,dms)", 
SHUTDOWN_TIMEOUT_MILLIS);
+            throw new ISE("Task shutdown timed out (limit = %,dms)", 
config.shutdownTimeoutMillis);
           }
         }
 
@@ -754,10 +762,10 @@ public class MSQWorkerTaskLauncher implements 
RetryCapableWorkerManager
         taskTrackers.values().stream().mapToLong(tracker -> 
tracker.startTimeMillis).max();
 
     if (maxTaskStartTime.isPresent() &&
-        System.currentTimeMillis() - maxTaskStartTime.getAsLong() < 
SWITCH_TO_LOW_FREQUENCY_CHECK_AFTER_MILLIS) {
-      return HIGH_FREQUENCY_CHECK_MILLIS - loopDurationMillis;
+        System.currentTimeMillis() - maxTaskStartTime.getAsLong() < 
config.switchToLowFrequencyCheckAfterMillis) {
+      return config.highFrequencyCheckMillis - loopDurationMillis;
     } else {
-      return LOW_FREQUENCY_CHECK_MILLIS - loopDurationMillis;
+      return config.lowFrequencyCheckMillis - loopDurationMillis;
     }
   }
 
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java
index eece9da5810..24df8823346 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java
@@ -36,6 +36,7 @@ import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.msq.indexing.MSQWorkerTask;
 import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher;
+import 
org.apache.druid.msq.indexing.MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig;
 import org.apache.druid.msq.indexing.error.InsertTimeNullFault;
 import org.apache.druid.msq.indexing.error.MSQErrorReport;
 import org.apache.druid.msq.indexing.error.MSQException;
@@ -231,7 +232,8 @@ public class MSQTasksTest
         new TasksTestOverlordClient(numSlots),
         (task, fault) -> {},
         ImmutableMap.of(),
-        TimeUnit.SECONDS.toMillis(5)
+        TimeUnit.SECONDS.toMillis(5),
+        new MSQWorkerTaskLauncherConfig()
     );
 
     try {
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherTest.java
index 25ab33f76f9..1e00be629a1 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.msq.indexing;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import 
org.apache.druid.msq.indexing.MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig;
 import org.apache.druid.rpc.indexing.OverlordClient;
 import org.junit.Assert;
 import org.junit.Before;
@@ -43,7 +44,8 @@ public class MSQWorkerTaskLauncherTest
         Mockito.mock(OverlordClient.class),
         (task, fault) -> {},
         ImmutableMap.of(),
-        TimeUnit.SECONDS.toMillis(5)
+        TimeUnit.SECONDS.toMillis(5),
+        new MSQWorkerTaskLauncherConfig()
     );
   }
 
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
index 8aa2790a181..6a94124e768 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
@@ -20,9 +20,13 @@
 package org.apache.druid.msq.test;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.inject.Binder;
+import com.google.inject.Inject;
 import com.google.inject.Module;
 import com.google.inject.Provides;
 import com.google.inject.TypeLiteral;
@@ -44,6 +48,7 @@ import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.msq.counters.ChannelCounters;
 import org.apache.druid.msq.exec.DataServerQueryHandler;
 import org.apache.druid.msq.exec.DataServerQueryHandlerFactory;
 import org.apache.druid.msq.guice.MSQExternalDataSourceModule;
@@ -186,10 +191,39 @@ public class CalciteMSQTestsHelper
     }
 
     @Provides
-    public DataSegmentProvider provideDataSegmentProvider(TempDirProducer 
tempDirProducer)
+    @LazySingleton
+    public DataSegmentProvider 
provideDataSegmentProvider(LocalDataSegmentProvider localDataSegmentProvider)
+    {
+      return localDataSegmentProvider;
+    }
+
+    @LazySingleton
+    static class LocalDataSegmentProvider extends CacheLoader<SegmentId, 
CompleteSegment> implements DataSegmentProvider
     {
-      return (segmentId, channelCounters,
-          isReindex) -> getSupplierForSegment(tempDirProducer::newTempFolder, 
segmentId);
+      private TempDirProducer tempDirProducer;
+      private LoadingCache<SegmentId, CompleteSegment> cache;
+
+      @Inject
+      public LocalDataSegmentProvider(TempDirProducer tempDirProducer)
+      {
+        this.tempDirProducer = tempDirProducer;
+        this.cache = CacheBuilder.newBuilder().build(this);
+      }
+
+      @Override
+      public CompleteSegment load(SegmentId segmentId) throws Exception
+      {
+        return getSupplierForSegment(tempDirProducer::newTempFolder, 
segmentId);
+      }
+
+      @Override
+      public Supplier<ResourceHolder<CompleteSegment>> fetchSegment(SegmentId 
segmentId,
+          ChannelCounters channelCounters, boolean isReindex)
+      {
+        CompleteSegment a = cache.getUnchecked(segmentId);
+        return () -> new ReferenceCountingResourceHolder<>(a, Closer.create());
+      }
+
     }
 
     @Provides
@@ -239,7 +273,7 @@ public class CalciteMSQTestsHelper
     return mockFactory;
   }
 
-  protected static Supplier<ResourceHolder<CompleteSegment>> 
getSupplierForSegment(
+  protected static CompleteSegment getSupplierForSegment(
       Function<String, File> tempFolderProducer,
       SegmentId segmentId
   )
@@ -496,6 +530,6 @@ public class CalciteMSQTestsHelper
                                          .shardSpec(new LinearShardSpec(0))
                                          .size(0)
                                          .build();
-    return () -> new ReferenceCountingResourceHolder<>(new 
CompleteSegment(dataSegment, segment), Closer.create());
+    return new CompleteSegment(dataSegment, segment);
   }
 }
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
index 86051bc4b38..6e632a4869c 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
@@ -59,6 +59,7 @@ import 
org.apache.druid.msq.indexing.IndexerTableInputSpecSlicer;
 import org.apache.druid.msq.indexing.MSQSpec;
 import org.apache.druid.msq.indexing.MSQWorkerTask;
 import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher;
+import 
org.apache.druid.msq.indexing.MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig;
 import org.apache.druid.msq.input.InputSpecSlicer;
 import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
 import org.apache.druid.msq.querykit.QueryKit;
@@ -90,7 +91,7 @@ public class MSQTestControllerContext implements 
ControllerContext
   private final TaskActionClient taskActionClient;
   private final Map<String, Worker> inMemoryWorkers = new HashMap<>();
   private final ConcurrentMap<String, TaskStatus> statusMap = new 
ConcurrentHashMap<>();
-  private final ListeningExecutorService executor = 
MoreExecutors.listeningDecorator(Execs.multiThreaded(
+  private static final ListeningExecutorService EXECUTOR = 
MoreExecutors.listeningDecorator(Execs.multiThreaded(
       NUM_WORKERS,
       "MultiStageQuery-test-controller-client"
   ));
@@ -177,7 +178,7 @@ public class MSQTestControllerContext implements 
ControllerContext
       inMemoryWorkers.put(task.getId(), worker);
       statusMap.put(task.getId(), TaskStatus.running(task.getId()));
 
-      ListenableFuture<?> future = executor.submit(() -> {
+      ListenableFuture<?> future = EXECUTOR.submit(() -> {
         try {
           worker.run();
         }
@@ -353,13 +354,19 @@ public class MSQTestControllerContext implements 
ControllerContext
       WorkerFailureListener workerFailureListener
   )
   {
+    MSQWorkerTaskLauncherConfig taskLauncherConfig = new 
MSQWorkerTaskLauncherConfig();
+    taskLauncherConfig.highFrequencyCheckMillis = 1;
+    taskLauncherConfig.switchToLowFrequencyCheckAfterMillis = 25;
+    taskLauncherConfig.lowFrequencyCheckMillis = 2;
+
     return new MSQWorkerTaskLauncher(
         controller.queryId(),
         "test-datasource",
         overlordClient,
         workerFailureListener,
         IndexerControllerContext.makeTaskContext(querySpec, queryKernelConfig, 
ImmutableMap.of()),
-        0
+        0,
+        taskLauncherConfig
     );
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to