This is an automated email from the ASF dual-hosted git repository. kgyrtkirk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push: new 201de30eb12 Fix concurrency issues with StubServiceEmitter (#18249) 201de30eb12 is described below commit 201de30eb121f4394dfdd6bd38f02784924f24da Author: Zoltan Haindrich <k...@rxd.hu> AuthorDate: Tue Jul 15 09:20:33 2025 +0200 Fix concurrency issues with StubServiceEmitter (#18249) * concurrency issue with StubServiceEmitter - #18121 added a few new metrics, which increased the load on it and caused random appearances of an issue arising from the fact that it used an `ArrayList` under the hood. * added some catches to shut down queries properly in case some unexpected exceptions occur - this could give better exceptions and reduce time to fix in the future; this should reduce the probability that `QTest` splits remain hanging --- .../msq/dart/controller/sql/DartQueryMaker.java | 9 +++++++++ .../org/apache/druid/msq/exec/ControllerImpl.java | 13 +++++++++++++ .../overlord/duty/UnusedSegmentsKillerTest.java | 3 ++- pom.xml | 2 +- .../util/metrics/HttpPostEmitterMonitorTest.java | 8 ++++---- .../druid/java/util/metrics/StubServiceEmitter.java | 21 ++++++++++++--------- .../appenderator/StreamAppenderatorTest.java | 5 +++-- .../druid/server/audit/SQLAuditManagerTest.java | 13 +++++++------ .../druid/server/metrics/LatchableEmitter.java | 5 +++-- 9 files changed, 54 insertions(+), 25 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java index a6583db66fc..60563d6daf4 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java @@ -405,6 +405,15 @@ public class DartQueryMaker implements QueryMaker controller.queryId() ); } + 
catch (Throwable e) { + log.error( + e, + "Controller failed for sqlQueryId[%s], controllerHost[%s]", + plannerContext.getSqlQueryId(), + controller.queryId() + ); + throw e; + } finally { controllerRegistry.deregister(controllerHolder); Thread.currentThread().setName(threadName); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 9c3d94094cb..a68a58ae476 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -343,10 +343,23 @@ public class ControllerImpl implements Controller try (final Closer closer = Closer.create()) { reportPayload = runInternal(queryListener, closer); } + catch (Throwable e) { + log.error(e, "Controller internal execution encountered exception."); + queryListener.onQueryComplete(makeStatusReportForException(e)); + throw e; + } // Call onQueryComplete after Closer is fully closed, ensuring no controller-related processing is ongoing. 
queryListener.onQueryComplete(reportPayload); } + + private MSQTaskReportPayload makeStatusReportForException(Throwable e) + { + MSQErrorReport errorReport = MSQErrorReport.fromFault(queryId(), null, null, UnknownFault.forException(e)); + MSQStatusReport statusReport = new MSQStatusReport(TaskState.FAILED, errorReport, null, null, 0, new HashMap<>(), 0, 0, null, null); + return new MSQTaskReportPayload(statusReport, null, null, null); + } + @Override public void stop(CancellationReason reason) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java index 1130eefa8d7..111489a74dc 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java @@ -52,6 +52,7 @@ import org.junit.Rule; import org.junit.Test; import java.util.List; +import java.util.Queue; import java.util.Set; import java.util.stream.Collectors; @@ -328,7 +329,7 @@ public class UnusedSegmentsKillerTest emitter.verifyEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS, 10); // Verify that the kill intervals are sorted with the oldest interval first - final List<StubServiceEmitter.ServiceMetricEventSnapshot> events = + final Queue<StubServiceEmitter.ServiceMetricEventSnapshot> events = emitter.getMetricEvents().get(TaskMetrics.RUN_DURATION); final List<Interval> killIntervals = events.stream().map(event -> { final String taskId = (String) event.getUserDims().get(DruidMetrics.TASK_ID); diff --git a/pom.xml b/pom.xml index 082a9a0cb80..05bd09e6ba8 100644 --- a/pom.xml +++ b/pom.xml @@ -1094,7 +1094,7 @@ <dependency> <groupId>org.junit</groupId> <artifactId>junit-bom</artifactId> - <version>5.10.2</version> + <version>5.13.3</version> <type>pom</type> <scope>import</scope> </dependency> 
diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/HttpPostEmitterMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/HttpPostEmitterMonitorTest.java index 601dc4a61b4..2be52c091e7 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/HttpPostEmitterMonitorTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/HttpPostEmitterMonitorTest.java @@ -25,8 +25,8 @@ import org.apache.druid.java.util.emitter.core.HttpPostEmitter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.util.List; import java.util.Map; +import java.util.Queue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -65,7 +65,7 @@ public class HttpPostEmitterMonitorTest assertTrue(monitor.doMonitor(stubServiceEmitter)); - final Map<String, List<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents = stubServiceEmitter.getMetricEvents(); + final Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents = stubServiceEmitter.getMetricEvents(); assertMetricValue(metricEvents, "emitter/successfulSending/maxTimeMs", 0); assertMetricValue(metricEvents, "emitter/events/emitted/delta", 100L); @@ -83,8 +83,8 @@ public class HttpPostEmitterMonitorTest assertMetricValue(metricEvents, "emitter/failedSending/maxTimeMs", 0L); } - private void assertMetricValue(Map<String, List<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents, String metricName, Number expectedValue) + private void assertMetricValue(Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents, String metricName, Number expectedValue) { - assertEquals(metricEvents.get(metricName).get(0).getMetricEvent().getValue().doubleValue(), expectedValue.doubleValue()); + assertEquals(metricEvents.get(metricName).peek().getMetricEvent().getValue().doubleValue(), expectedValue.doubleValue()); } } diff --git 
a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java index 323d8cd308c..55113b97ac2 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java @@ -24,11 +24,14 @@ import org.apache.druid.java.util.emitter.service.AlertEvent; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; /** * Test implementation of {@link ServiceEmitter} that collects emitted metrics @@ -36,9 +39,9 @@ import java.util.concurrent.ConcurrentHashMap; */ public class StubServiceEmitter extends ServiceEmitter implements MetricsVerifier { - private final List<Event> events = new ArrayList<>(); - private final List<AlertEvent> alertEvents = new ArrayList<>(); - private final ConcurrentHashMap<String, List<ServiceMetricEventSnapshot>> metricEvents = new ConcurrentHashMap<>(); + private final Queue<Event> events = new ConcurrentLinkedDeque<>(); + private final Queue<AlertEvent> alertEvents = new ConcurrentLinkedDeque<>(); + private final ConcurrentHashMap<String, Queue<ServiceMetricEventSnapshot>> metricEvents = new ConcurrentHashMap<>(); public StubServiceEmitter() { @@ -55,7 +58,7 @@ public class StubServiceEmitter extends ServiceEmitter implements MetricsVerifie { if (event instanceof ServiceMetricEvent) { ServiceMetricEvent metricEvent = (ServiceMetricEvent) event; - metricEvents.computeIfAbsent(metricEvent.getMetric(), name -> new ArrayList<>()) + metricEvents.computeIfAbsent(metricEvent.getMetric(), name -> new 
ConcurrentLinkedDeque<>()) .add(new ServiceMetricEventSnapshot(metricEvent)); } else if (event instanceof AlertEvent) { alertEvents.add((AlertEvent) event); @@ -68,7 +71,7 @@ public class StubServiceEmitter extends ServiceEmitter implements MetricsVerifie */ public List<Event> getEvents() { - return events; + return new ArrayList<>(events); } /** @@ -76,7 +79,7 @@ public class StubServiceEmitter extends ServiceEmitter implements MetricsVerifie * * @return Map from metric name to list of events emitted for that metric. */ - public Map<String, List<ServiceMetricEventSnapshot>> getMetricEvents() + public Map<String, Queue<ServiceMetricEventSnapshot>> getMetricEvents() { return metricEvents; } @@ -86,7 +89,7 @@ public class StubServiceEmitter extends ServiceEmitter implements MetricsVerifie */ public List<AlertEvent> getAlerts() { - return alertEvents; + return new ArrayList<>(alertEvents); } @Override @@ -96,8 +99,8 @@ public class StubServiceEmitter extends ServiceEmitter implements MetricsVerifie ) { final List<Number> values = new ArrayList<>(); - final List<ServiceMetricEventSnapshot> events = - metricEvents.getOrDefault(metricName, Collections.emptyList()); + final Queue<ServiceMetricEventSnapshot> events = + metricEvents.getOrDefault(metricName, new ArrayDeque<>()); final Map<String, Object> filters = dimensionFilters == null ? 
Collections.emptyMap() : dimensionFilters; for (ServiceMetricEventSnapshot event : events) { diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java index fee0d2ca757..463c4ed8c53 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java @@ -78,6 +78,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -162,7 +163,7 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest ).get(); Assert.assertEquals( ImmutableMap.of("x", "3"), - (Map<String, String>) segmentsAndCommitMetadata.getCommitMetadata() + segmentsAndCommitMetadata.getCommitMetadata() ); Assert.assertEquals( IDENTIFIERS.subList(0, 2), @@ -2278,7 +2279,7 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest private void verifySinkMetrics(StubServiceEmitter emitter, Set<String> segmentIds) { - Map<String, List<StubServiceEmitter.ServiceMetricEventSnapshot>> events = emitter.getMetricEvents(); + Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> events = emitter.getMetricEvents(); int segments = segmentIds.size(); Assert.assertEquals(4, events.size()); Assert.assertTrue(events.containsKey("query/cpu/time")); diff --git a/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java b/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java index f10248bf1e9..3505eb943a2 100644 --- a/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java +++ 
b/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java @@ -43,6 +43,7 @@ import org.mockito.junit.MockitoJUnitRunner; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.TreeMap; @RunWith(MockitoJUnitRunner.class) @@ -91,14 +92,14 @@ public class SQLAuditManagerTest final AuditEntry entry = createAuditEntry("testKey", "testType", DateTimes.nowUtc()); auditManager.doAudit(entry); - Map<String, List<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents = serviceEmitter.getMetricEvents(); + Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents = serviceEmitter.getMetricEvents(); Assert.assertEquals(1, metricEvents.size()); - List<StubServiceEmitter.ServiceMetricEventSnapshot> auditMetricEvents = metricEvents.get("config/audit"); + Queue<StubServiceEmitter.ServiceMetricEventSnapshot> auditMetricEvents = metricEvents.get("config/audit"); Assert.assertNotNull(auditMetricEvents); Assert.assertEquals(1, auditMetricEvents.size()); - ServiceMetricEvent metric = auditMetricEvents.get(0).getMetricEvent(); + ServiceMetricEvent metric = auditMetricEvents.peek().getMetricEvent(); final AuditEntry dbEntry = lookupAuditEntryForKey("testKey"); Assert.assertNotNull(dbEntry); @@ -120,14 +121,14 @@ public class SQLAuditManagerTest Assert.assertEquals(entry, dbEntry); // Verify emitted metrics - Map<String, List<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents = serviceEmitter.getMetricEvents(); + Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents = serviceEmitter.getMetricEvents(); Assert.assertEquals(1, metricEvents.size()); - List<StubServiceEmitter.ServiceMetricEventSnapshot> auditMetricEvents = metricEvents.get("config/audit"); + Queue<StubServiceEmitter.ServiceMetricEventSnapshot> auditMetricEvents = metricEvents.get("config/audit"); Assert.assertNotNull(auditMetricEvents); Assert.assertEquals(1, auditMetricEvents.size()); - 
ServiceMetricEvent metric = auditMetricEvents.get(0).getMetricEvent(); + ServiceMetricEvent metric = auditMetricEvents.peek().getMetricEvent(); Assert.assertEquals(dbEntry.getKey(), metric.getUserDims().get("key")); Assert.assertEquals(dbEntry.getType(), metric.getUserDims().get("type")); Assert.assertNull(metric.getUserDims().get("payload")); diff --git a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java index 87a5b612147..aeed3c40ea9 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java +++ b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java @@ -181,13 +181,14 @@ public class LatchableEmitter extends StubServiceEmitter return; } + List<Event> events = getEvents(); for (WaitCondition condition : conditionsToEvaluate) { - final int currentNumberOfEvents = getEvents().size(); + final int currentNumberOfEvents = events.size(); // Do not use an iterator over the list to avoid concurrent modification exceptions // Evaluate new events against this condition for (int i = condition.processedUntil; i < currentNumberOfEvents; ++i) { - if (condition.predicate.test(getEvents().get(i))) { + if (condition.predicate.test(events.get(i))) { condition.countDownLatch.countDown(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org