kfaraz commented on code in PR #13758:
URL: https://github.com/apache/druid/pull/13758#discussion_r1183522014


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -1540,201 +1548,31 @@ public Response getCompleteSubTaskSpecAttemptHistory(
     }
   }
 
-  private RowIngestionMetersTotals getTotalsFromBuildSegmentsRowStats(Object 
buildSegmentsRowStats)
-  {
-    if (buildSegmentsRowStats instanceof RowIngestionMetersTotals) {
-      // This case is for unit tests. Normally when deserialized the row stats 
will apppear as a Map<String, Object>.
-      return (RowIngestionMetersTotals) buildSegmentsRowStats;
-    } else if (buildSegmentsRowStats instanceof Map) {
-      Map<String, Object> buildSegmentsRowStatsMap = (Map<String, Object>) 
buildSegmentsRowStats;
-      return new RowIngestionMetersTotals(
-          ((Number) buildSegmentsRowStatsMap.get("processed")).longValue(),
-          ((Number) 
buildSegmentsRowStatsMap.get("processedBytes")).longValue(),
-          ((Number) 
buildSegmentsRowStatsMap.get("processedWithError")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("thrownAway")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("unparseable")).longValue()
-      );
-    } else {
-      // should never happen
-      throw new RuntimeException("Unrecognized buildSegmentsRowStats type: " + 
buildSegmentsRowStats.getClass());
-    }
-  }
-
-  private Pair<Map<String, Object>, Map<String, Object>> 
doGetRowStatsAndUnparseableEventsParallelSinglePhase(
-      SinglePhaseParallelIndexTaskRunner parallelSinglePhaseRunner,
-      boolean includeUnparseable
-  )
-  {
-    final SimpleRowIngestionMeters buildSegmentsRowStats = new 
SimpleRowIngestionMeters();
-
-    List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
-
-    // Get stats from completed tasks
-    Map<String, PushedSegmentsReport> completedSubtaskReports = 
parallelSinglePhaseRunner.getReports();
-    for (PushedSegmentsReport pushedSegmentsReport : 
completedSubtaskReports.values()) {
-      Map<String, TaskReport> taskReport = 
pushedSegmentsReport.getTaskReport();
-      if (taskReport == null || taskReport.isEmpty()) {
-        LOG.warn("Got an empty task report from subtask: " + 
pushedSegmentsReport.getTaskId());
-        continue;
-      }
-      RowIngestionMetersTotals rowIngestionMetersTotals =
-          getBuildSegmentsStatsFromTaskReport(taskReport, includeUnparseable, 
unparseableEvents);
-
-      
buildSegmentsRowStats.addRowIngestionMetersTotals(rowIngestionMetersTotals);
-    }
-
-    RowIngestionMetersTotals rowStatsForRunningTasks = 
getRowStatsAndUnparseableEventsForRunningTasks(
-        parallelSinglePhaseRunner.getRunningTaskIds(),
-        unparseableEvents,
-        includeUnparseable
-    );
-    buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
-
-    return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), 
unparseableEvents);
-  }
-
-  private Pair<Map<String, Object>, Map<String, Object>> 
doGetRowStatsAndUnparseableEventsParallelMultiPhase(
-      ParallelIndexTaskRunner<?, ?> currentRunner,
-      boolean includeUnparseable
-  )
-  {
-    if (indexGenerateRowStats != null) {
-      return Pair.of(indexGenerateRowStats.lhs, includeUnparseable ? 
indexGenerateRowStats.rhs : ImmutableMap.of());
-    } else if (!currentRunner.getName().equals("partial segment generation")) {
-      return Pair.of(ImmutableMap.of(), ImmutableMap.of());
-    } else {
-      Map<String, GeneratedPartitionsReport> completedSubtaskReports =
-          (Map<String, GeneratedPartitionsReport>) currentRunner.getReports();
-
-      final SimpleRowIngestionMeters buildSegmentsRowStats = new 
SimpleRowIngestionMeters();
-      final List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
-      for (GeneratedPartitionsReport generatedPartitionsReport : 
completedSubtaskReports.values()) {
-        Map<String, TaskReport> taskReport = 
generatedPartitionsReport.getTaskReport();
-        if (taskReport == null || taskReport.isEmpty()) {
-          LOG.warn("Got an empty task report from subtask: " + 
generatedPartitionsReport.getTaskId());
-          continue;
-        }
-        RowIngestionMetersTotals rowStatsForCompletedTask =
-            getBuildSegmentsStatsFromTaskReport(taskReport, true, 
unparseableEvents);
-
-        
buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForCompletedTask);
-      }
-
-      RowIngestionMetersTotals rowStatsForRunningTasks = 
getRowStatsAndUnparseableEventsForRunningTasks(
-          currentRunner.getRunningTaskIds(),
-          unparseableEvents,
-          includeUnparseable
-      );
-      
buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
-
-      return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), 
unparseableEvents);
-    }
-  }
-
-  private RowIngestionMetersTotals 
getRowStatsAndUnparseableEventsForRunningTasks(
-      Set<String> runningTaskIds,
-      List<ParseExceptionReport> unparseableEvents,
-      boolean includeUnparseable
-  )
+  @Nullable
+  Map<String, Object> fetchTaskReport(String taskId)
   {
-    final SimpleRowIngestionMeters buildSegmentsRowStats = new 
SimpleRowIngestionMeters();
-    for (String runningTaskId : runningTaskIds) {
-      try {
-        final Map<String, Object> report = 
getTaskReport(toolbox.getOverlordClient(), runningTaskId);
-
-        if (report == null || report.isEmpty()) {
-          // task does not have a running report yet
-          continue;
-        }
-
-        Map<String, Object> ingestionStatsAndErrors = (Map<String, Object>) 
report.get("ingestionStatsAndErrors");
-        Map<String, Object> payload = (Map<String, Object>) 
ingestionStatsAndErrors.get("payload");
-        Map<String, Object> rowStats = (Map<String, Object>) 
payload.get("rowStats");
-        Map<String, Object> totals = (Map<String, Object>) 
rowStats.get("totals");
-        Map<String, Object> buildSegments = (Map<String, Object>) 
totals.get(RowIngestionMeters.BUILD_SEGMENTS);
-
-        if (includeUnparseable) {
-          Map<String, Object> taskUnparseableEvents = (Map<String, Object>) 
payload.get("unparseableEvents");
-          List<ParseExceptionReport> buildSegmentsUnparseableEvents = 
(List<ParseExceptionReport>)
-              taskUnparseableEvents.get(RowIngestionMeters.BUILD_SEGMENTS);
-          unparseableEvents.addAll(buildSegmentsUnparseableEvents);
-        }
-
-        
buildSegmentsRowStats.addRowIngestionMetersTotals(getTotalsFromBuildSegmentsRowStats(buildSegments));
-      }
-      catch (Exception e) {
-        LOG.warn(e, "Encountered exception when getting live subtask report 
for task: " + runningTaskId);
-      }
+    try {
+      return getTaskReport(toolbox.getOverlordClient(), taskId);
     }
-    return buildSegmentsRowStats.getTotals();
-  }
-
-  private Pair<Map<String, Object>, Map<String, Object>> 
createStatsAndErrorsReport(
-      RowIngestionMetersTotals rowStats,
-      List<ParseExceptionReport> unparseableEvents
-  )
-  {
-    Map<String, Object> rowStatsMap = new HashMap<>();
-    Map<String, Object> totalsMap = new HashMap<>();
-    totalsMap.put(RowIngestionMeters.BUILD_SEGMENTS, rowStats);
-    rowStatsMap.put("totals", totalsMap);
-
-    return Pair.of(rowStatsMap, 
ImmutableMap.of(RowIngestionMeters.BUILD_SEGMENTS, unparseableEvents));
-  }
-
-  private RowIngestionMetersTotals getBuildSegmentsStatsFromTaskReport(
-      Map<String, TaskReport> taskReport,
-      boolean includeUnparseable,
-      List<ParseExceptionReport> unparseableEvents)
-  {
-    IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport = 
(IngestionStatsAndErrorsTaskReport) taskReport.get(
-        IngestionStatsAndErrorsTaskReport.REPORT_KEY);
-    IngestionStatsAndErrorsTaskReportData reportData =
-        (IngestionStatsAndErrorsTaskReportData) 
ingestionStatsAndErrorsReport.getPayload();
-    RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
-        reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
-    );
-    if (includeUnparseable) {
-      List<ParseExceptionReport> taskUnparsebleEvents = 
(List<ParseExceptionReport>) reportData.getUnparseableEvents()
-                                                                    
.get(RowIngestionMeters.BUILD_SEGMENTS);
-      unparseableEvents.addAll(taskUnparsebleEvents);
+    catch (Exception e) {
+      LOG.warn(e, "Encountered exception when getting live subtask report for 
task: " + taskId);
     }
-    return totals;
+    return null;
   }
 
-  private Pair<Map<String, Object>, Map<String, Object>> 
doGetRowStatsAndUnparseableEvents(
-      String full,
-      boolean includeUnparseable
-  )
+  private ParallelIndexStats doGetRowStatsAndUnparseableEvents(boolean full, 
boolean includeUnparseable)
   {
     if (currentSubTaskHolder == null) {
-      return Pair.of(ImmutableMap.of(), ImmutableMap.of());
+      return new ParallelIndexStats();
     }
 
     Object currentRunner = currentSubTaskHolder.getTask();
     if (currentRunner == null) {
-      return Pair.of(ImmutableMap.of(), ImmutableMap.of());
+      return new ParallelIndexStats();
     }
 
-    if (isParallelMode()) {

Review Comment:
   I think we can retain this if-else flow (but with some of the cleanup that 
you have done):
   - For sequential mode, we will just create the `ParallelIndexStats` object 
right here.
   - For single phase, we will create a `ParallelIndexStatsReporter` and do the 
computations.
   - For multi phase, we will call a method that checks whether the stats are 
already cached and whether the current phase is the correct one. If the stats 
are not cached and the phase is correct, it constructs a 
`ParallelIndexStatsReporter`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to