This is an automated email from the ASF dual-hosted git repository.

kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fde1f5f970 test: improve CI stability (#19207)
2fde1f5f970 is described below

commit 2fde1f5f970856ae7fc0120cdf64e10382563761
Author: Zoltan Haindrich <[email protected]>
AuthorDate: Thu Mar 26 07:18:56 2026 +0100

    test: improve CI stability (#19207)
---
 .../embedded/minio/MinIOStorageResource.java       |  1 +
 .../org/apache/druid/msq/exec/ControllerImpl.java  |  3 +-
 .../druid/msq/test/CalciteMSQTestsHelper.java      |  8 +++++
 .../druid/msq/test/MSQTestControllerContext.java   | 40 ++++++++++++++--------
 .../msq/test/MSQTestOverlordServiceClient.java     |  5 ++-
 .../druid/msq/test/MSQTestWorkerContext.java       | 12 ++++++-
 6 files changed, 52 insertions(+), 17 deletions(-)

diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/minio/MinIOStorageResource.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/minio/MinIOStorageResource.java
index 7e85bad0d6a..804e78c646e 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/minio/MinIOStorageResource.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/minio/MinIOStorageResource.java
@@ -101,6 +101,7 @@ public class MinIOStorageResource extends TestcontainerResource<MinIOContainer>
     cluster.addCommonProperty("druid.s3.secretKey", getSecretKey());
     cluster.addCommonProperty("druid.s3.enablePathStyleAccess", "true");
     cluster.addCommonProperty("druid.s3.protocol", "http");
+    cluster.addCommonProperty("druid.s3.maxConnections", "150");
   }
 
   public String getBucket()
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 86223a20ac2..83db73bd754 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -766,8 +766,9 @@ public class ControllerImpl implements Controller
     }
 
     final long maxParseExceptions = MultiStageQueryContext.getMaxParseExceptions(queryContext);
+    // When maxParseExceptions == 0, workers post CannotParseExternalDataFault directly via criticalWarningCodes.
     this.faultsExceededChecker = new FaultsExceededChecker(
-        ImmutableMap.of(CannotParseExternalDataFault.CODE, maxParseExceptions)
+        ImmutableMap.of(CannotParseExternalDataFault.CODE, maxParseExceptions == 0 ? -1 : maxParseExceptions)
     );
 
     stageToStatsMergingMode = new HashMap<>();
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java b/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
index 586b17b0563..bef94919dcf 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
@@ -32,6 +32,7 @@ import org.apache.druid.discovery.NodeRole;
 import org.apache.druid.frame.processor.Bouncer;
 import org.apache.druid.guice.IndexingServiceTuningConfigModule;
 import org.apache.druid.guice.JoinableFactoryModule;
+import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.guice.annotations.Self;
 import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.initialization.DruidModule;
@@ -90,6 +91,7 @@ public class CalciteMSQTestsHelper
     }
 
     @Provides
+    @LazySingleton
     public SegmentCacheManager provideSegmentCacheManager(ObjectMapper testMapper, TempDirProducer tempDirProducer)
     {
       return new SegmentCacheManagerFactory(TestIndex.INDEX_IO, testMapper)
@@ -97,6 +99,7 @@ public class CalciteMSQTestsHelper
     }
 
     @Provides
+    @LazySingleton
     public LocalDataSegmentPusherConfig provideLocalDataSegmentPusherConfig(TempDirProducer tempDirProducer)
     {
       LocalDataSegmentPusherConfig config = new LocalDataSegmentPusherConfig();
@@ -105,12 +108,14 @@ public class CalciteMSQTestsHelper
     }
 
     @Provides
+    @LazySingleton
     public TestSegmentManager provideTestSegmentManager()
     {
       return new TestSegmentManager();
     }
 
     @Provides
+    @LazySingleton
     public DataSegmentPusher provideDataSegmentPusher(
         LocalDataSegmentPusherConfig config,
         TestSegmentManager testSegmentManager
@@ -120,12 +125,14 @@ public class CalciteMSQTestsHelper
     }
 
     @Provides
+    @LazySingleton
     public DataSegmentAnnouncer provideDataSegmentAnnouncer()
     {
       return new NoopDataSegmentAnnouncer();
     }
 
     @Provides
+    @LazySingleton
     public SegmentManager provideSegmentManager(
         TestSegmentManager testSegmentManager,
         SpecificSegmentsQuerySegmentWalker walker
@@ -143,6 +150,7 @@ public class CalciteMSQTestsHelper
     }
 
     @Provides
+    @LazySingleton
     public DataServerQueryHandlerFactory provideDataServerQueryHandlerFactory()
     {
       return getTestDataServerQueryHandlerFactory();
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
index 758fc644adb..e7dda29cc55 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
@@ -80,6 +80,7 @@ import org.mockito.Mockito;
 
 import javax.annotation.Nullable;
 import java.io.File;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -100,7 +101,7 @@ public class MSQTestControllerContext implements ControllerContext, DartControll
   private final ConcurrentMap<String, TaskStatus> statusMap = new ConcurrentHashMap<>();
   public static final ExecutorService POOL = Execs.multiThreaded(NUM_WORKERS, "MSQTestControllerContext-worker-%d");
   public static final ListeningExecutorService EXECUTOR = MoreExecutors.listeningDecorator(POOL);
-  private final File tempDir = FileUtils.createTempDir();
+  private final File tempDir;
   private final CoordinatorClient coordinatorClient;
   private final DruidNode node = new DruidNode(
       "controller",
@@ -134,6 +135,7 @@ public class MSQTestControllerContext implements ControllerContext, DartControll
   )
   {
     this.queryId = queryId;
+    this.tempDir = FileUtils.createTempDir("msq-controller-" + queryId);
     this.mapper = mapper;
     this.injector = injector;
     this.taskActionClient = taskActionClient;
@@ -250,20 +252,18 @@ public class MSQTestControllerContext implements ControllerContext, DartControll
         workerStorageParameters = WorkerStorageParameters.createInstanceForTests(Long.MAX_VALUE);
       }
 
-      Worker worker = new WorkerImpl(
-          task,
-          new MSQTestWorkerContext(
-              task.getId(),
-              inMemoryWorkers,
-              controller,
-              mapper,
-              injector,
-              workerMemoryParameters,
-              workerStorageParameters,
-              serviceEmitter,
-              coordinatorClient
-          )
+      final MSQTestWorkerContext workerContext = new MSQTestWorkerContext(
+          task.getId(),
+          inMemoryWorkers,
+          controller,
+          mapper,
+          injector,
+          workerMemoryParameters,
+          workerStorageParameters,
+          serviceEmitter,
+          coordinatorClient
       );
+      Worker worker = new WorkerImpl(task, workerContext);
       final WorkerRunRef workerRunRef = new WorkerRunRef();
       inMemoryWorkers.put(task.getId(), workerRunRef);
       ListenableFuture<?> future = workerRunRef.run(worker, EXECUTOR);
@@ -275,6 +275,7 @@ public class MSQTestControllerContext implements ControllerContext, DartControll
         public void onSuccess(@Nullable Object result)
         {
           statusMap.put(task.getId(), TaskStatus.success(task.getId()));
+          workerContext.close();
         }
 
         @Override
@@ -282,6 +283,7 @@ public class MSQTestControllerContext implements ControllerContext, DartControll
         {
           log.error(t, "error running worker task %s", task.getId());
           statusMap.put(task.getId(), TaskStatus.failure(task.getId(), t.getMessage()));
+          workerContext.close();
         }
       }, MoreExecutors.directExecutor());
 
@@ -476,4 +478,14 @@ public class MSQTestControllerContext implements ControllerContext, DartControll
     this.queryContext = this.queryContext.override(context);
     return this;
   }
+
+  public void close()
+  {
+    try {
+      FileUtils.deleteDirectory(tempDir);
+    }
+    catch (IOException e) {
+      log.warn(e, "Failed to delete temp dir[%s] for controller[%s]", tempDir, queryId);
+    }
+  }
 }
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java
index 658139289db..c93e813ba5e 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java
@@ -150,7 +150,7 @@ public class MSQTestOverlordServiceClient extends NoopOverlordClient
   {
     TestQueryListener queryListener = null;
     ControllerImpl controller = null;
-    MSQTestControllerContext msqTestControllerContext;
+    MSQTestControllerContext msqTestControllerContext = null;
     MSQTestTaskDetails testTaskDetails = registerTestTask(taskId);
     try {
       MSQControllerTask cTask = objectMapper.convertValue(taskObject, MSQControllerTask.class);
@@ -205,6 +205,9 @@ public class MSQTestOverlordServiceClient extends NoopOverlordClient
       if (queryListener != null && queryListener.reportMap != null) {
         testTaskDetails.report = queryListener.getReportMap();
       }
+      if (msqTestControllerContext != null) {
+        msqTestControllerContext.close();
+      }
     }
   }
 
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
index c57df8bcfe5..fc08fd122eb 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
@@ -27,6 +27,7 @@ import org.apache.druid.frame.FrameType;
 import org.apache.druid.frame.processor.Bouncer;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.msq.exec.Controller;
 import org.apache.druid.msq.exec.ControllerClient;
@@ -59,11 +60,13 @@ import org.apache.druid.server.SegmentManager;
 
 import javax.annotation.Nullable;
 import java.io.File;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Map;
 
 public class MSQTestWorkerContext implements WorkerContext
 {
+  private static final Logger log = new Logger(MSQTestWorkerContext.class);
   private static final StupidPool<ByteBuffer> BUFFER_POOL = new StupidPool<>("testProcessing", () -> ByteBuffer.allocate(1_000_000));
 
   private final String workerId;
@@ -71,7 +74,7 @@ public class MSQTestWorkerContext implements WorkerContext
   private final ObjectMapper mapper;
   private final Injector injector;
   private final Map<String, WorkerRunRef> inMemoryWorkers;
-  private final File file = FileUtils.createTempDir();
+  private final File file;
   private final WorkerMemoryParameters workerMemoryParameters;
   private final WorkerStorageParameters workerStorageParameters;
   private final ServiceEmitter serviceEmitter;
@@ -91,6 +94,7 @@ public class MSQTestWorkerContext implements WorkerContext
   )
   {
     this.workerId = workerId;
+    this.file = FileUtils.createTempDir("msq-worker-" + workerId);
     this.inMemoryWorkers = inMemoryWorkers;
     this.controller = controller;
     this.mapper = mapper;
@@ -205,6 +209,12 @@ public class MSQTestWorkerContext implements WorkerContext
   @Override
   public void close()
   {
+    try {
+      FileUtils.deleteDirectory(file);
+    }
+    catch (IOException e) {
+      log.warn(e, "Failed to delete temp dir[%s] for worker[%s]", file, workerId);
+    }
   }
 
   class FrameContextImpl implements FrameContext


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to