This is an automated email from the ASF dual-hosted git repository.

kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 201de30eb12 Fix concurrency issues with StubServiceEmitter (#18249)
201de30eb12 is described below

commit 201de30eb121f4394dfdd6bd38f02784924f24da
Author: Zoltan Haindrich <k...@rxd.hu>
AuthorDate: Tue Jul 15 09:20:33 2025 +0200

    Fix concurrency issues with StubServiceEmitter (#18249)
    
    * concurrency issue with StubServiceEmitter - #18121 added a few new 
metrics, which increased the load on it and caused random appearances of an 
issue arising from the fact that it used an `ArrayList` under the hood.
    * added some catch blocks to shut down queries properly in case unexpected 
exceptions occur - this could give clearer exceptions and reduce time to fix in 
the future
    
    this should reduce the probability that `QTest` splits remain hanging
---
 .../msq/dart/controller/sql/DartQueryMaker.java     |  9 +++++++++
 .../org/apache/druid/msq/exec/ControllerImpl.java   | 13 +++++++++++++
 .../overlord/duty/UnusedSegmentsKillerTest.java     |  3 ++-
 pom.xml                                             |  2 +-
 .../util/metrics/HttpPostEmitterMonitorTest.java    |  8 ++++----
 .../druid/java/util/metrics/StubServiceEmitter.java | 21 ++++++++++++---------
 .../appenderator/StreamAppenderatorTest.java        |  5 +++--
 .../druid/server/audit/SQLAuditManagerTest.java     | 13 +++++++------
 .../druid/server/metrics/LatchableEmitter.java      |  5 +++--
 9 files changed, 54 insertions(+), 25 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
index a6583db66fc..60563d6daf4 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
@@ -405,6 +405,15 @@ public class DartQueryMaker implements QueryMaker
               controller.queryId()
           );
         }
+        catch (Throwable e) {
+          log.error(
+              e,
+              "Controller failed for sqlQueryId[%s], controllerHost[%s]",
+              plannerContext.getSqlQueryId(),
+              controller.queryId()
+          );
+          throw e;
+        }
         finally {
           controllerRegistry.deregister(controllerHolder);
           Thread.currentThread().setName(threadName);
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 9c3d94094cb..a68a58ae476 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -343,10 +343,23 @@ public class ControllerImpl implements Controller
     try (final Closer closer = Closer.create()) {
       reportPayload = runInternal(queryListener, closer);
     }
+    catch (Throwable e) {
+      log.error(e, "Controller internal execution encountered exception.");
+      queryListener.onQueryComplete(makeStatusReportForException(e));
+      throw e;
+    }
     // Call onQueryComplete after Closer is fully closed, ensuring no 
controller-related processing is ongoing.
     queryListener.onQueryComplete(reportPayload);
   }
 
+
+  private MSQTaskReportPayload makeStatusReportForException(Throwable e)
+  {
+    MSQErrorReport errorReport = MSQErrorReport.fromFault(queryId(), null, 
null, UnknownFault.forException(e));
+    MSQStatusReport statusReport = new MSQStatusReport(TaskState.FAILED, 
errorReport, null, null, 0, new HashMap<>(), 0, 0, null, null);
+    return new MSQTaskReportPayload(statusReport, null, null, null);
+  }
+
   @Override
   public void stop(CancellationReason reason)
   {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java
index 1130eefa8d7..111489a74dc 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java
@@ -52,6 +52,7 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import java.util.List;
+import java.util.Queue;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -328,7 +329,7 @@ public class UnusedSegmentsKillerTest
     emitter.verifyEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS, 10);
 
     // Verify that the kill intervals are sorted with the oldest interval first
-    final List<StubServiceEmitter.ServiceMetricEventSnapshot> events =
+    final Queue<StubServiceEmitter.ServiceMetricEventSnapshot> events =
         emitter.getMetricEvents().get(TaskMetrics.RUN_DURATION);
     final List<Interval> killIntervals = events.stream().map(event -> {
       final String taskId = (String) 
event.getUserDims().get(DruidMetrics.TASK_ID);
diff --git a/pom.xml b/pom.xml
index 082a9a0cb80..05bd09e6ba8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1094,7 +1094,7 @@
             <dependency>
                 <groupId>org.junit</groupId>
                 <artifactId>junit-bom</artifactId>
-                <version>5.10.2</version>
+                <version>5.13.3</version>
                 <type>pom</type>
                 <scope>import</scope>
             </dependency>
diff --git 
a/processing/src/test/java/org/apache/druid/java/util/metrics/HttpPostEmitterMonitorTest.java
 
b/processing/src/test/java/org/apache/druid/java/util/metrics/HttpPostEmitterMonitorTest.java
index 601dc4a61b4..2be52c091e7 100644
--- 
a/processing/src/test/java/org/apache/druid/java/util/metrics/HttpPostEmitterMonitorTest.java
+++ 
b/processing/src/test/java/org/apache/druid/java/util/metrics/HttpPostEmitterMonitorTest.java
@@ -25,8 +25,8 @@ import 
org.apache.druid.java.util.emitter.core.HttpPostEmitter;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -65,7 +65,7 @@ public class HttpPostEmitterMonitorTest
 
     assertTrue(monitor.doMonitor(stubServiceEmitter));
 
-    final Map<String, List<StubServiceEmitter.ServiceMetricEventSnapshot>> 
metricEvents = stubServiceEmitter.getMetricEvents();
+    final Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> 
metricEvents = stubServiceEmitter.getMetricEvents();
 
     assertMetricValue(metricEvents, "emitter/successfulSending/maxTimeMs", 0);
     assertMetricValue(metricEvents, "emitter/events/emitted/delta", 100L);
@@ -83,8 +83,8 @@ public class HttpPostEmitterMonitorTest
     assertMetricValue(metricEvents, "emitter/failedSending/maxTimeMs", 0L);
   }
 
-  private void assertMetricValue(Map<String, 
List<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents, String 
metricName, Number expectedValue)
+  private void assertMetricValue(Map<String, 
Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents, String 
metricName, Number expectedValue)
   {
-    
assertEquals(metricEvents.get(metricName).get(0).getMetricEvent().getValue().doubleValue(),
 expectedValue.doubleValue());
+    
assertEquals(metricEvents.get(metricName).peek().getMetricEvent().getValue().doubleValue(),
 expectedValue.doubleValue());
   }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
 
b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
index 323d8cd308c..55113b97ac2 100644
--- 
a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
+++ 
b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
@@ -24,11 +24,14 @@ import 
org.apache.druid.java.util.emitter.service.AlertEvent;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
 
 /**
  * Test implementation of {@link ServiceEmitter} that collects emitted metrics
@@ -36,9 +39,9 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 public class StubServiceEmitter extends ServiceEmitter implements 
MetricsVerifier
 {
-  private final List<Event> events = new ArrayList<>();
-  private final List<AlertEvent> alertEvents = new ArrayList<>();
-  private final ConcurrentHashMap<String, List<ServiceMetricEventSnapshot>> 
metricEvents = new ConcurrentHashMap<>();
+  private final Queue<Event> events = new ConcurrentLinkedDeque<>();
+  private final Queue<AlertEvent> alertEvents = new ConcurrentLinkedDeque<>();
+  private final ConcurrentHashMap<String, Queue<ServiceMetricEventSnapshot>> 
metricEvents = new ConcurrentHashMap<>();
 
   public StubServiceEmitter()
   {
@@ -55,7 +58,7 @@ public class StubServiceEmitter extends ServiceEmitter 
implements MetricsVerifie
   {
     if (event instanceof ServiceMetricEvent) {
       ServiceMetricEvent metricEvent = (ServiceMetricEvent) event;
-      metricEvents.computeIfAbsent(metricEvent.getMetric(), name -> new 
ArrayList<>())
+      metricEvents.computeIfAbsent(metricEvent.getMetric(), name -> new 
ConcurrentLinkedDeque<>())
                   .add(new ServiceMetricEventSnapshot(metricEvent));
     } else if (event instanceof AlertEvent) {
       alertEvents.add((AlertEvent) event);
@@ -68,7 +71,7 @@ public class StubServiceEmitter extends ServiceEmitter 
implements MetricsVerifie
    */
   public List<Event> getEvents()
   {
-    return events;
+    return new ArrayList<>(events);
   }
 
   /**
@@ -76,7 +79,7 @@ public class StubServiceEmitter extends ServiceEmitter 
implements MetricsVerifie
    *
    * @return Map from metric name to list of events emitted for that metric.
    */
-  public Map<String, List<ServiceMetricEventSnapshot>> getMetricEvents()
+  public Map<String, Queue<ServiceMetricEventSnapshot>> getMetricEvents()
   {
     return metricEvents;
   }
@@ -86,7 +89,7 @@ public class StubServiceEmitter extends ServiceEmitter 
implements MetricsVerifie
    */
   public List<AlertEvent> getAlerts()
   {
-    return alertEvents;
+    return new ArrayList<>(alertEvents);
   }
 
   @Override
@@ -96,8 +99,8 @@ public class StubServiceEmitter extends ServiceEmitter 
implements MetricsVerifie
   )
   {
     final List<Number> values = new ArrayList<>();
-    final List<ServiceMetricEventSnapshot> events =
-        metricEvents.getOrDefault(metricName, Collections.emptyList());
+    final Queue<ServiceMetricEventSnapshot> events =
+        metricEvents.getOrDefault(metricName, new ArrayDeque<>());
     final Map<String, Object> filters =
         dimensionFilters == null ? Collections.emptyMap() : dimensionFilters;
     for (ServiceMetricEventSnapshot event : events) {
diff --git 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
index fee0d2ca757..463c4ed8c53 100644
--- 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
@@ -78,6 +78,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -162,7 +163,7 @@ public class StreamAppenderatorTest extends 
InitializedNullHandlingTest
       ).get();
       Assert.assertEquals(
           ImmutableMap.of("x", "3"),
-          (Map<String, String>) segmentsAndCommitMetadata.getCommitMetadata()
+          segmentsAndCommitMetadata.getCommitMetadata()
       );
       Assert.assertEquals(
           IDENTIFIERS.subList(0, 2),
@@ -2278,7 +2279,7 @@ public class StreamAppenderatorTest extends 
InitializedNullHandlingTest
 
   private void verifySinkMetrics(StubServiceEmitter emitter, Set<String> 
segmentIds)
   {
-    Map<String, List<StubServiceEmitter.ServiceMetricEventSnapshot>> events = 
emitter.getMetricEvents();
+    Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> events = 
emitter.getMetricEvents();
     int segments = segmentIds.size();
     Assert.assertEquals(4, events.size());
     Assert.assertTrue(events.containsKey("query/cpu/time"));
diff --git 
a/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java 
b/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java
index f10248bf1e9..3505eb943a2 100644
--- 
a/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java
@@ -43,6 +43,7 @@ import org.mockito.junit.MockitoJUnitRunner;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.TreeMap;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -91,14 +92,14 @@ public class SQLAuditManagerTest
     final AuditEntry entry = createAuditEntry("testKey", "testType", 
DateTimes.nowUtc());
     auditManager.doAudit(entry);
 
-    Map<String, List<StubServiceEmitter.ServiceMetricEventSnapshot>> 
metricEvents = serviceEmitter.getMetricEvents();
+    Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> 
metricEvents = serviceEmitter.getMetricEvents();
     Assert.assertEquals(1, metricEvents.size());
 
-    List<StubServiceEmitter.ServiceMetricEventSnapshot> auditMetricEvents = 
metricEvents.get("config/audit");
+    Queue<StubServiceEmitter.ServiceMetricEventSnapshot> auditMetricEvents = 
metricEvents.get("config/audit");
     Assert.assertNotNull(auditMetricEvents);
     Assert.assertEquals(1, auditMetricEvents.size());
 
-    ServiceMetricEvent metric = auditMetricEvents.get(0).getMetricEvent();
+    ServiceMetricEvent metric = auditMetricEvents.peek().getMetricEvent();
 
     final AuditEntry dbEntry = lookupAuditEntryForKey("testKey");
     Assert.assertNotNull(dbEntry);
@@ -120,14 +121,14 @@ public class SQLAuditManagerTest
     Assert.assertEquals(entry, dbEntry);
 
     // Verify emitted metrics
-    Map<String, List<StubServiceEmitter.ServiceMetricEventSnapshot>> 
metricEvents = serviceEmitter.getMetricEvents();
+    Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> 
metricEvents = serviceEmitter.getMetricEvents();
     Assert.assertEquals(1, metricEvents.size());
 
-    List<StubServiceEmitter.ServiceMetricEventSnapshot> auditMetricEvents = 
metricEvents.get("config/audit");
+    Queue<StubServiceEmitter.ServiceMetricEventSnapshot> auditMetricEvents = 
metricEvents.get("config/audit");
     Assert.assertNotNull(auditMetricEvents);
     Assert.assertEquals(1, auditMetricEvents.size());
 
-    ServiceMetricEvent metric = auditMetricEvents.get(0).getMetricEvent();
+    ServiceMetricEvent metric = auditMetricEvents.peek().getMetricEvent();
     Assert.assertEquals(dbEntry.getKey(), metric.getUserDims().get("key"));
     Assert.assertEquals(dbEntry.getType(), metric.getUserDims().get("type"));
     Assert.assertNull(metric.getUserDims().get("payload"));
diff --git 
a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java 
b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
index 87a5b612147..aeed3c40ea9 100644
--- a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
+++ b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
@@ -181,13 +181,14 @@ public class LatchableEmitter extends StubServiceEmitter
         return;
       }
 
+      List<Event> events = getEvents();
       for (WaitCondition condition : conditionsToEvaluate) {
-        final int currentNumberOfEvents = getEvents().size();
+        final int currentNumberOfEvents = events.size();
 
         // Do not use an iterator over the list to avoid concurrent 
modification exceptions
         // Evaluate new events against this condition
         for (int i = condition.processedUntil; i < currentNumberOfEvents; ++i) 
{
-          if (condition.predicate.test(getEvents().get(i))) {
+          if (condition.predicate.test(events.get(i))) {
             condition.countDownLatch.countDown();
           }
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to