gianm commented on code in PR #17170:
URL: https://github.com/apache/druid/pull/17170#discussion_r1833036961


##########
processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java:
##########
@@ -66,11 +66,11 @@ public void testDefaultQueryMetricsQuery()
         .context(ImmutableMap.of("testKey", "testValue"))
         .build();
     queryMetrics.query(query);
+    queryMetrics.sqlQueryId("dummy");
+    queryMetrics.queryId("dummy");

Review Comment:
   why did these need to move?



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java:
##########
@@ -273,13 +293,20 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final
 
                     // Regardless of whether caching is enabled, do reportSegmentAndCacheTime outside the
                     // *possible* caching.
-                    runner = new MetricsEmittingQueryRunner<>(
+                    runner = new SinkMetricsEmittingQueryRunner<>(
                         emitter,
                         factory.getToolchest(),
                         runner,
-                        QueryMetrics::reportSegmentAndCacheTime,
-                        queryMetrics -> queryMetrics.segment(sinkSegmentId.toString())
-                    ).withWaitMeasuredFromNow();
+                        metricsToReport,
+                        segmentMetricsAccumulator,
+                        new HashSet<>(
Review Comment:
   No need to create this hash set for every hydrant; use a static immutable set.
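   For illustration, a minimal self-contained sketch of what a static immutable set could look like (the constant and metric names here are hypothetical; in the PR they would come from `DefaultQueryMetrics`, and the Druid codebase would more likely use Guava's `ImmutableSet.of`):

```java
import java.util.Set;

public class SinkMetricNames
{
  // Hypothetical metric-name constants; the PR reads these from DefaultQueryMetrics.
  public static final String QUERY_SEGMENT_TIME = "query/segment/time";
  public static final String QUERY_SEGMENT_AND_CACHE_TIME = "query/segmentAndCache/time";

  // One immutable set, built once and shared by every hydrant, instead of
  // allocating a fresh HashSet on each call.
  public static final Set<String> SEGMENT_AND_CACHE_METRICS =
      Set.of(QUERY_SEGMENT_TIME, QUERY_SEGMENT_AND_CACHE_TIME);
}
```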



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java:
##########
@@ -231,12 +249,14 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final
               descriptor,
               sinkSegmentReferences.stream().map(
                   segmentReference -> {
-                    QueryRunner<T> runner = new MetricsEmittingQueryRunner<>(
+                    QueryRunner<T> runner = new SinkMetricsEmittingQueryRunner<>(
                         emitter,
                         factory.getToolchest(),
                         factory.createRunner(segmentReference.getSegment()),
-                        QueryMetrics::reportSegmentTime,
-                        queryMetrics -> queryMetrics.segment(sinkSegmentId.toString())
+                        metricsToReport,
+                        segmentMetricsAccumulator,
+                        Collections.singleton(DefaultQueryMetrics.QUERY_SEGMENT_TIME),

Review Comment:
   Use a static for this.



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java:
##########
@@ -193,6 +205,12 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final
     final List<SinkSegmentReference> allSegmentReferences = new ArrayList<>();
     final Map<SegmentDescriptor, SegmentId> segmentIdMap = new HashMap<>();
     final LinkedHashMap<SegmentDescriptor, List<QueryRunner<T>>> allRunners = new LinkedHashMap<>();
+    final ConcurrentHashMap<String, ConcurrentHashMap<String, AtomicLong>> segmentMetricsAccumulator = new ConcurrentHashMap<>();

Review Comment:
   When I look at this, I wonder about performance. It's probably fine the way it is, but IMO it would be better to use a class here (with three `AtomicLong` fields rather than a `Map<String, AtomicLong>`). That would save various per-hydrant hash-table lookups, and I think the code would also be clearer to read.
   
   In that world, `segmentMetricsAccumulator` would be a `ConcurrentHashMap<String, TheNewClass>`.
   
   If you keep the inner holder as a `Map`, then at least make `metricsToReport` a static.
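   As a sketch of the suggested holder class (class and method names are hypothetical, not from the PR): three `AtomicLong` fields replace the inner `ConcurrentHashMap<String, AtomicLong>`, so per-hydrant updates skip the metric-name hash lookups entirely:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical replacement for the inner Map<String, AtomicLong>.
public class SegmentMetrics
{
  private static final long UNSET = -1;

  private final AtomicLong segmentTimeNs = new AtomicLong(0);
  private final AtomicLong segmentAndCacheTimeNs = new AtomicLong(0);
  private final AtomicLong waitTimeNs = new AtomicLong(UNSET);

  public void addSegmentTime(long ns)
  {
    segmentTimeNs.addAndGet(ns);
  }

  public void addSegmentAndCacheTime(long ns)
  {
    segmentAndCacheTimeNs.addAndGet(ns);
  }

  // Wait time is recorded once, by whichever hydrant starts first.
  public void setWaitTimeIfAbsent(long ns)
  {
    waitTimeNs.compareAndSet(UNSET, ns);
  }

  public long getSegmentTimeNs()
  {
    return segmentTimeNs.get();
  }

  public long getSegmentAndCacheTimeNs()
  {
    return segmentAndCacheTimeNs.get();
  }

  public long getWaitTimeNs()
  {
    return waitTimeNs.get();
  }

  public static void main(String[] args)
  {
    // The outer accumulator then becomes ConcurrentHashMap<String, SegmentMetrics>.
    ConcurrentHashMap<String, SegmentMetrics> accumulator = new ConcurrentHashMap<>();
    SegmentMetrics m = accumulator.computeIfAbsent("segmentA", id -> new SegmentMetrics());
    m.addSegmentTime(100);
    m.addSegmentTime(50);
    m.setWaitTimeIfAbsent(10);
    m.setWaitTimeIfAbsent(99); // ignored: wait time was already set
    System.out.println(m.getSegmentTimeNs() + " " + m.getWaitTimeNs()); // prints "150 10"
  }
}
```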



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java:
##########
@@ -415,7 +453,110 @@ public static String makeHydrantCacheIdentifier(final SegmentId segmentId, final
     // with subsegments (hydrants).
     return segmentId + "_H" + hydrantNumber;
   }
+  
+  /**
+   * Emit query/segment/time, query/wait/time and query/segmentAndCache/time metrics for a Sink.
+   * It accumulates the query/segment/time and query/segmentAndCache/time metrics for each FireHydrant at the level of the Sink.
+   * The query/wait/time metric is the time taken to process the first FireHydrant for the Sink.
+   */
+  private static class SinkMetricsEmittingQueryRunner<T> implements QueryRunner<T>
+  {
+    private final ServiceEmitter emitter;
+    private final QueryToolChest<T, ? extends Query<T>> queryToolChest;
+    private final QueryRunner<T> queryRunner;
+    private final Map<String, ObjLongConsumer<? super QueryMetrics<?>>> metricsToReport;
+    private final ConcurrentHashMap<String, ConcurrentHashMap<String, AtomicLong>> segmentMetricsAccumulator;
+    private final Set<String> metricsToCompute;
+    @Nullable
+    private final String segmentId;
+    private final long creationTimeNs;
+
+    private SinkMetricsEmittingQueryRunner(
+        ServiceEmitter emitter,
+        QueryToolChest<T, ? extends Query<T>> queryToolChest,
+        QueryRunner<T> queryRunner,
+        Map<String, ObjLongConsumer<? super QueryMetrics<?>>> metricsToReport,
+        ConcurrentHashMap<String, ConcurrentHashMap<String, AtomicLong>> segmentMetricsAccumulator,
+        Set<String> metricsToCompute,
+        @Nullable String segmentId
+    )
+    {
+      this.emitter = emitter;
+      this.queryToolChest = queryToolChest;
+      this.queryRunner = queryRunner;
+      this.metricsToReport = metricsToReport;
+      this.segmentMetricsAccumulator = segmentMetricsAccumulator;
+      this.metricsToCompute = metricsToCompute;
+      this.segmentId = segmentId;
+      this.creationTimeNs = System.nanoTime();
+    }
+
+    @Override
+    public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext responseContext)
+    {
+      QueryPlus<T> queryWithMetrics = queryPlus.withQueryMetrics(queryToolChest);
+      final QueryMetrics<?> queryMetrics = queryWithMetrics.getQueryMetrics();

Review Comment:
   It looks like `queryMetrics` isn't used unless `segmentId` is null. Don't create it if it won't be used.
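   One possible shape for this (an untested sketch using the PR's own names; elisions marked): retrieve the metrics object only on the emit path, i.e. inside the `segmentId == null` branch of `after()`:

```java
// Sketch (untested): defer QueryMetrics retrieval until the emit path needs it.
@Override
public void after(boolean isDone, Throwable thrown)
{
  if (segmentId != null) {
    // accumulate mode: no QueryMetrics needed here
    ...
  } else {
    // emit mode: obtain the metrics object only now
    final QueryMetrics<?> queryMetrics = queryWithMetrics.getQueryMetrics();
    ...
  }
}
```

   Note that `queryPlus.withQueryMetrics(queryToolChest)` may itself construct the metrics object, so fully avoiding the allocation in accumulate mode could also require guarding that call.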



##########
processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java:
##########
@@ -197,7 +198,7 @@ public ServiceMetricEvent build(ImmutableMap<String, String> serviceDimensions)
       return new ServiceMetricEvent(
           createdTime,
           serviceDimensions,
-          userDims,
+          new HashMap<>(userDims),

Review Comment:
   why does this need to be copied?



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java:
##########
@@ -415,7 +453,110 @@ public static String makeHydrantCacheIdentifier(final SegmentId segmentId, final
     // with subsegments (hydrants).
     return segmentId + "_H" + hydrantNumber;
   }
+  
+  /**
+   * Emit query/segment/time, query/wait/time and query/segmentAndCache/time metrics for a Sink.
+   * It accumulates the query/segment/time and query/segmentAndCache/time metrics for each FireHydrant at the level of the Sink.
+   * The query/wait/time metric is the time taken to process the first FireHydrant for the Sink.
+   */
+  private static class SinkMetricsEmittingQueryRunner<T> implements QueryRunner<T>
+  {
+    private final ServiceEmitter emitter;
+    private final QueryToolChest<T, ? extends Query<T>> queryToolChest;
+    private final QueryRunner<T> queryRunner;
+    private final Map<String, ObjLongConsumer<? super QueryMetrics<?>>> metricsToReport;
+    private final ConcurrentHashMap<String, ConcurrentHashMap<String, AtomicLong>> segmentMetricsAccumulator;
+    private final Set<String> metricsToCompute;
+    @Nullable
+    private final String segmentId;
+    private final long creationTimeNs;
+
+    private SinkMetricsEmittingQueryRunner(
+        ServiceEmitter emitter,
+        QueryToolChest<T, ? extends Query<T>> queryToolChest,
+        QueryRunner<T> queryRunner,
+        Map<String, ObjLongConsumer<? super QueryMetrics<?>>> metricsToReport,
+        ConcurrentHashMap<String, ConcurrentHashMap<String, AtomicLong>> segmentMetricsAccumulator,
+        Set<String> metricsToCompute,
+        @Nullable String segmentId
+    )
+    {
+      this.emitter = emitter;
+      this.queryToolChest = queryToolChest;
+      this.queryRunner = queryRunner;
+      this.metricsToReport = metricsToReport;
+      this.segmentMetricsAccumulator = segmentMetricsAccumulator;
+      this.metricsToCompute = metricsToCompute;
+      this.segmentId = segmentId;
+      this.creationTimeNs = System.nanoTime();
+    }
+
+    @Override
+    public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext responseContext)
+    {
+      QueryPlus<T> queryWithMetrics = queryPlus.withQueryMetrics(queryToolChest);
+      final QueryMetrics<?> queryMetrics = queryWithMetrics.getQueryMetrics();
+
+      return Sequences.wrap(
+          // Use LazySequence because we want to account execution time of queryRunner.run() (it prepares the underlying
+          // Sequence) as part of the reported query time, i.e. we want to execute queryRunner.run() after
+          // `startTimeNs = System.nanoTime();`
+          new LazySequence<>(() -> queryRunner.run(queryWithMetrics, responseContext)),
+          new SequenceWrapper()
+          {
+            private long startTimeNs;
+
+            @Override
+            public void before()
+            {
+              startTimeNs = System.nanoTime();
+            }
+
+            @Override
+            public void after(boolean isDone, Throwable thrown)
+            {
+              if (segmentId != null) {
+                // accumulate metrics
+                for (String metric : metricsToCompute) {
+                  if (DefaultQueryMetrics.QUERY_WAIT_TIME.equals(metric)) {
+                    long waitTimeNs = startTimeNs - creationTimeNs;
+                    // segment wait time is the time taken to start processing the first FireHydrant for the Sink
+                    segmentMetricsAccumulator.computeIfAbsent(segmentId, metrics -> new ConcurrentHashMap<>())
+                                             .putIfAbsent(metric, new AtomicLong(waitTimeNs));
+                  } else {
+                    long timeTakenNs = System.nanoTime() - startTimeNs;
+                    segmentMetricsAccumulator.computeIfAbsent(segmentId, metrics -> new ConcurrentHashMap<>())
+                                             .computeIfAbsent(metric, value -> new AtomicLong(0))
+                                             .addAndGet(timeTakenNs);
+                  }
+                }
+              } else {
+                // report accumulated metrics
+                for (Map.Entry<String, ConcurrentHashMap<String, AtomicLong>> segmentAndMetrics : segmentMetricsAccumulator.entrySet()) {
+                  queryMetrics.segment(segmentAndMetrics.getKey());
+
+                  for (Map.Entry<String, ObjLongConsumer<? super QueryMetrics<?>>> reportMetric : metricsToReport.entrySet()) {
+                    String metricName = reportMetric.getKey();
+                    if (segmentAndMetrics.getValue().containsKey(metricName)) {
+                      reportMetric.getValue().accept(queryMetrics, segmentAndMetrics.getValue().get(metricName).get());
+                    }
+                  }
 
+                  try {
+                    queryMetrics.emit(emitter);
+                  }
+                  catch (Exception e) {
+                    // Query should not fail, because of emitter failure. Swallowing the exception.
+                    log.error("Failure while trying to emit [%s] with stacktrace [%s]", emitter.toString(), e);

Review Comment:
   `e` should be the first argument, so the logger handles it as an exception. It will print the stack trace, message, etc.
   
   There is no point in calling `emitter.toString()` because the emitters don't stringify to anything useful. Better to include the segment ID here and say something like `Failed to emit metrics for segment[%s]`.
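   The suggested fix might look like this (a sketch, assuming Druid's `Logger#error(Throwable, String, Object...)` overload, which takes the throwable first; the message wording is illustrative):

```java
catch (Exception e) {
  // Query should not fail because of an emitter failure; swallow the exception.
  log.error(e, "Failed to emit metrics for segment[%s]", segmentAndMetrics.getKey());
}
```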



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java:
##########
@@ -415,7 +453,110 @@ public static String makeHydrantCacheIdentifier(final SegmentId segmentId, final
     // with subsegments (hydrants).
     return segmentId + "_H" + hydrantNumber;
   }
+  
+  /**
+   * Emit query/segment/time, query/wait/time and query/segmentAndCache/time metrics for a Sink.
+   * It accumulates the query/segment/time and query/segmentAndCache/time metrics for each FireHydrant at the level of the Sink.
+   * The query/wait/time metric is the time taken to process the first FireHydrant for the Sink.

Review Comment:
   Please extend the javadoc here in two ways:
   
   - Link to `MetricsEmittingQueryRunner` and point out that this is derived from that class.
   - Explain that the class behaves differently based on whether `segmentId` is null or nonnull. When nonnull it's in "accumulate" mode and when null it's in "emit" mode.
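   A hedged sketch of how the extended javadoc might read (the wording is illustrative, not the PR author's):

```java
/**
 * A {@link QueryRunner} that emits query/segment/time, query/wait/time and
 * query/segmentAndCache/time metrics for a Sink. Derived from
 * {@link MetricsEmittingQueryRunner}.
 * <p>
 * The runner operates in one of two modes depending on {@code segmentId}:
 * <ul>
 *   <li>nonnull: "accumulate" mode; per-FireHydrant times are added into the
 *       shared accumulator under this segment ID.</li>
 *   <li>null: "emit" mode; the accumulated per-Sink totals are reported and
 *       emitted.</li>
 * </ul>
 */
```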



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

