kfaraz commented on code in PR #13758:
URL: https://github.com/apache/druid/pull/13758#discussion_r1183522014
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -1540,201 +1548,31 @@ public Response getCompleteSubTaskSpecAttemptHistory(
}
}
- private RowIngestionMetersTotals getTotalsFromBuildSegmentsRowStats(Object
buildSegmentsRowStats)
- {
- if (buildSegmentsRowStats instanceof RowIngestionMetersTotals) {
- // This case is for unit tests. Normally when deserialized the row stats
will apppear as a Map<String, Object>.
- return (RowIngestionMetersTotals) buildSegmentsRowStats;
- } else if (buildSegmentsRowStats instanceof Map) {
- Map<String, Object> buildSegmentsRowStatsMap = (Map<String, Object>)
buildSegmentsRowStats;
- return new RowIngestionMetersTotals(
- ((Number) buildSegmentsRowStatsMap.get("processed")).longValue(),
- ((Number)
buildSegmentsRowStatsMap.get("processedBytes")).longValue(),
- ((Number)
buildSegmentsRowStatsMap.get("processedWithError")).longValue(),
- ((Number) buildSegmentsRowStatsMap.get("thrownAway")).longValue(),
- ((Number) buildSegmentsRowStatsMap.get("unparseable")).longValue()
- );
- } else {
- // should never happen
- throw new RuntimeException("Unrecognized buildSegmentsRowStats type: " +
buildSegmentsRowStats.getClass());
- }
- }
-
- private Pair<Map<String, Object>, Map<String, Object>>
doGetRowStatsAndUnparseableEventsParallelSinglePhase(
- SinglePhaseParallelIndexTaskRunner parallelSinglePhaseRunner,
- boolean includeUnparseable
- )
- {
- final SimpleRowIngestionMeters buildSegmentsRowStats = new
SimpleRowIngestionMeters();
-
- List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
-
- // Get stats from completed tasks
- Map<String, PushedSegmentsReport> completedSubtaskReports =
parallelSinglePhaseRunner.getReports();
- for (PushedSegmentsReport pushedSegmentsReport :
completedSubtaskReports.values()) {
- Map<String, TaskReport> taskReport =
pushedSegmentsReport.getTaskReport();
- if (taskReport == null || taskReport.isEmpty()) {
- LOG.warn("Got an empty task report from subtask: " +
pushedSegmentsReport.getTaskId());
- continue;
- }
- RowIngestionMetersTotals rowIngestionMetersTotals =
- getBuildSegmentsStatsFromTaskReport(taskReport, includeUnparseable,
unparseableEvents);
-
-
buildSegmentsRowStats.addRowIngestionMetersTotals(rowIngestionMetersTotals);
- }
-
- RowIngestionMetersTotals rowStatsForRunningTasks =
getRowStatsAndUnparseableEventsForRunningTasks(
- parallelSinglePhaseRunner.getRunningTaskIds(),
- unparseableEvents,
- includeUnparseable
- );
- buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
-
- return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(),
unparseableEvents);
- }
-
- private Pair<Map<String, Object>, Map<String, Object>>
doGetRowStatsAndUnparseableEventsParallelMultiPhase(
- ParallelIndexTaskRunner<?, ?> currentRunner,
- boolean includeUnparseable
- )
- {
- if (indexGenerateRowStats != null) {
- return Pair.of(indexGenerateRowStats.lhs, includeUnparseable ?
indexGenerateRowStats.rhs : ImmutableMap.of());
- } else if (!currentRunner.getName().equals("partial segment generation")) {
- return Pair.of(ImmutableMap.of(), ImmutableMap.of());
- } else {
- Map<String, GeneratedPartitionsReport> completedSubtaskReports =
- (Map<String, GeneratedPartitionsReport>) currentRunner.getReports();
-
- final SimpleRowIngestionMeters buildSegmentsRowStats = new
SimpleRowIngestionMeters();
- final List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
- for (GeneratedPartitionsReport generatedPartitionsReport :
completedSubtaskReports.values()) {
- Map<String, TaskReport> taskReport =
generatedPartitionsReport.getTaskReport();
- if (taskReport == null || taskReport.isEmpty()) {
- LOG.warn("Got an empty task report from subtask: " +
generatedPartitionsReport.getTaskId());
- continue;
- }
- RowIngestionMetersTotals rowStatsForCompletedTask =
- getBuildSegmentsStatsFromTaskReport(taskReport, true,
unparseableEvents);
-
-
buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForCompletedTask);
- }
-
- RowIngestionMetersTotals rowStatsForRunningTasks =
getRowStatsAndUnparseableEventsForRunningTasks(
- currentRunner.getRunningTaskIds(),
- unparseableEvents,
- includeUnparseable
- );
-
buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
-
- return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(),
unparseableEvents);
- }
- }
-
- private RowIngestionMetersTotals
getRowStatsAndUnparseableEventsForRunningTasks(
- Set<String> runningTaskIds,
- List<ParseExceptionReport> unparseableEvents,
- boolean includeUnparseable
- )
+ @Nullable
+ Map<String, Object> fetchTaskReport(String taskId)
{
- final SimpleRowIngestionMeters buildSegmentsRowStats = new
SimpleRowIngestionMeters();
- for (String runningTaskId : runningTaskIds) {
- try {
- final Map<String, Object> report =
getTaskReport(toolbox.getOverlordClient(), runningTaskId);
-
- if (report == null || report.isEmpty()) {
- // task does not have a running report yet
- continue;
- }
-
- Map<String, Object> ingestionStatsAndErrors = (Map<String, Object>)
report.get("ingestionStatsAndErrors");
- Map<String, Object> payload = (Map<String, Object>)
ingestionStatsAndErrors.get("payload");
- Map<String, Object> rowStats = (Map<String, Object>)
payload.get("rowStats");
- Map<String, Object> totals = (Map<String, Object>)
rowStats.get("totals");
- Map<String, Object> buildSegments = (Map<String, Object>)
totals.get(RowIngestionMeters.BUILD_SEGMENTS);
-
- if (includeUnparseable) {
- Map<String, Object> taskUnparseableEvents = (Map<String, Object>)
payload.get("unparseableEvents");
- List<ParseExceptionReport> buildSegmentsUnparseableEvents =
(List<ParseExceptionReport>)
- taskUnparseableEvents.get(RowIngestionMeters.BUILD_SEGMENTS);
- unparseableEvents.addAll(buildSegmentsUnparseableEvents);
- }
-
-
buildSegmentsRowStats.addRowIngestionMetersTotals(getTotalsFromBuildSegmentsRowStats(buildSegments));
- }
- catch (Exception e) {
- LOG.warn(e, "Encountered exception when getting live subtask report
for task: " + runningTaskId);
- }
+ try {
+ return getTaskReport(toolbox.getOverlordClient(), taskId);
}
- return buildSegmentsRowStats.getTotals();
- }
-
- private Pair<Map<String, Object>, Map<String, Object>>
createStatsAndErrorsReport(
- RowIngestionMetersTotals rowStats,
- List<ParseExceptionReport> unparseableEvents
- )
- {
- Map<String, Object> rowStatsMap = new HashMap<>();
- Map<String, Object> totalsMap = new HashMap<>();
- totalsMap.put(RowIngestionMeters.BUILD_SEGMENTS, rowStats);
- rowStatsMap.put("totals", totalsMap);
-
- return Pair.of(rowStatsMap,
ImmutableMap.of(RowIngestionMeters.BUILD_SEGMENTS, unparseableEvents));
- }
-
- private RowIngestionMetersTotals getBuildSegmentsStatsFromTaskReport(
- Map<String, TaskReport> taskReport,
- boolean includeUnparseable,
- List<ParseExceptionReport> unparseableEvents)
- {
- IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport =
(IngestionStatsAndErrorsTaskReport) taskReport.get(
- IngestionStatsAndErrorsTaskReport.REPORT_KEY);
- IngestionStatsAndErrorsTaskReportData reportData =
- (IngestionStatsAndErrorsTaskReportData)
ingestionStatsAndErrorsReport.getPayload();
- RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
- reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
- );
- if (includeUnparseable) {
- List<ParseExceptionReport> taskUnparsebleEvents =
(List<ParseExceptionReport>) reportData.getUnparseableEvents()
-
.get(RowIngestionMeters.BUILD_SEGMENTS);
- unparseableEvents.addAll(taskUnparsebleEvents);
+ catch (Exception e) {
+ LOG.warn(e, "Encountered exception when getting live subtask report for
task: " + taskId);
}
- return totals;
+ return null;
}
- private Pair<Map<String, Object>, Map<String, Object>>
doGetRowStatsAndUnparseableEvents(
- String full,
- boolean includeUnparseable
- )
+ private ParallelIndexStats doGetRowStatsAndUnparseableEvents(boolean full,
boolean includeUnparseable)
{
if (currentSubTaskHolder == null) {
- return Pair.of(ImmutableMap.of(), ImmutableMap.of());
+ return new ParallelIndexStats();
}
Object currentRunner = currentSubTaskHolder.getTask();
if (currentRunner == null) {
- return Pair.of(ImmutableMap.of(), ImmutableMap.of());
+ return new ParallelIndexStats();
}
- if (isParallelMode()) {
Review Comment:
I think we can retain this if-else flow (with some cleanup, as you have done):
- For sequential mode, we will just create the `ParallelIndexTaskStats` object here itself.
- For single-phase mode, we will create a `ParallelIndexStatsReporter` and do the computations.
- For multi-phase mode, we will have a method in this class itself which checks whether the stats are already cached and, if not, constructs a `ParallelIndexStatsReporter`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]