kfaraz commented on a change in pull request #12280:
URL: https://github.com/apache/druid/pull/12280#discussion_r816511853



##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
##########
@@ -1535,27 +1571,47 @@ private RowIngestionMetersTotals 
getTotalsFromBuildSegmentsRowStats(Object build
           unparseableEvents.addAll(buildSegmentsUnparseableEvents);
         }
 
-        processed += ((Number) buildSegments.get("processed")).longValue();
-        processedWithError += ((Number) 
buildSegments.get("processedWithError")).longValue();
-        thrownAway += ((Number) buildSegments.get("thrownAway")).longValue();
-        unparseable += ((Number) buildSegments.get("unparseable")).longValue();
+        RowIngestionMetersTotals runningTaskBuildSegmentsRowStats = new 
RowIngestionMetersTotals(((Number) buildSegments.get("processed")).longValue(),
+                                                                               
   ((Number) buildSegments.get("processedWithError")).longValue(),
+                                                                               
   ((Number) buildSegments.get("thrownAway")).longValue(),
+                                                                               
   ((Number) buildSegments.get("unparseable")).longValue());
+        
buildSegmentsRowStats.addRowIngestionMetersTotals(runningTaskBuildSegmentsRowStats);

Review comment:
```suggestion
        buildSegmentsRowStats.addRowIngestionMetersTotals(getTotalsFromBuildSegmentsRowStats(buildSegments));
```

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
##########
@@ -1477,10 +1484,7 @@ private RowIngestionMetersTotals 
getTotalsFromBuildSegmentsRowStats(Object build
       boolean includeUnparseable
   )
   {
-    long processed = 0L;
-    long processedWithError = 0L;
-    long thrownAway = 0L;
-    long unparseable = 0L;
+    MutableRowIngestionMeters buildSegmentsRowStats = new 
MutableRowIngestionMeters();

Review comment:
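Nit: this accumulator is only mutated through `addRowIngestionMetersTotals` and never reassigned, so it can be declared `final`:
```suggestion
    final MutableRowIngestionMeters buildSegmentsRowStats = new MutableRowIngestionMeters();
```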

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
##########
@@ -1492,31 +1496,63 @@ private RowIngestionMetersTotals 
getTotalsFromBuildSegmentsRowStats(Object build
         LOG.warn("Got an empty task report from subtask: " + 
pushedSegmentsReport.getTaskId());
         continue;
       }
-      IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport = 
(IngestionStatsAndErrorsTaskReport) taskReport.get(
-          IngestionStatsAndErrorsTaskReport.REPORT_KEY);
-      IngestionStatsAndErrorsTaskReportData reportData =
-          (IngestionStatsAndErrorsTaskReportData) 
ingestionStatsAndErrorsReport.getPayload();
-      RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
-          reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
-      );
-
-      if (includeUnparseable) {
-        List<ParseExceptionReport> taskUnparsebleEvents = 
(List<ParseExceptionReport>) reportData.getUnparseableEvents()
-                                                                               
    .get(RowIngestionMeters.BUILD_SEGMENTS);
-        unparseableEvents.addAll(taskUnparsebleEvents);
-      }
+      RowIngestionMetersTotals rowStatsAndUnparseableEvents =

Review comment:
```suggestion
      RowIngestionMetersTotals rowIngestionMetersTotals =
```

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
##########
@@ -1492,31 +1496,63 @@ private RowIngestionMetersTotals 
getTotalsFromBuildSegmentsRowStats(Object build
         LOG.warn("Got an empty task report from subtask: " + 
pushedSegmentsReport.getTaskId());
         continue;
       }
-      IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport = 
(IngestionStatsAndErrorsTaskReport) taskReport.get(
-          IngestionStatsAndErrorsTaskReport.REPORT_KEY);
-      IngestionStatsAndErrorsTaskReportData reportData =
-          (IngestionStatsAndErrorsTaskReportData) 
ingestionStatsAndErrorsReport.getPayload();
-      RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
-          reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
-      );
-
-      if (includeUnparseable) {
-        List<ParseExceptionReport> taskUnparsebleEvents = 
(List<ParseExceptionReport>) reportData.getUnparseableEvents()
-                                                                               
    .get(RowIngestionMeters.BUILD_SEGMENTS);
-        unparseableEvents.addAll(taskUnparsebleEvents);
-      }
+      RowIngestionMetersTotals rowStatsAndUnparseableEvents =
+          getBuildSegmentsStatsFromTaskReport(taskReport, includeUnparseable, 
unparseableEvents);
 
-      processed += totals.getProcessed();
-      processedWithError += totals.getProcessedWithError();
-      thrownAway += totals.getThrownAway();
-      unparseable += totals.getUnparseable();
+      
buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsAndUnparseableEvents);
     }
 
     // Get stats from running tasks
     Set<String> runningTaskIds = parallelSinglePhaseRunner.getRunningTaskIds();
+
+    return 
getRunningTaskReportsAndCreateRowStatsAndUnparseableEvents(runningTaskIds,
+                                                                      
unparseableEvents,
+                                                                      
buildSegmentsRowStats,
+                                                                      
includeUnparseable);
+  }
+
+  private Pair<Map<String, Object>, Map<String, Object>> 
doGetRowStatsAndUnparseableEventsParallelMultiPhase(boolean includeUnparseable)
+  {
+    if (indexGenerateRowStats != null) {
+      return Pair.of(indexGenerateRowStats.lhs, includeUnparseable ? 
indexGenerateRowStats.rhs : ImmutableMap.of());
+    } else if (currentSubTaskHolder == null || !((ParallelIndexTaskRunner) 
currentSubTaskHolder.getTask()).getName().equals("partial segment generation")) 
{
+      return Pair.of(ImmutableMap.of(), ImmutableMap.of());
+    } else {
+      ParallelIndexTaskRunner currentRunner = currentSubTaskHolder.getTask();
+      Set<String> runningTaskIds = currentRunner.getRunningTaskIds();
+      Map<String, GeneratedPartitionsReport> completedSubtaskReports = 
(Map<String, GeneratedPartitionsReport>) currentRunner.getReports();
+
+      MutableRowIngestionMeters buildSegmentsRowStats = new 
MutableRowIngestionMeters();
+
+      List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
+      for (GeneratedPartitionsReport generatedPartitionsReport : 
completedSubtaskReports.values()) {
+        Map<String, TaskReport> taskReport = 
generatedPartitionsReport.getTaskReport();
+        if (taskReport == null || taskReport.isEmpty()) {
+          LOG.warn("Got an empty task report from subtask: " + 
generatedPartitionsReport.getTaskId());
+          continue;
+        }
+        RowIngestionMetersTotals rowStatsAndUnparseableEvents =
+            getBuildSegmentsStatsFromTaskReport(taskReport, true, 
unparseableEvents);
+
+        
buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsAndUnparseableEvents);
+      }
+      return 
getRunningTaskReportsAndCreateRowStatsAndUnparseableEvents(runningTaskIds,
+                                                                        
unparseableEvents,
+                                                                        
buildSegmentsRowStats,
+                                                                        
includeUnparseable);
+    }
+  }
+
+  private Pair<Map<String, Object>, Map<String, Object>> 
getRunningTaskReportsAndCreateRowStatsAndUnparseableEvents(

Review comment:
```suggestion
  private Pair<Map<String, Object>, Map<String, Object>> getRowStatsAndUnparseableEventsForRunningTasks(
```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to