This is an automated email from the ASF dual-hosted git repository.
kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 512b70c4687 Make MSQ tests run faster (#17718)
512b70c4687 is described below
commit 512b70c4687fd0da1845c98693b8024bf829992a
Author: Zoltan Haindrich <[email protected]>
AuthorDate: Thu Feb 13 08:35:21 2025 +0100
Make MSQ tests run faster (#17718)
---
.../msq/indexing/IndexerControllerContext.java | 4 +-
.../druid/msq/indexing/MSQWorkerTaskLauncher.java | 28 +++++++++-----
.../org/apache/druid/msq/exec/MSQTasksTest.java | 4 +-
.../msq/indexing/MSQWorkerTaskLauncherTest.java | 4 +-
.../druid/msq/test/CalciteMSQTestsHelper.java | 44 +++++++++++++++++++---
.../druid/msq/test/MSQTestControllerContext.java | 13 +++++--
6 files changed, 76 insertions(+), 21 deletions(-)
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
index a4d7778a841..e74ec7f2a66 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
@@ -41,6 +41,7 @@ import org.apache.druid.msq.exec.WorkerClient;
import org.apache.druid.msq.exec.WorkerFailureListener;
import org.apache.druid.msq.exec.WorkerManager;
import org.apache.druid.msq.guice.MultiStageQuery;
+import
org.apache.druid.msq.indexing.MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig;
import org.apache.druid.msq.indexing.client.ControllerChatHandler;
import org.apache.druid.msq.indexing.client.IndexerWorkerClient;
import org.apache.druid.msq.indexing.error.MSQException;
@@ -216,7 +217,8 @@ public class IndexerControllerContext implements ControllerContext
workerFailureListener,
makeTaskContext(querySpec, queryKernelConfig, task.getContext()),
// 10 minutes +- 2 minutes jitter
- TimeUnit.SECONDS.toMillis(600 +
ThreadLocalRandom.current().nextInt(-4, 5) * 30L)
+ TimeUnit.SECONDS.toMillis(600 +
ThreadLocalRandom.current().nextInt(-4, 5) * 30L),
+ new MSQWorkerTaskLauncherConfig()
);
}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
index ed32b81f44e..5d19c899b10 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
@@ -72,12 +72,17 @@ import java.util.stream.Collectors;
public class MSQWorkerTaskLauncher implements RetryCapableWorkerManager
{
private static final Logger log = new Logger(MSQWorkerTaskLauncher.class);
- private static final long HIGH_FREQUENCY_CHECK_MILLIS = 100;
- private static final long LOW_FREQUENCY_CHECK_MILLIS = 2000;
- private static final long SWITCH_TO_LOW_FREQUENCY_CHECK_AFTER_MILLIS = 10000;
- private static final long SHUTDOWN_TIMEOUT_MILLIS =
Duration.ofMinutes(1).toMillis();
+
private int currentRelaunchCount = 0;
+ public static class MSQWorkerTaskLauncherConfig
+ {
+ public long highFrequencyCheckMillis = 100;
+ public long lowFrequencyCheckMillis = 2000;
+ public long switchToLowFrequencyCheckAfterMillis = 10000;
+ public long shutdownTimeoutMillis = Duration.ofMinutes(1).toMillis();
+ }
+
// States for "state" variable.
private enum State
{
@@ -91,6 +96,7 @@ public class MSQWorkerTaskLauncher implements RetryCapableWorkerManager
private final OverlordClient overlordClient;
private final ExecutorService exec;
private final long maxTaskStartDelayMillis;
+ private final MSQWorkerTaskLauncherConfig config;
// Mutable state meant to be accessible by threads outside the main loop.
private final SettableFuture<?> stopFuture = SettableFuture.create();
@@ -150,7 +156,8 @@ public class MSQWorkerTaskLauncher implements RetryCapableWorkerManager
final OverlordClient overlordClient,
final WorkerFailureListener workerFailureListener,
final Map<String, Object> taskContextOverrides,
- final long maxTaskStartDelayMillis
+ final long maxTaskStartDelayMillis,
+ final MSQWorkerTaskLauncherConfig config
)
{
this.controllerTaskId = controllerTaskId;
@@ -162,6 +169,7 @@ public class MSQWorkerTaskLauncher implements RetryCapableWorkerManager
);
this.workerFailureListener = workerFailureListener;
this.maxTaskStartDelayMillis = maxTaskStartDelayMillis;
+ this.config = config;
}
@Override
@@ -396,11 +404,11 @@ public class MSQWorkerTaskLauncher implements RetryCapableWorkerManager
// Sleep for a bit, maybe.
final long now = System.currentTimeMillis();
- if (now > stopStartTime + SHUTDOWN_TIMEOUT_MILLIS) {
+ if (now > stopStartTime + config.shutdownTimeoutMillis) {
if (caught != null) {
throw caught;
} else {
- throw new ISE("Task shutdown timed out (limit = %,dms)",
SHUTDOWN_TIMEOUT_MILLIS);
+ throw new ISE("Task shutdown timed out (limit = %,dms)",
config.shutdownTimeoutMillis);
}
}
@@ -754,10 +762,10 @@ public class MSQWorkerTaskLauncher implements RetryCapableWorkerManager
taskTrackers.values().stream().mapToLong(tracker ->
tracker.startTimeMillis).max();
if (maxTaskStartTime.isPresent() &&
- System.currentTimeMillis() - maxTaskStartTime.getAsLong() <
SWITCH_TO_LOW_FREQUENCY_CHECK_AFTER_MILLIS) {
- return HIGH_FREQUENCY_CHECK_MILLIS - loopDurationMillis;
+ System.currentTimeMillis() - maxTaskStartTime.getAsLong() <
config.switchToLowFrequencyCheckAfterMillis) {
+ return config.highFrequencyCheckMillis - loopDurationMillis;
} else {
- return LOW_FREQUENCY_CHECK_MILLIS - loopDurationMillis;
+ return config.lowFrequencyCheckMillis - loopDurationMillis;
}
}
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java
index eece9da5810..24df8823346 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java
@@ -36,6 +36,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.indexing.MSQWorkerTask;
import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher;
+import
org.apache.druid.msq.indexing.MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig;
import org.apache.druid.msq.indexing.error.InsertTimeNullFault;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
import org.apache.druid.msq.indexing.error.MSQException;
@@ -231,7 +232,8 @@ public class MSQTasksTest
new TasksTestOverlordClient(numSlots),
(task, fault) -> {},
ImmutableMap.of(),
- TimeUnit.SECONDS.toMillis(5)
+ TimeUnit.SECONDS.toMillis(5),
+ new MSQWorkerTaskLauncherConfig()
);
try {
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherTest.java
index 25ab33f76f9..1e00be629a1 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.msq.indexing;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import
org.apache.druid.msq.indexing.MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.junit.Assert;
import org.junit.Before;
@@ -43,7 +44,8 @@ public class MSQWorkerTaskLauncherTest
Mockito.mock(OverlordClient.class),
(task, fault) -> {},
ImmutableMap.of(),
- TimeUnit.SECONDS.toMillis(5)
+ TimeUnit.SECONDS.toMillis(5),
+ new MSQWorkerTaskLauncherConfig()
);
}
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
index 8aa2790a181..6a94124e768 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
@@ -20,9 +20,13 @@
package org.apache.druid.msq.test;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Binder;
+import com.google.inject.Inject;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.TypeLiteral;
@@ -44,6 +48,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.exec.DataServerQueryHandler;
import org.apache.druid.msq.exec.DataServerQueryHandlerFactory;
import org.apache.druid.msq.guice.MSQExternalDataSourceModule;
@@ -186,10 +191,39 @@ public class CalciteMSQTestsHelper
}
@Provides
- public DataSegmentProvider provideDataSegmentProvider(TempDirProducer
tempDirProducer)
+ @LazySingleton
+ public DataSegmentProvider
provideDataSegmentProvider(LocalDataSegmentProvider localDataSegmentProvider)
+ {
+ return localDataSegmentProvider;
+ }
+
+ @LazySingleton
+ static class LocalDataSegmentProvider extends CacheLoader<SegmentId,
CompleteSegment> implements DataSegmentProvider
{
- return (segmentId, channelCounters,
- isReindex) -> getSupplierForSegment(tempDirProducer::newTempFolder,
segmentId);
+ private TempDirProducer tempDirProducer;
+ private LoadingCache<SegmentId, CompleteSegment> cache;
+
+ @Inject
+ public LocalDataSegmentProvider(TempDirProducer tempDirProducer)
+ {
+ this.tempDirProducer = tempDirProducer;
+ this.cache = CacheBuilder.newBuilder().build(this);
+ }
+
+ @Override
+ public CompleteSegment load(SegmentId segmentId) throws Exception
+ {
+ return getSupplierForSegment(tempDirProducer::newTempFolder,
segmentId);
+ }
+
+ @Override
+ public Supplier<ResourceHolder<CompleteSegment>> fetchSegment(SegmentId
segmentId,
+ ChannelCounters channelCounters, boolean isReindex)
+ {
+ CompleteSegment a = cache.getUnchecked(segmentId);
+ return () -> new ReferenceCountingResourceHolder<>(a, Closer.create());
+ }
+
}
@Provides
@@ -239,7 +273,7 @@ public class CalciteMSQTestsHelper
return mockFactory;
}
- protected static Supplier<ResourceHolder<CompleteSegment>>
getSupplierForSegment(
+ protected static CompleteSegment getSupplierForSegment(
Function<String, File> tempFolderProducer,
SegmentId segmentId
)
@@ -496,6 +530,6 @@ public class CalciteMSQTestsHelper
.shardSpec(new LinearShardSpec(0))
.size(0)
.build();
- return () -> new ReferenceCountingResourceHolder<>(new
CompleteSegment(dataSegment, segment), Closer.create());
+ return new CompleteSegment(dataSegment, segment);
}
}
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
index 86051bc4b38..6e632a4869c 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
@@ -59,6 +59,7 @@ import org.apache.druid.msq.indexing.IndexerTableInputSpecSlicer;
import org.apache.druid.msq.indexing.MSQSpec;
import org.apache.druid.msq.indexing.MSQWorkerTask;
import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher;
+import
org.apache.druid.msq.indexing.MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig;
import org.apache.druid.msq.input.InputSpecSlicer;
import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
import org.apache.druid.msq.querykit.QueryKit;
@@ -90,7 +91,7 @@ public class MSQTestControllerContext implements ControllerContext
private final TaskActionClient taskActionClient;
private final Map<String, Worker> inMemoryWorkers = new HashMap<>();
private final ConcurrentMap<String, TaskStatus> statusMap = new
ConcurrentHashMap<>();
- private final ListeningExecutorService executor =
MoreExecutors.listeningDecorator(Execs.multiThreaded(
+ private static final ListeningExecutorService EXECUTOR =
MoreExecutors.listeningDecorator(Execs.multiThreaded(
NUM_WORKERS,
"MultiStageQuery-test-controller-client"
));
@@ -177,7 +178,7 @@ public class MSQTestControllerContext implements ControllerContext
inMemoryWorkers.put(task.getId(), worker);
statusMap.put(task.getId(), TaskStatus.running(task.getId()));
- ListenableFuture<?> future = executor.submit(() -> {
+ ListenableFuture<?> future = EXECUTOR.submit(() -> {
try {
worker.run();
}
@@ -353,13 +354,19 @@ public class MSQTestControllerContext implements ControllerContext
WorkerFailureListener workerFailureListener
)
{
+ MSQWorkerTaskLauncherConfig taskLauncherConfig = new
MSQWorkerTaskLauncherConfig();
+ taskLauncherConfig.highFrequencyCheckMillis = 1;
+ taskLauncherConfig.switchToLowFrequencyCheckAfterMillis = 25;
+ taskLauncherConfig.lowFrequencyCheckMillis = 2;
+
return new MSQWorkerTaskLauncher(
controller.queryId(),
"test-datasource",
overlordClient,
workerFailureListener,
IndexerControllerContext.makeTaskContext(querySpec, queryKernelConfig,
ImmutableMap.of()),
- 0
+ 0,
+ taskLauncherConfig
);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]