Akshat-Jain commented on code in PR #17170:
URL: https://github.com/apache/druid/pull/17170#discussion_r1877511720
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java:
##########
@@ -415,7 +453,110 @@ public static String makeHydrantCacheIdentifier(final
SegmentId segmentId, final
// with subsegments (hydrants).
return segmentId + "_H" + hydrantNumber;
}
+
+ /**
+ * Emit query/segment/time, query/wait/time and query/segmentAndCache/time
metrics for a Sink.
+ * It accumulates query/segment/time and query/segmentAndCache/time metrics
for each FireHydrant at the level of the Sink.
+ * The query/wait/time metric is the time taken to process the first FireHydrant
for the Sink.
+ */
+ private static class SinkMetricsEmittingQueryRunner<T> implements
QueryRunner<T>
+ {
+ private final ServiceEmitter emitter;
+ private final QueryToolChest<T, ? extends Query<T>> queryToolChest;
+ private final QueryRunner<T> queryRunner;
+ private final Map<String, ObjLongConsumer<? super QueryMetrics<?>>>
metricsToReport;
+ private final ConcurrentHashMap<String, ConcurrentHashMap<String,
AtomicLong>> segmentMetricsAccumulator;
+ private final Set<String> metricsToCompute;
+ @Nullable
+ private final String segmentId;
+ private final long creationTimeNs;
+
+ private SinkMetricsEmittingQueryRunner(
+ ServiceEmitter emitter,
+ QueryToolChest<T, ? extends Query<T>> queryToolChest,
+ QueryRunner<T> queryRunner,
+ Map<String, ObjLongConsumer<? super QueryMetrics<?>>> metricsToReport,
+ ConcurrentHashMap<String, ConcurrentHashMap<String, AtomicLong>>
segmentMetricsAccumulator,
+ Set<String> metricsToCompute,
+ @Nullable String segmentId
+ )
+ {
+ this.emitter = emitter;
+ this.queryToolChest = queryToolChest;
+ this.queryRunner = queryRunner;
+ this.metricsToReport = metricsToReport;
+ this.segmentMetricsAccumulator = segmentMetricsAccumulator;
+ this.metricsToCompute = metricsToCompute;
+ this.segmentId = segmentId;
+ this.creationTimeNs = System.nanoTime();
+ }
+
+ @Override
+ public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext
responseContext)
+ {
+ QueryPlus<T> queryWithMetrics =
queryPlus.withQueryMetrics(queryToolChest);
+ final QueryMetrics<?> queryMetrics = queryWithMetrics.getQueryMetrics();
+
+ return Sequences.wrap(
+ // Use LazySequence because we want to account execution time of
queryRunner.run() (it prepares the underlying
+ // Sequence) as part of the reported query time, i.e. we want to
execute queryRunner.run() after
+ // `startTimeNs = System.nanoTime();`
+ new LazySequence<>(() -> queryRunner.run(queryWithMetrics,
responseContext)),
+ new SequenceWrapper()
+ {
+ private long startTimeNs;
+
+ @Override
+ public void before()
+ {
+ startTimeNs = System.nanoTime();
+ }
+
+ @Override
+ public void after(boolean isDone, Throwable thrown)
+ {
+ if (segmentId != null) {
+ // accumulate metrics
+ for (String metric : metricsToCompute) {
+ if (DefaultQueryMetrics.QUERY_WAIT_TIME.equals(metric)) {
+ long waitTimeNs = startTimeNs - creationTimeNs;
+ // segment wait time is the time taken to start processing
the first FireHydrant for the Sink
+ segmentMetricsAccumulator.computeIfAbsent(segmentId,
metrics -> new ConcurrentHashMap<>())
+ .putIfAbsent(metric, new
AtomicLong(waitTimeNs));
+ } else {
+ long timeTakenNs = System.nanoTime() - startTimeNs;
+ segmentMetricsAccumulator.computeIfAbsent(segmentId,
metrics -> new ConcurrentHashMap<>())
+ .computeIfAbsent(metric, value ->
new AtomicLong(0))
+ .addAndGet(timeTakenNs);
+ }
+ }
+ } else {
+ // report accumulated metrics
+ for (Map.Entry<String, ConcurrentHashMap<String, AtomicLong>>
segmentAndMetrics : segmentMetricsAccumulator.entrySet()) {
+ queryMetrics.segment(segmentAndMetrics.getKey());
+
+ for (Map.Entry<String, ObjLongConsumer<? super
QueryMetrics<?>>> reportMetric : metricsToReport.entrySet()) {
+ String metricName = reportMetric.getKey();
+ if (segmentAndMetrics.getValue().containsKey(metricName)) {
+ reportMetric.getValue().accept(queryMetrics,
segmentAndMetrics.getValue().get(metricName).get());
+ }
+ }
+ try {
+ queryMetrics.emit(emitter);
+ }
+ catch (Exception e) {
+ // Query should not fail, because of emitter failure.
Swallowing the exception.
+ log.error("Failure while trying to emit [%s] with
stacktrace [%s]", emitter.toString(), e);
Review Comment:
Have made the change.
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java:
##########
@@ -415,7 +453,110 @@ public static String makeHydrantCacheIdentifier(final
SegmentId segmentId, final
// with subsegments (hydrants).
return segmentId + "_H" + hydrantNumber;
}
+
+ /**
+ * Emit query/segment/time, query/wait/time and query/segmentAndCache/time
metrics for a Sink.
+ * It accumulates query/segment/time and query/segmentAndCache/time metrics
for each FireHydrant at the level of the Sink.
+ * The query/wait/time metric is the time taken to process the first FireHydrant
for the Sink.
Review Comment:
Have made the change.
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java:
##########
@@ -415,7 +453,110 @@ public static String makeHydrantCacheIdentifier(final
SegmentId segmentId, final
// with subsegments (hydrants).
return segmentId + "_H" + hydrantNumber;
}
+
+ /**
+ * Emit query/segment/time, query/wait/time and query/segmentAndCache/time
metrics for a Sink.
+ * It accumulates query/segment/time and query/segmentAndCache/time metrics
for each FireHydrant at the level of the Sink.
+ * The query/wait/time metric is the time taken to process the first FireHydrant
for the Sink.
+ */
+ private static class SinkMetricsEmittingQueryRunner<T> implements
QueryRunner<T>
+ {
+ private final ServiceEmitter emitter;
+ private final QueryToolChest<T, ? extends Query<T>> queryToolChest;
+ private final QueryRunner<T> queryRunner;
+ private final Map<String, ObjLongConsumer<? super QueryMetrics<?>>>
metricsToReport;
+ private final ConcurrentHashMap<String, ConcurrentHashMap<String,
AtomicLong>> segmentMetricsAccumulator;
+ private final Set<String> metricsToCompute;
+ @Nullable
+ private final String segmentId;
+ private final long creationTimeNs;
+
+ private SinkMetricsEmittingQueryRunner(
+ ServiceEmitter emitter,
+ QueryToolChest<T, ? extends Query<T>> queryToolChest,
+ QueryRunner<T> queryRunner,
+ Map<String, ObjLongConsumer<? super QueryMetrics<?>>> metricsToReport,
+ ConcurrentHashMap<String, ConcurrentHashMap<String, AtomicLong>>
segmentMetricsAccumulator,
+ Set<String> metricsToCompute,
+ @Nullable String segmentId
+ )
+ {
+ this.emitter = emitter;
+ this.queryToolChest = queryToolChest;
+ this.queryRunner = queryRunner;
+ this.metricsToReport = metricsToReport;
+ this.segmentMetricsAccumulator = segmentMetricsAccumulator;
+ this.metricsToCompute = metricsToCompute;
+ this.segmentId = segmentId;
+ this.creationTimeNs = System.nanoTime();
+ }
+
+ @Override
+ public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext
responseContext)
+ {
+ QueryPlus<T> queryWithMetrics =
queryPlus.withQueryMetrics(queryToolChest);
+ final QueryMetrics<?> queryMetrics = queryWithMetrics.getQueryMetrics();
Review Comment:
Have made the change.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]