Akshat-Jain commented on code in PR #17170:
URL: https://github.com/apache/druid/pull/17170#discussion_r1881398451
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java:
##########
@@ -193,6 +205,12 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(final
Query<T> query, final
final List<SinkSegmentReference> allSegmentReferences = new ArrayList<>();
final Map<SegmentDescriptor, SegmentId> segmentIdMap = new HashMap<>();
final LinkedHashMap<SegmentDescriptor, List<QueryRunner<T>>> allRunners =
new LinkedHashMap<>();
+ final ConcurrentHashMap<String, ConcurrentHashMap<String, AtomicLong>>
segmentMetricsAccumulator = new ConcurrentHashMap<>();
Review Comment:
@gianm Have reported benchmark stats for master code vs this PR's code at
https://github.com/apache/druid/pull/17170#issuecomment-2537836764.
> it would be better to use a class here (with three AtomicLong rather than
a Map<String, AtomicLong>). That would save various per-hydrant hash-table
lookups.
I also tried doing this (haven't pushed the changes to the PR). Reporting
benchmark stats for a few benchmark runs with this change:
```
Benchmark (numFireHydrants) Mode
Cnt Score Error Units
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics 10 avgt
5 0.194 ± 0.012 ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics 50 avgt
5 0.749 ± 0.072 ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics 100 avgt
5 1.358 ± 0.248 ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics 200 avgt
5 2.781 ± 0.491 ms/op
Benchmark (numFireHydrants) Mode
Cnt Score Error Units
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics 10 avgt
5 0.187 ± 0.006 ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics 50 avgt
5 0.704 ± 0.090 ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics 100 avgt
5 1.356 ± 0.211 ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics 200 avgt
5 2.725 ± 0.510 ms/op
Benchmark (numFireHydrants) Mode
Cnt Score Error Units
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics 10 avgt
5 0.190 ± 0.020 ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics 50 avgt
5 0.710 ± 0.130 ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics 100 avgt
5 1.428 ± 0.242 ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics 200 avgt
5 2.791 ± 0.570 ms/op
Benchmark (numFireHydrants) Mode
Cnt Score Error Units
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics 10 avgt
5 0.188 ± 0.007 ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics 50 avgt
5 0.777 ± 0.051 ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics 100 avgt
5 1.523 ± 0.026 ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics 200 avgt
5 3.114 ± 0.191 ms/op
```
This seems almost equal to (if not slightly worse than) the current PR code
changes, so I didn't push this change.
I'm dumping a diff of the change here though for your reference:
```diff
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
index 40fb6078fe..e4ceada727 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
@@ -218,7 +218,7 @@ public class SinkQuerySegmentWalker implements
QuerySegmentWalker
final List<SinkSegmentReference> allSegmentReferences = new
ArrayList<>();
final Map<SegmentDescriptor, SegmentId> segmentIdMap = new HashMap<>();
final LinkedHashMap<SegmentDescriptor, List<QueryRunner<T>>> allRunners
= new LinkedHashMap<>();
- final ConcurrentHashMap<String, ConcurrentHashMap<String, AtomicLong>>
segmentMetricsAccumulator = new ConcurrentHashMap<>();
+ final ConcurrentHashMap<String,
SinkMetricsEmittingQueryRunner.SegmentMetrics> segmentMetricsAccumulator = new
ConcurrentHashMap<>();
try {
for (final SegmentDescriptor descriptor : specs) {
@@ -469,7 +469,7 @@ public class SinkQuerySegmentWalker implements
QuerySegmentWalker
private final ServiceEmitter emitter;
private final QueryToolChest<T, ? extends Query<T>> queryToolChest;
private final QueryRunner<T> queryRunner;
- private final ConcurrentHashMap<String, ConcurrentHashMap<String,
AtomicLong>> segmentMetricsAccumulator;
+ private final ConcurrentHashMap<String, SegmentMetrics>
segmentMetricsAccumulator;
private final Set<String> metricsToCompute;
@Nullable
private final String segmentId;
@@ -479,7 +479,7 @@ public class SinkQuerySegmentWalker implements
QuerySegmentWalker
ServiceEmitter emitter,
QueryToolChest<T, ? extends Query<T>> queryToolChest,
QueryRunner<T> queryRunner,
- ConcurrentHashMap<String, ConcurrentHashMap<String, AtomicLong>>
segmentMetricsAccumulator,
+ ConcurrentHashMap<String, SegmentMetrics> segmentMetricsAccumulator,
Set<String> metricsToCompute,
@Nullable String segmentId
)
@@ -517,29 +517,31 @@ public class SinkQuerySegmentWalker implements
QuerySegmentWalker
{
if (segmentId != null) {
// accumulate metrics
- for (String metric : metricsToCompute) {
- if (DefaultQueryMetrics.QUERY_WAIT_TIME.equals(metric)) {
- long waitTimeNs = startTimeNs - creationTimeNs;
- // segment wait time is the time taken to start
processing the first FireHydrant for the Sink
- segmentMetricsAccumulator.computeIfAbsent(segmentId,
metrics -> new ConcurrentHashMap<>())
- .putIfAbsent(metric, new
AtomicLong(waitTimeNs));
- } else {
- long timeTakenNs = System.nanoTime() - startTimeNs;
- segmentMetricsAccumulator.computeIfAbsent(segmentId,
metrics -> new ConcurrentHashMap<>())
- .computeIfAbsent(metric, value
-> new AtomicLong(0))
- .addAndGet(timeTakenNs);
- }
+ final SegmentMetrics metrics =
segmentMetricsAccumulator.computeIfAbsent(segmentId, id -> new
SegmentMetrics());
+ if
(metricsToCompute.contains(DefaultQueryMetrics.QUERY_WAIT_TIME)) {
+ metrics.setWaitTime(startTimeNs - creationTimeNs);
+ }
+ if
(metricsToCompute.contains(DefaultQueryMetrics.QUERY_SEGMENT_TIME)) {
+ metrics.addSegmentTime(System.nanoTime() - startTimeNs);
+ }
+ if
(metricsToCompute.contains(DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME)) {
+ metrics.addSegmentAndCacheTime(System.nanoTime() -
startTimeNs);
}
} else {
final QueryMetrics<?> queryMetrics =
queryWithMetrics.getQueryMetrics();
// report accumulated metrics
- for (Map.Entry<String, ConcurrentHashMap<String,
AtomicLong>> segmentAndMetrics : segmentMetricsAccumulator.entrySet()) {
+ for (Map.Entry<String, SegmentMetrics> segmentAndMetrics :
segmentMetricsAccumulator.entrySet()) {
queryMetrics.segment(segmentAndMetrics.getKey());
for (Map.Entry<String, ObjLongConsumer<? super
QueryMetrics<?>>> reportMetric : METRICS_TO_REPORT.entrySet()) {
- String metricName = reportMetric.getKey();
- if
(segmentAndMetrics.getValue().containsKey(metricName)) {
- reportMetric.getValue().accept(queryMetrics,
segmentAndMetrics.getValue().get(metricName).get());
+                    final String metricName = reportMetric.getKey();
+                    switch (metricName) {
+                      case DefaultQueryMetrics.QUERY_SEGMENT_TIME:
+                        reportMetric.getValue().accept(queryMetrics, segmentAndMetrics.getValue().getSegmentTime());
+                        break;
+                      case DefaultQueryMetrics.QUERY_WAIT_TIME:
+                        reportMetric.getValue().accept(queryMetrics, segmentAndMetrics.getValue().getWaitTime());
+                        break;
+                      case DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME:
+                        reportMetric.getValue().accept(queryMetrics, segmentAndMetrics.getValue().getSegmentAndCacheTime());
+                        break;
}
}
@@ -556,6 +558,36 @@ public class SinkQuerySegmentWalker implements
QuerySegmentWalker
}
);
}
+
+ private static class SegmentMetrics {
+ private final AtomicLong querySegmentTime = new AtomicLong(0);
+ private final AtomicLong queryWaitTime = new AtomicLong(0);
+ private final AtomicLong querySegmentAndCacheTime = new AtomicLong(0);
+
+ private void addSegmentTime(long time) {
+ querySegmentTime.addAndGet(time);
+ }
+
+ private void setWaitTime(long time) {
+ queryWaitTime.set(time);
+ }
+
+ private void addSegmentAndCacheTime(long time) {
+ querySegmentAndCacheTime.addAndGet(time);
+ }
+
+ private long getSegmentTime() {
+ return querySegmentTime.get();
+ }
+
+ private long getWaitTime() {
+ return queryWaitTime.get();
+ }
+
+ private long getSegmentAndCacheTime() {
+ return querySegmentAndCacheTime.get();
+ }
+ }
}
```
Thoughts?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]