kfaraz commented on code in PR #13758:
URL: https://github.com/apache/druid/pull/13758#discussion_r1183522014


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -1540,201 +1548,31 @@ public Response getCompleteSubTaskSpecAttemptHistory(
     }
   }
 
-  private RowIngestionMetersTotals getTotalsFromBuildSegmentsRowStats(Object 
buildSegmentsRowStats)
-  {
-    if (buildSegmentsRowStats instanceof RowIngestionMetersTotals) {
-      // This case is for unit tests. Normally when deserialized the row stats 
will apppear as a Map<String, Object>.
-      return (RowIngestionMetersTotals) buildSegmentsRowStats;
-    } else if (buildSegmentsRowStats instanceof Map) {
-      Map<String, Object> buildSegmentsRowStatsMap = (Map<String, Object>) 
buildSegmentsRowStats;
-      return new RowIngestionMetersTotals(
-          ((Number) buildSegmentsRowStatsMap.get("processed")).longValue(),
-          ((Number) 
buildSegmentsRowStatsMap.get("processedBytes")).longValue(),
-          ((Number) 
buildSegmentsRowStatsMap.get("processedWithError")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("thrownAway")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("unparseable")).longValue()
-      );
-    } else {
-      // should never happen
-      throw new RuntimeException("Unrecognized buildSegmentsRowStats type: " + 
buildSegmentsRowStats.getClass());
-    }
-  }
-
-  private Pair<Map<String, Object>, Map<String, Object>> 
doGetRowStatsAndUnparseableEventsParallelSinglePhase(
-      SinglePhaseParallelIndexTaskRunner parallelSinglePhaseRunner,
-      boolean includeUnparseable
-  )
-  {
-    final SimpleRowIngestionMeters buildSegmentsRowStats = new 
SimpleRowIngestionMeters();
-
-    List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
-
-    // Get stats from completed tasks
-    Map<String, PushedSegmentsReport> completedSubtaskReports = 
parallelSinglePhaseRunner.getReports();
-    for (PushedSegmentsReport pushedSegmentsReport : 
completedSubtaskReports.values()) {
-      Map<String, TaskReport> taskReport = 
pushedSegmentsReport.getTaskReport();
-      if (taskReport == null || taskReport.isEmpty()) {
-        LOG.warn("Got an empty task report from subtask: " + 
pushedSegmentsReport.getTaskId());
-        continue;
-      }
-      RowIngestionMetersTotals rowIngestionMetersTotals =
-          getBuildSegmentsStatsFromTaskReport(taskReport, includeUnparseable, 
unparseableEvents);
-
-      
buildSegmentsRowStats.addRowIngestionMetersTotals(rowIngestionMetersTotals);
-    }
-
-    RowIngestionMetersTotals rowStatsForRunningTasks = 
getRowStatsAndUnparseableEventsForRunningTasks(
-        parallelSinglePhaseRunner.getRunningTaskIds(),
-        unparseableEvents,
-        includeUnparseable
-    );
-    buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
-
-    return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), 
unparseableEvents);
-  }
-
-  private Pair<Map<String, Object>, Map<String, Object>> 
doGetRowStatsAndUnparseableEventsParallelMultiPhase(
-      ParallelIndexTaskRunner<?, ?> currentRunner,
-      boolean includeUnparseable
-  )
-  {
-    if (indexGenerateRowStats != null) {
-      return Pair.of(indexGenerateRowStats.lhs, includeUnparseable ? 
indexGenerateRowStats.rhs : ImmutableMap.of());
-    } else if (!currentRunner.getName().equals("partial segment generation")) {
-      return Pair.of(ImmutableMap.of(), ImmutableMap.of());
-    } else {
-      Map<String, GeneratedPartitionsReport> completedSubtaskReports =
-          (Map<String, GeneratedPartitionsReport>) currentRunner.getReports();
-
-      final SimpleRowIngestionMeters buildSegmentsRowStats = new 
SimpleRowIngestionMeters();
-      final List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
-      for (GeneratedPartitionsReport generatedPartitionsReport : 
completedSubtaskReports.values()) {
-        Map<String, TaskReport> taskReport = 
generatedPartitionsReport.getTaskReport();
-        if (taskReport == null || taskReport.isEmpty()) {
-          LOG.warn("Got an empty task report from subtask: " + 
generatedPartitionsReport.getTaskId());
-          continue;
-        }
-        RowIngestionMetersTotals rowStatsForCompletedTask =
-            getBuildSegmentsStatsFromTaskReport(taskReport, true, 
unparseableEvents);
-
-        
buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForCompletedTask);
-      }
-
-      RowIngestionMetersTotals rowStatsForRunningTasks = 
getRowStatsAndUnparseableEventsForRunningTasks(
-          currentRunner.getRunningTaskIds(),
-          unparseableEvents,
-          includeUnparseable
-      );
-      
buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
-
-      return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), 
unparseableEvents);
-    }
-  }
-
-  private RowIngestionMetersTotals 
getRowStatsAndUnparseableEventsForRunningTasks(
-      Set<String> runningTaskIds,
-      List<ParseExceptionReport> unparseableEvents,
-      boolean includeUnparseable
-  )
+  @Nullable
+  Map<String, Object> fetchTaskReport(String taskId)
   {
-    final SimpleRowIngestionMeters buildSegmentsRowStats = new 
SimpleRowIngestionMeters();
-    for (String runningTaskId : runningTaskIds) {
-      try {
-        final Map<String, Object> report = 
getTaskReport(toolbox.getOverlordClient(), runningTaskId);
-
-        if (report == null || report.isEmpty()) {
-          // task does not have a running report yet
-          continue;
-        }
-
-        Map<String, Object> ingestionStatsAndErrors = (Map<String, Object>) 
report.get("ingestionStatsAndErrors");
-        Map<String, Object> payload = (Map<String, Object>) 
ingestionStatsAndErrors.get("payload");
-        Map<String, Object> rowStats = (Map<String, Object>) 
payload.get("rowStats");
-        Map<String, Object> totals = (Map<String, Object>) 
rowStats.get("totals");
-        Map<String, Object> buildSegments = (Map<String, Object>) 
totals.get(RowIngestionMeters.BUILD_SEGMENTS);
-
-        if (includeUnparseable) {
-          Map<String, Object> taskUnparseableEvents = (Map<String, Object>) 
payload.get("unparseableEvents");
-          List<ParseExceptionReport> buildSegmentsUnparseableEvents = 
(List<ParseExceptionReport>)
-              taskUnparseableEvents.get(RowIngestionMeters.BUILD_SEGMENTS);
-          unparseableEvents.addAll(buildSegmentsUnparseableEvents);
-        }
-
-        
buildSegmentsRowStats.addRowIngestionMetersTotals(getTotalsFromBuildSegmentsRowStats(buildSegments));
-      }
-      catch (Exception e) {
-        LOG.warn(e, "Encountered exception when getting live subtask report 
for task: " + runningTaskId);
-      }
+    try {
+      return getTaskReport(toolbox.getOverlordClient(), taskId);
     }
-    return buildSegmentsRowStats.getTotals();
-  }
-
-  private Pair<Map<String, Object>, Map<String, Object>> 
createStatsAndErrorsReport(
-      RowIngestionMetersTotals rowStats,
-      List<ParseExceptionReport> unparseableEvents
-  )
-  {
-    Map<String, Object> rowStatsMap = new HashMap<>();
-    Map<String, Object> totalsMap = new HashMap<>();
-    totalsMap.put(RowIngestionMeters.BUILD_SEGMENTS, rowStats);
-    rowStatsMap.put("totals", totalsMap);
-
-    return Pair.of(rowStatsMap, 
ImmutableMap.of(RowIngestionMeters.BUILD_SEGMENTS, unparseableEvents));
-  }
-
-  private RowIngestionMetersTotals getBuildSegmentsStatsFromTaskReport(
-      Map<String, TaskReport> taskReport,
-      boolean includeUnparseable,
-      List<ParseExceptionReport> unparseableEvents)
-  {
-    IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport = 
(IngestionStatsAndErrorsTaskReport) taskReport.get(
-        IngestionStatsAndErrorsTaskReport.REPORT_KEY);
-    IngestionStatsAndErrorsTaskReportData reportData =
-        (IngestionStatsAndErrorsTaskReportData) 
ingestionStatsAndErrorsReport.getPayload();
-    RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
-        reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
-    );
-    if (includeUnparseable) {
-      List<ParseExceptionReport> taskUnparsebleEvents = 
(List<ParseExceptionReport>) reportData.getUnparseableEvents()
-                                                                    
.get(RowIngestionMeters.BUILD_SEGMENTS);
-      unparseableEvents.addAll(taskUnparsebleEvents);
+    catch (Exception e) {
+      LOG.warn(e, "Encountered exception when getting live subtask report for 
task: " + taskId);
     }
-    return totals;
+    return null;
   }
 
-  private Pair<Map<String, Object>, Map<String, Object>> 
doGetRowStatsAndUnparseableEvents(
-      String full,
-      boolean includeUnparseable
-  )
+  private ParallelIndexStats doGetRowStatsAndUnparseableEvents(boolean full, 
boolean includeUnparseable)
   {
     if (currentSubTaskHolder == null) {
-      return Pair.of(ImmutableMap.of(), ImmutableMap.of());
+      return new ParallelIndexStats();
     }
 
     Object currentRunner = currentSubTaskHolder.getTask();
     if (currentRunner == null) {
-      return Pair.of(ImmutableMap.of(), ImmutableMap.of());
+      return new ParallelIndexStats();
     }
 
-    if (isParallelMode()) {

Review Comment:
   I think we can retain this if-else flow (but with some cleanup, as you have done).
   - For sequential mode, we can just create the `ParallelIndexStats` object right here.
   - For single-phase mode, we can create a `ParallelIndexStatsReporter` and do the computations there.
   - For multi-phase mode, we can have a method in this class itself that checks whether the stats are already cached and, if not, constructs a `ParallelIndexStatsReporter`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to