kfaraz commented on code in PR #17581:
URL: https://github.com/apache/druid/pull/17581#discussion_r1921796996
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -1251,12 +1252,19 @@ private TaskStatus runSequential(TaskToolbox toolbox)
throws Exception
*/
private TaskReport.ReportMap getTaskCompletionReports(TaskStatus taskStatus)
{
- return buildIngestionStatsAndContextReport(
+ final var taskCompletionReport = buildIngestionStatsAndContextReport(
IngestionState.COMPLETED,
taskStatus.getErrorMsg(),
segmentsRead,
segmentsPublished
);
+ final var totalProcessedBytes =
indexGenerateRowStats.lhs.get("processedBytes");
+ // Emit the processed bytes metric
+ final ServiceMetricEvent.Builder metricBuilder = new
ServiceMetricEvent.Builder();
+ //IndexTaskUtils.setTaskDimensions(metricBuilder, task);
Review Comment:
Why is this commented out?
You may use
```
IndexTaskUtils.setTaskDimensions(metricBuilder, this);
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -1251,12 +1252,19 @@ private TaskStatus runSequential(TaskToolbox toolbox)
throws Exception
*/
private TaskReport.ReportMap getTaskCompletionReports(TaskStatus taskStatus)
{
- return buildIngestionStatsAndContextReport(
+ final var taskCompletionReport = buildIngestionStatsAndContextReport(
Review Comment:
Druid code doesn't really use the `var` keyword yet. So, it would be nice to
stick to static typing for the time being.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -1633,6 +1641,9 @@ private Pair<Map<String, Object>, Map<String, Object>>
doGetRowStatsAndUnparseab
);
buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
+ // Emit the processed bytes metric
+ emitMetric(toolbox.getEmitter(), "ingest/processed/bytes",
rowStatsForRunningTasks.getProcessedBytes());
Review Comment:
this should not be needed now, since we are already emitting the metric in
`getTaskCompletionReports()` method.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -529,6 +531,32 @@ private MSQTaskReportPayload runInternal(final
QueryListener queryListener, fina
countersSnapshot,
null
);
+ // Emit summary metrics
+ emitSummaryMetrics(msqTaskReportPayload, querySpec);
+ return msqTaskReportPayload;
+ }
+
+ private void emitSummaryMetrics(final MSQTaskReportPayload
msqTaskReportPayload, final MSQSpec querySpec)
+ {
+ long totalProcessedBytes = 0;
+
+ if (msqTaskReportPayload.getCounters() != null) {
+ totalProcessedBytes = msqTaskReportPayload.getCounters()
+ .copyMap()
+ .values()
+ .stream()
+ .flatMap(counterSnapshotsMap ->
counterSnapshotsMap.values().stream())
+ .flatMap(counterSnapshots ->
counterSnapshots.getMap().entrySet().stream())
+ .filter(entry -> entry.getKey().startsWith("input"))
+ .mapToLong(entry -> {
+ ChannelCounters.Snapshot snapshot = (ChannelCounters.Snapshot)
entry.getValue();
+ return snapshot.getBytes() == null ? 0L :
Arrays.stream(snapshot.getBytes()).sum();
+ })
+ .sum();
+ }
+
+ log.debug("Processed bytes[%d] for query[%s].", totalProcessedBytes,
querySpec.getQuery());
+ context.emitMetric("ingest/processed/bytes", totalProcessedBytes);
Review Comment:
```suggestion
context.emitMetric("ingest/input/bytes", totalProcessedBytes);
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -1251,12 +1252,19 @@ private TaskStatus runSequential(TaskToolbox toolbox)
throws Exception
*/
private TaskReport.ReportMap getTaskCompletionReports(TaskStatus taskStatus)
{
- return buildIngestionStatsAndContextReport(
+ final var taskCompletionReport = buildIngestionStatsAndContextReport(
IngestionState.COMPLETED,
taskStatus.getErrorMsg(),
segmentsRead,
segmentsPublished
);
+ final var totalProcessedBytes =
indexGenerateRowStats.lhs.get("processedBytes");
+ // Emit the processed bytes metric
+ final ServiceMetricEvent.Builder metricBuilder = new
ServiceMetricEvent.Builder();
+ //IndexTaskUtils.setTaskDimensions(metricBuilder, task);
+ toolbox.getEmitter().emit(
+ metricBuilder.setMetric("ingest/processed/bytes", (Number)
totalProcessedBytes));
Review Comment:
Let's use the metric name `ingest/input/bytes` instead since we are already
using that name to denote the same metric in `TaskRealtimeMetricsMonitor`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]