This is an automated email from the ASF dual-hosted git repository.
kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 2fde1f5f970 test: improve CI stability (#19207)
2fde1f5f970 is described below
commit 2fde1f5f970856ae7fc0120cdf64e10382563761
Author: Zoltan Haindrich <[email protected]>
AuthorDate: Thu Mar 26 07:18:56 2026 +0100
test: improve CI stability (#19207)
---
.../embedded/minio/MinIOStorageResource.java | 1 +
.../org/apache/druid/msq/exec/ControllerImpl.java | 3 +-
.../druid/msq/test/CalciteMSQTestsHelper.java | 8 +++++
.../druid/msq/test/MSQTestControllerContext.java | 40 ++++++++++++++--------
.../msq/test/MSQTestOverlordServiceClient.java | 5 ++-
.../druid/msq/test/MSQTestWorkerContext.java | 12 ++++++-
6 files changed, 52 insertions(+), 17 deletions(-)
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/minio/MinIOStorageResource.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/minio/MinIOStorageResource.java
index 7e85bad0d6a..804e78c646e 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/minio/MinIOStorageResource.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/minio/MinIOStorageResource.java
@@ -101,6 +101,7 @@ public class MinIOStorageResource extends
TestcontainerResource<MinIOContainer>
cluster.addCommonProperty("druid.s3.secretKey", getSecretKey());
cluster.addCommonProperty("druid.s3.enablePathStyleAccess", "true");
cluster.addCommonProperty("druid.s3.protocol", "http");
+ cluster.addCommonProperty("druid.s3.maxConnections", "150");
}
public String getBucket()
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 86223a20ac2..83db73bd754 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -766,8 +766,9 @@ public class ControllerImpl implements Controller
}
final long maxParseExceptions =
MultiStageQueryContext.getMaxParseExceptions(queryContext);
+ // When maxParseExceptions == 0, workers post CannotParseExternalDataFault directly via criticalWarningCodes.
this.faultsExceededChecker = new FaultsExceededChecker(
- ImmutableMap.of(CannotParseExternalDataFault.CODE, maxParseExceptions)
+ ImmutableMap.of(CannotParseExternalDataFault.CODE, maxParseExceptions == 0 ? -1 : maxParseExceptions)
);
stageToStatsMergingMode = new HashMap<>();
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java b/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
index 586b17b0563..bef94919dcf 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
@@ -32,6 +32,7 @@ import org.apache.druid.discovery.NodeRole;
import org.apache.druid.frame.processor.Bouncer;
import org.apache.druid.guice.IndexingServiceTuningConfigModule;
import org.apache.druid.guice.JoinableFactoryModule;
+import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.initialization.DruidModule;
@@ -90,6 +91,7 @@ public class CalciteMSQTestsHelper
}
@Provides
+ @LazySingleton
public SegmentCacheManager provideSegmentCacheManager(ObjectMapper
testMapper, TempDirProducer tempDirProducer)
{
return new SegmentCacheManagerFactory(TestIndex.INDEX_IO, testMapper)
@@ -97,6 +99,7 @@ public class CalciteMSQTestsHelper
}
@Provides
+ @LazySingleton
public LocalDataSegmentPusherConfig
provideLocalDataSegmentPusherConfig(TempDirProducer tempDirProducer)
{
LocalDataSegmentPusherConfig config = new LocalDataSegmentPusherConfig();
@@ -105,12 +108,14 @@ public class CalciteMSQTestsHelper
}
@Provides
+ @LazySingleton
public TestSegmentManager provideTestSegmentManager()
{
return new TestSegmentManager();
}
@Provides
+ @LazySingleton
public DataSegmentPusher provideDataSegmentPusher(
LocalDataSegmentPusherConfig config,
TestSegmentManager testSegmentManager
@@ -120,12 +125,14 @@ public class CalciteMSQTestsHelper
}
@Provides
+ @LazySingleton
public DataSegmentAnnouncer provideDataSegmentAnnouncer()
{
return new NoopDataSegmentAnnouncer();
}
@Provides
+ @LazySingleton
public SegmentManager provideSegmentManager(
TestSegmentManager testSegmentManager,
SpecificSegmentsQuerySegmentWalker walker
@@ -143,6 +150,7 @@ public class CalciteMSQTestsHelper
}
@Provides
+ @LazySingleton
public DataServerQueryHandlerFactory provideDataServerQueryHandlerFactory()
{
return getTestDataServerQueryHandlerFactory();
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
index 758fc644adb..e7dda29cc55 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
@@ -80,6 +80,7 @@ import org.mockito.Mockito;
import javax.annotation.Nullable;
import java.io.File;
+import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -100,7 +101,7 @@ public class MSQTestControllerContext implements
ControllerContext, DartControll
private final ConcurrentMap<String, TaskStatus> statusMap = new
ConcurrentHashMap<>();
public static final ExecutorService POOL = Execs.multiThreaded(NUM_WORKERS,
"MSQTestControllerContext-worker-%d");
public static final ListeningExecutorService EXECUTOR =
MoreExecutors.listeningDecorator(POOL);
- private final File tempDir = FileUtils.createTempDir();
+ private final File tempDir;
private final CoordinatorClient coordinatorClient;
private final DruidNode node = new DruidNode(
"controller",
@@ -134,6 +135,7 @@ public class MSQTestControllerContext implements
ControllerContext, DartControll
)
{
this.queryId = queryId;
+ this.tempDir = FileUtils.createTempDir("msq-controller-" + queryId);
this.mapper = mapper;
this.injector = injector;
this.taskActionClient = taskActionClient;
@@ -250,20 +252,18 @@ public class MSQTestControllerContext implements
ControllerContext, DartControll
workerStorageParameters =
WorkerStorageParameters.createInstanceForTests(Long.MAX_VALUE);
}
- Worker worker = new WorkerImpl(
- task,
- new MSQTestWorkerContext(
- task.getId(),
- inMemoryWorkers,
- controller,
- mapper,
- injector,
- workerMemoryParameters,
- workerStorageParameters,
- serviceEmitter,
- coordinatorClient
- )
+ final MSQTestWorkerContext workerContext = new MSQTestWorkerContext(
+ task.getId(),
+ inMemoryWorkers,
+ controller,
+ mapper,
+ injector,
+ workerMemoryParameters,
+ workerStorageParameters,
+ serviceEmitter,
+ coordinatorClient
);
+ Worker worker = new WorkerImpl(task, workerContext);
final WorkerRunRef workerRunRef = new WorkerRunRef();
inMemoryWorkers.put(task.getId(), workerRunRef);
ListenableFuture<?> future = workerRunRef.run(worker, EXECUTOR);
@@ -275,6 +275,7 @@ public class MSQTestControllerContext implements
ControllerContext, DartControll
public void onSuccess(@Nullable Object result)
{
statusMap.put(task.getId(), TaskStatus.success(task.getId()));
+ workerContext.close();
}
@Override
@@ -282,6 +283,7 @@ public class MSQTestControllerContext implements
ControllerContext, DartControll
{
log.error(t, "error running worker task %s", task.getId());
statusMap.put(task.getId(), TaskStatus.failure(task.getId(),
t.getMessage()));
+ workerContext.close();
}
}, MoreExecutors.directExecutor());
@@ -476,4 +478,14 @@ public class MSQTestControllerContext implements
ControllerContext, DartControll
this.queryContext = this.queryContext.override(context);
return this;
}
+
+ public void close()
+ {
+ try {
+ FileUtils.deleteDirectory(tempDir);
+ }
+ catch (IOException e) {
+ log.warn(e, "Failed to delete temp dir[%s] for controller[%s]", tempDir, queryId);
+ }
+ }
}
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java
index 658139289db..c93e813ba5e 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java
@@ -150,7 +150,7 @@ public class MSQTestOverlordServiceClient extends
NoopOverlordClient
{
TestQueryListener queryListener = null;
ControllerImpl controller = null;
- MSQTestControllerContext msqTestControllerContext;
+ MSQTestControllerContext msqTestControllerContext = null;
MSQTestTaskDetails testTaskDetails = registerTestTask(taskId);
try {
MSQControllerTask cTask = objectMapper.convertValue(taskObject,
MSQControllerTask.class);
@@ -205,6 +205,9 @@ public class MSQTestOverlordServiceClient extends
NoopOverlordClient
if (queryListener != null && queryListener.reportMap != null) {
testTaskDetails.report = queryListener.getReportMap();
}
+ if (msqTestControllerContext != null) {
+ msqTestControllerContext.close();
+ }
}
}
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
index c57df8bcfe5..fc08fd122eb 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
@@ -27,6 +27,7 @@ import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.processor.Bouncer;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.msq.exec.Controller;
import org.apache.druid.msq.exec.ControllerClient;
@@ -59,11 +60,13 @@ import org.apache.druid.server.SegmentManager;
import javax.annotation.Nullable;
import java.io.File;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
public class MSQTestWorkerContext implements WorkerContext
{
+ private static final Logger log = new Logger(MSQTestWorkerContext.class);
private static final StupidPool<ByteBuffer> BUFFER_POOL = new
StupidPool<>("testProcessing", () -> ByteBuffer.allocate(1_000_000));
private final String workerId;
@@ -71,7 +74,7 @@ public class MSQTestWorkerContext implements WorkerContext
private final ObjectMapper mapper;
private final Injector injector;
private final Map<String, WorkerRunRef> inMemoryWorkers;
- private final File file = FileUtils.createTempDir();
+ private final File file;
private final WorkerMemoryParameters workerMemoryParameters;
private final WorkerStorageParameters workerStorageParameters;
private final ServiceEmitter serviceEmitter;
@@ -91,6 +94,7 @@ public class MSQTestWorkerContext implements WorkerContext
)
{
this.workerId = workerId;
+ this.file = FileUtils.createTempDir("msq-worker-" + workerId);
this.inMemoryWorkers = inMemoryWorkers;
this.controller = controller;
this.mapper = mapper;
@@ -205,6 +209,12 @@ public class MSQTestWorkerContext implements WorkerContext
@Override
public void close()
{
+ try {
+ FileUtils.deleteDirectory(file);
+ }
+ catch (IOException e) {
+ log.warn(e, "Failed to delete temp dir[%s] for worker[%s]", file, workerId);
+ }
}
class FrameContextImpl implements FrameContext
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]