kfaraz commented on code in PR #17581:
URL: https://github.com/apache/druid/pull/17581#discussion_r1921796996


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -1251,12 +1252,19 @@ private TaskStatus runSequential(TaskToolbox toolbox) throws Exception
    */
   private TaskReport.ReportMap getTaskCompletionReports(TaskStatus taskStatus)
   {
-    return buildIngestionStatsAndContextReport(
+    final var taskCompletionReport =  buildIngestionStatsAndContextReport(
         IngestionState.COMPLETED,
         taskStatus.getErrorMsg(),
         segmentsRead,
         segmentsPublished
     );
+    final var totalProcessedBytes = indexGenerateRowStats.lhs.get("processedBytes");
+    // Emit the processed bytes metric
+    final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
+    //IndexTaskUtils.setTaskDimensions(metricBuilder, task);

Review Comment:
   Why is this commented out?
   You may use 
   ```
   IndexTaskUtils.setTaskDimensions(metricBuilder, this);
   ```
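For illustration, here is a self-contained sketch of why `this` works in that call: the supervisor task is itself the `Task` whose identity the dimensions describe, so there is no separate `task` variable to pass. `MetricEventBuilder`, `Task`, `ExampleTask` and `setTaskDimensions` below are hypothetical stand-ins for `ServiceMetricEvent.Builder`, Druid's `Task` interface and `IndexTaskUtils.setTaskDimensions`. The dimension names are assumptions; the real helper is expected to attach task identity dimensions such as the task ID, task type and datasource.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TaskDimensionsSketch
{
  // Hypothetical stand-in for ServiceMetricEvent.Builder: collects dimensions plus one metric
  static final class MetricEventBuilder
  {
    final Map<String, Object> dimensions = new LinkedHashMap<>();
    String metric;
    Number value;

    MetricEventBuilder setDimension(String name, Object dimensionValue)
    {
      dimensions.put(name, dimensionValue);
      return this;
    }

    MetricEventBuilder setMetric(String name, Number metricValue)
    {
      this.metric = name;
      this.value = metricValue;
      return this;
    }
  }

  // Hypothetical, trimmed-down stand-in for Druid's Task interface
  interface Task
  {
    String getId();

    String getType();

    String getDataSource();
  }

  // Mirrors the intent of IndexTaskUtils.setTaskDimensions; the dimension names are assumptions
  static void setTaskDimensions(MetricEventBuilder builder, Task task)
  {
    builder.setDimension("taskId", task.getId());
    builder.setDimension("taskType", task.getType());
    builder.setDimension("dataSource", task.getDataSource());
  }

  // Hypothetical task: inside its own method, the task passes `this` to the helper
  static final class ExampleTask implements Task
  {
    @Override
    public String getId()
    {
      return "index_parallel_wiki_0";
    }

    @Override
    public String getType()
    {
      return "index_parallel";
    }

    @Override
    public String getDataSource()
    {
      return "wiki";
    }

    MetricEventBuilder processedBytesEvent(long bytes)
    {
      final MetricEventBuilder builder = new MetricEventBuilder();
      setTaskDimensions(builder, this);
      return builder.setMetric("ingest/input/bytes", bytes);
    }
  }

  public static void main(String[] args)
  {
    final MetricEventBuilder event = new ExampleTask().processedBytesEvent(42L);
    System.out.println(event.metric + " " + event.value + " " + event.dimensions);
  }
}
```

Without the dimensions, the emitted event carries only the metric name and value, so it cannot be attributed to a particular task or datasource downstream.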



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -1251,12 +1252,19 @@ private TaskStatus runSequential(TaskToolbox toolbox) throws Exception
    */
   private TaskReport.ReportMap getTaskCompletionReports(TaskStatus taskStatus)
   {
-    return buildIngestionStatsAndContextReport(
+    final var taskCompletionReport =  buildIngestionStatsAndContextReport(

Review Comment:
   Druid code doesn't really use the `var` keyword yet, so it would be nice to stick to explicitly declared types for the time being.
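As a sketch of what the explicit version looks like, assuming `indexGenerateRowStats` is a `Pair<Map<String, Object>, Map<String, Object>>` like the one returned by the method in the next hunk: spelling out the type shows that the looked-up value is an `Object`, which is why the later `(Number)` cast is needed. The class and method names below are hypothetical, and the null-to-zero fallback is an assumption, not part of the PR.

```java
import java.util.HashMap;
import java.util.Map;

public class ExplicitTypesSketch
{
  // Explicitly typed equivalent of:
  //   final var totalProcessedBytes = rowStats.get("processedBytes");
  // The map's values are declared as Object, so Object is what `var` would infer.
  static long processedBytes(final Map<String, Object> rowStats)
  {
    final Object totalProcessedBytes = rowStats.get("processedBytes");
    // Assumption: a missing entry is reported as zero instead of passing null to the metric builder
    if (totalProcessedBytes == null) {
      return 0L;
    }
    // Casting to Number handles both Integer and Long values that a stats map might hold
    return ((Number) totalProcessedBytes).longValue();
  }

  public static void main(String[] args)
  {
    final Map<String, Object> rowStats = new HashMap<>();
    rowStats.put("processedBytes", 2048L);
    System.out.println(processedBytes(rowStats));
  }
}
```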



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -1633,6 +1641,9 @@ private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseab
     );
     buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
 
+    // Emit the processed bytes metric
+    emitMetric(toolbox.getEmitter(), "ingest/processed/bytes", rowStatsForRunningTasks.getProcessedBytes());

Review Comment:
   This should not be needed now, since we are already emitting the metric in the `getTaskCompletionReports()` method.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -529,6 +531,32 @@ private MSQTaskReportPayload runInternal(final QueryListener queryListener, fina
         countersSnapshot,
         null
     );
+    // Emit summary metrics
+    emitSummaryMetrics(msqTaskReportPayload, querySpec);
+    return msqTaskReportPayload;
+  }
+
+  private void emitSummaryMetrics(final MSQTaskReportPayload msqTaskReportPayload, final MSQSpec querySpec)
+  {
+    long totalProcessedBytes = 0;
+
+    if (msqTaskReportPayload.getCounters() != null) {
+      totalProcessedBytes = msqTaskReportPayload.getCounters()
+          .copyMap()
+          .values()
+          .stream()
+          .flatMap(counterSnapshotsMap -> counterSnapshotsMap.values().stream())
+          .flatMap(counterSnapshots -> counterSnapshots.getMap().entrySet().stream())
+          .filter(entry -> entry.getKey().startsWith("input"))
+          .mapToLong(entry -> {
+            ChannelCounters.Snapshot snapshot = (ChannelCounters.Snapshot) entry.getValue();
+            return snapshot.getBytes() == null ? 0L : Arrays.stream(snapshot.getBytes()).sum();
+          })
+          .sum();
+    }
+
+    log.debug("Processed bytes[%d] for query[%s].", totalProcessedBytes, querySpec.getQuery());
+    context.emitMetric("ingest/processed/bytes", totalProcessedBytes);

Review Comment:
   ```suggestion
       context.emitMetric("ingest/input/bytes", totalProcessedBytes);
   ```
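The summation in `emitSummaryMetrics()` can be exercised in isolation with a minimal sketch. `ChannelSnapshot` is a hypothetical stand-in for `ChannelCounters.Snapshot`, and the nested maps (stage, then worker, then counter name) are an assumed simplification of the structure returned by `getCounters().copyMap()`. The `instanceof` filter is an addition not present in the PR: it skips any counter whose name starts with `input` but is not a channel snapshot, instead of failing with a `ClassCastException`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class InputBytesSketch
{
  // Hypothetical stand-in for ChannelCounters.Snapshot: per-partition byte counts, possibly null
  static final class ChannelSnapshot
  {
    final long[] bytes;

    ChannelSnapshot(final long[] bytes)
    {
      this.bytes = bytes;
    }
  }

  // Assumed shape: stage number -> worker number -> counter name -> counter snapshot
  static long totalInputBytes(final Map<Integer, Map<Integer, Map<String, Object>>> counters)
  {
    if (counters == null) {
      return 0L;
    }
    return counters.values()
                   .stream()
                   .flatMap(workers -> workers.values().stream())
                   .flatMap(namedCounters -> namedCounters.entrySet().stream())
                   // Only input channels contribute, as in the PR's startsWith("input") filter
                   .filter(entry -> entry.getKey().startsWith("input"))
                   // Added safety: skip values that are not channel snapshots instead of casting blindly
                   .filter(entry -> entry.getValue() instanceof ChannelSnapshot)
                   .mapToLong(entry -> {
                     final long[] bytes = ((ChannelSnapshot) entry.getValue()).bytes;
                     return bytes == null ? 0L : Arrays.stream(bytes).sum();
                   })
                   .sum();
  }

  public static void main(String[] args)
  {
    final Map<String, Object> worker0 = new HashMap<>();
    worker0.put("input0", new ChannelSnapshot(new long[]{10L, 20L}));
    worker0.put("output", new ChannelSnapshot(new long[]{5L}));
    final Map<Integer, Map<String, Object>> stage0 = new HashMap<>();
    stage0.put(0, worker0);
    final Map<Integer, Map<Integer, Map<String, Object>>> counters = new HashMap<>();
    counters.put(0, stage0);
    System.out.println(totalInputBytes(counters));
  }
}
```

Only the `input0` channel is counted here; the `output` channel's bytes are excluded by the name filter.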



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -1251,12 +1252,19 @@ private TaskStatus runSequential(TaskToolbox toolbox) throws Exception
    */
   private TaskReport.ReportMap getTaskCompletionReports(TaskStatus taskStatus)
   {
-    return buildIngestionStatsAndContextReport(
+    final var taskCompletionReport =  buildIngestionStatsAndContextReport(
         IngestionState.COMPLETED,
         taskStatus.getErrorMsg(),
         segmentsRead,
         segmentsPublished
     );
+    final var totalProcessedBytes = indexGenerateRowStats.lhs.get("processedBytes");
+    // Emit the processed bytes metric
+    final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
+    //IndexTaskUtils.setTaskDimensions(metricBuilder, task);
+    toolbox.getEmitter().emit(
+        metricBuilder.setMetric("ingest/processed/bytes", (Number) totalProcessedBytes));

Review Comment:
   Let's use the metric name `ingest/input/bytes` instead, since we are already using that name to denote the same metric in `TaskRealtimeMetricsMonitor`.
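One way to keep the batch, MSQ and realtime emitters from drifting apart on this name is a single shared constant. A minimal sketch; `IngestionMetricNames` and `INPUT_BYTES` are hypothetical names, not existing Druid classes:

```java
public final class IngestionMetricNames
{
  // Hypothetical shared constant; the value is the name cited above from TaskRealtimeMetricsMonitor
  public static final String INPUT_BYTES = "ingest/input/bytes";

  private IngestionMetricNames()
  {
    // Constants holder; not meant to be instantiated
  }

  public static void main(String[] args)
  {
    // Both ParallelIndexSupervisorTask and ControllerImpl would reference this one constant
    System.out.println(INPUT_BYTES);
  }
}
```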



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
