Akshat-Jain commented on code in PR #17170:
URL: https://github.com/apache/druid/pull/17170#discussion_r1877511720


##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java:
##########
@@ -415,7 +453,110 @@ public static String makeHydrantCacheIdentifier(final 
SegmentId segmentId, final
     // with subsegments (hydrants).
     return segmentId + "_H" + hydrantNumber;
   }
+  
+  /**
+   * Emit query/segment/time, query/wait/time and query/segmentAndCache/Time 
metrics for a Sink.
+   * It accumulates query/segment/time and query/segmentAndCache/time metric 
for each FireHydrant at the level of Sink.
+   * query/wait/time metric is the time taken to process the first FireHydrant 
for the Sink.
+   */
+  private static class SinkMetricsEmittingQueryRunner<T> implements 
QueryRunner<T>
+  {
+    private final ServiceEmitter emitter;
+    private final QueryToolChest<T, ? extends Query<T>> queryToolChest;
+    private final QueryRunner<T> queryRunner;
+    private final Map<String, ObjLongConsumer<? super QueryMetrics<?>>> 
metricsToReport;
+    private final ConcurrentHashMap<String, ConcurrentHashMap<String, 
AtomicLong>> segmentMetricsAccumulator;
+    private final Set<String> metricsToCompute;
+    @Nullable
+    private final String segmentId;
+    private final long creationTimeNs;
+
+    private SinkMetricsEmittingQueryRunner(
+        ServiceEmitter emitter,
+        QueryToolChest<T, ? extends Query<T>> queryToolChest,
+        QueryRunner<T> queryRunner,
+        Map<String, ObjLongConsumer<? super QueryMetrics<?>>> metricsToReport,
+        ConcurrentHashMap<String, ConcurrentHashMap<String, AtomicLong>> 
segmentMetricsAccumulator,
+        Set<String> metricsToCompute,
+        @Nullable String segmentId
+    )
+    {
+      this.emitter = emitter;
+      this.queryToolChest = queryToolChest;
+      this.queryRunner = queryRunner;
+      this.metricsToReport = metricsToReport;
+      this.segmentMetricsAccumulator = segmentMetricsAccumulator;
+      this.metricsToCompute = metricsToCompute;
+      this.segmentId = segmentId;
+      this.creationTimeNs = System.nanoTime();
+    }
+
+    @Override
+    public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext 
responseContext)
+    {
+      QueryPlus<T> queryWithMetrics = 
queryPlus.withQueryMetrics(queryToolChest);
+      final QueryMetrics<?> queryMetrics = queryWithMetrics.getQueryMetrics();
+
+      return Sequences.wrap(
+          // Use LazySequence because we want to account execution time of 
queryRunner.run() (it prepares the underlying
+          // Sequence) as part of the reported query time, i.e. we want to 
execute queryRunner.run() after
+          // `startTimeNs = System.nanoTime();`
+          new LazySequence<>(() -> queryRunner.run(queryWithMetrics, 
responseContext)),
+          new SequenceWrapper()
+          {
+            private long startTimeNs;
+
+            @Override
+            public void before()
+            {
+              startTimeNs = System.nanoTime();
+            }
+
+            @Override
+            public void after(boolean isDone, Throwable thrown)
+            {
+              if (segmentId != null) {
+                // accumulate metrics
+                for (String metric : metricsToCompute) {
+                  if (DefaultQueryMetrics.QUERY_WAIT_TIME.equals(metric)) {
+                    long waitTimeNs = startTimeNs - creationTimeNs;
+                    // segment wait time is the time taken to start processing 
the first FireHydrant for the Sink
+                    segmentMetricsAccumulator.computeIfAbsent(segmentId, 
metrics -> new ConcurrentHashMap<>())
+                                             .putIfAbsent(metric, new 
AtomicLong(waitTimeNs));
+                  } else {
+                    long timeTakenNs = System.nanoTime() - startTimeNs;
+                    segmentMetricsAccumulator.computeIfAbsent(segmentId, 
metrics -> new ConcurrentHashMap<>())
+                                             .computeIfAbsent(metric, value -> 
new AtomicLong(0))
+                                             .addAndGet(timeTakenNs);
+                  }
+                }
+              } else {
+                // report accumulated metrics
+                for (Map.Entry<String, ConcurrentHashMap<String, AtomicLong>> 
segmentAndMetrics : segmentMetricsAccumulator.entrySet()) {
+                  queryMetrics.segment(segmentAndMetrics.getKey());
+
+                  for (Map.Entry<String, ObjLongConsumer<? super 
QueryMetrics<?>>> reportMetric : metricsToReport.entrySet()) {
+                    String metricName = reportMetric.getKey();
+                    if (segmentAndMetrics.getValue().containsKey(metricName)) {
+                      reportMetric.getValue().accept(queryMetrics, 
segmentAndMetrics.getValue().get(metricName).get());
+                    }
+                  }
 
+                  try {
+                    queryMetrics.emit(emitter);
+                  }
+                  catch (Exception e) {
+                    // Query should not fail, because of emitter failure. 
Swallowing the exception.
+                    log.error("Failure while trying to emit [%s] with 
stacktrace [%s]", emitter.toString(), e);

Review Comment:
   Have made the change.



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java:
##########
@@ -415,7 +453,110 @@ public static String makeHydrantCacheIdentifier(final 
SegmentId segmentId, final
     // with subsegments (hydrants).
     return segmentId + "_H" + hydrantNumber;
   }
+  
+  /**
+   * Emit query/segment/time, query/wait/time and query/segmentAndCache/Time 
metrics for a Sink.
+   * It accumulates query/segment/time and query/segmentAndCache/time metric 
for each FireHydrant at the level of Sink.
+   * query/wait/time metric is the time taken to process the first FireHydrant 
for the Sink.

Review Comment:
   Have made the change.



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java:
##########
@@ -415,7 +453,110 @@ public static String makeHydrantCacheIdentifier(final 
SegmentId segmentId, final
     // with subsegments (hydrants).
     return segmentId + "_H" + hydrantNumber;
   }
+  
+  /**
+   * Emit query/segment/time, query/wait/time and query/segmentAndCache/Time 
metrics for a Sink.
+   * It accumulates query/segment/time and query/segmentAndCache/time metric 
for each FireHydrant at the level of Sink.
+   * query/wait/time metric is the time taken to process the first FireHydrant 
for the Sink.
+   */
+  private static class SinkMetricsEmittingQueryRunner<T> implements 
QueryRunner<T>
+  {
+    private final ServiceEmitter emitter;
+    private final QueryToolChest<T, ? extends Query<T>> queryToolChest;
+    private final QueryRunner<T> queryRunner;
+    private final Map<String, ObjLongConsumer<? super QueryMetrics<?>>> 
metricsToReport;
+    private final ConcurrentHashMap<String, ConcurrentHashMap<String, 
AtomicLong>> segmentMetricsAccumulator;
+    private final Set<String> metricsToCompute;
+    @Nullable
+    private final String segmentId;
+    private final long creationTimeNs;
+
+    private SinkMetricsEmittingQueryRunner(
+        ServiceEmitter emitter,
+        QueryToolChest<T, ? extends Query<T>> queryToolChest,
+        QueryRunner<T> queryRunner,
+        Map<String, ObjLongConsumer<? super QueryMetrics<?>>> metricsToReport,
+        ConcurrentHashMap<String, ConcurrentHashMap<String, AtomicLong>> 
segmentMetricsAccumulator,
+        Set<String> metricsToCompute,
+        @Nullable String segmentId
+    )
+    {
+      this.emitter = emitter;
+      this.queryToolChest = queryToolChest;
+      this.queryRunner = queryRunner;
+      this.metricsToReport = metricsToReport;
+      this.segmentMetricsAccumulator = segmentMetricsAccumulator;
+      this.metricsToCompute = metricsToCompute;
+      this.segmentId = segmentId;
+      this.creationTimeNs = System.nanoTime();
+    }
+
+    @Override
+    public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext 
responseContext)
+    {
+      QueryPlus<T> queryWithMetrics = 
queryPlus.withQueryMetrics(queryToolChest);
+      final QueryMetrics<?> queryMetrics = queryWithMetrics.getQueryMetrics();

Review Comment:
   Have made the change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to