kfaraz commented on a change in pull request #12280:
URL: https://github.com/apache/druid/pull/12280#discussion_r816589224
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
##########
@@ -1492,31 +1496,57 @@ private RowIngestionMetersTotals
getTotalsFromBuildSegmentsRowStats(Object build
LOG.warn("Got an empty task report from subtask: " +
pushedSegmentsReport.getTaskId());
continue;
}
- IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport =
(IngestionStatsAndErrorsTaskReport) taskReport.get(
- IngestionStatsAndErrorsTaskReport.REPORT_KEY);
- IngestionStatsAndErrorsTaskReportData reportData =
- (IngestionStatsAndErrorsTaskReportData)
ingestionStatsAndErrorsReport.getPayload();
- RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
- reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
- );
-
- if (includeUnparseable) {
- List<ParseExceptionReport> taskUnparsebleEvents =
(List<ParseExceptionReport>) reportData.getUnparseableEvents()
-
.get(RowIngestionMeters.BUILD_SEGMENTS);
- unparseableEvents.addAll(taskUnparsebleEvents);
- }
+ RowIngestionMetersTotals rowIngestionMetersTotals =
+ getBuildSegmentsStatsFromTaskReport(taskReport, includeUnparseable,
unparseableEvents);
- processed += totals.getProcessed();
- processedWithError += totals.getProcessedWithError();
- thrownAway += totals.getThrownAway();
- unparseable += totals.getUnparseable();
+
buildSegmentsRowStats.addRowIngestionMetersTotals(rowIngestionMetersTotals);
}
// Get stats from running tasks
Set<String> runningTaskIds = parallelSinglePhaseRunner.getRunningTaskIds();
+
+ return getRowStatsAndUnparseableEventsForRunningTasks(runningTaskIds,
unparseableEvents, buildSegmentsRowStats, includeUnparseable);
+ }
+
+ private Pair<Map<String, Object>, Map<String, Object>>
doGetRowStatsAndUnparseableEventsParallelMultiPhase(boolean includeUnparseable)
+ {
+ if (indexGenerateRowStats != null) {
+ return Pair.of(indexGenerateRowStats.lhs, includeUnparseable ?
indexGenerateRowStats.rhs : ImmutableMap.of());
+ } else if (currentSubTaskHolder == null || !((ParallelIndexTaskRunner)
currentSubTaskHolder.getTask()).getName().equals("partial segment generation"))
{
+ return Pair.of(ImmutableMap.of(), ImmutableMap.of());
+ } else {
+ ParallelIndexTaskRunner currentRunner = currentSubTaskHolder.getTask();
+ Set<String> runningTaskIds = currentRunner.getRunningTaskIds();
+ Map<String, GeneratedPartitionsReport> completedSubtaskReports =
(Map<String, GeneratedPartitionsReport>) currentRunner.getReports();
+
+ final MutableRowIngestionMeters buildSegmentsRowStats = new
MutableRowIngestionMeters();
+
+ List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
Review comment:
Nit:
```suggestion
final List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
```
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
##########
@@ -1492,31 +1496,57 @@ private RowIngestionMetersTotals
getTotalsFromBuildSegmentsRowStats(Object build
LOG.warn("Got an empty task report from subtask: " +
pushedSegmentsReport.getTaskId());
continue;
}
- IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport =
(IngestionStatsAndErrorsTaskReport) taskReport.get(
- IngestionStatsAndErrorsTaskReport.REPORT_KEY);
- IngestionStatsAndErrorsTaskReportData reportData =
- (IngestionStatsAndErrorsTaskReportData)
ingestionStatsAndErrorsReport.getPayload();
- RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
- reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
- );
-
- if (includeUnparseable) {
- List<ParseExceptionReport> taskUnparsebleEvents =
(List<ParseExceptionReport>) reportData.getUnparseableEvents()
-
.get(RowIngestionMeters.BUILD_SEGMENTS);
- unparseableEvents.addAll(taskUnparsebleEvents);
- }
+ RowIngestionMetersTotals rowIngestionMetersTotals =
+ getBuildSegmentsStatsFromTaskReport(taskReport, includeUnparseable,
unparseableEvents);
- processed += totals.getProcessed();
- processedWithError += totals.getProcessedWithError();
- thrownAway += totals.getThrownAway();
- unparseable += totals.getUnparseable();
+
buildSegmentsRowStats.addRowIngestionMetersTotals(rowIngestionMetersTotals);
}
// Get stats from running tasks
Set<String> runningTaskIds = parallelSinglePhaseRunner.getRunningTaskIds();
+
+ return getRowStatsAndUnparseableEventsForRunningTasks(runningTaskIds,
unparseableEvents, buildSegmentsRowStats, includeUnparseable);
+ }
+
+ private Pair<Map<String, Object>, Map<String, Object>>
doGetRowStatsAndUnparseableEventsParallelMultiPhase(boolean includeUnparseable)
+ {
+ if (indexGenerateRowStats != null) {
+ return Pair.of(indexGenerateRowStats.lhs, includeUnparseable ?
indexGenerateRowStats.rhs : ImmutableMap.of());
+ } else if (currentSubTaskHolder == null || !((ParallelIndexTaskRunner)
currentSubTaskHolder.getTask()).getName().equals("partial segment generation"))
{
+ return Pair.of(ImmutableMap.of(), ImmutableMap.of());
+ } else {
+ ParallelIndexTaskRunner currentRunner = currentSubTaskHolder.getTask();
+ Set<String> runningTaskIds = currentRunner.getRunningTaskIds();
Review comment:
Nit:
This local variable `runningTaskIds` is not used until the final return statement.
Please declare it just before that statement, or simply pass
`currentRunner.getRunningTaskIds()` directly as the argument.
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
##########
@@ -1492,31 +1496,57 @@ private RowIngestionMetersTotals
getTotalsFromBuildSegmentsRowStats(Object build
LOG.warn("Got an empty task report from subtask: " +
pushedSegmentsReport.getTaskId());
continue;
}
- IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport =
(IngestionStatsAndErrorsTaskReport) taskReport.get(
- IngestionStatsAndErrorsTaskReport.REPORT_KEY);
- IngestionStatsAndErrorsTaskReportData reportData =
- (IngestionStatsAndErrorsTaskReportData)
ingestionStatsAndErrorsReport.getPayload();
- RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
- reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
- );
-
- if (includeUnparseable) {
- List<ParseExceptionReport> taskUnparsebleEvents =
(List<ParseExceptionReport>) reportData.getUnparseableEvents()
-
.get(RowIngestionMeters.BUILD_SEGMENTS);
- unparseableEvents.addAll(taskUnparsebleEvents);
- }
+ RowIngestionMetersTotals rowIngestionMetersTotals =
+ getBuildSegmentsStatsFromTaskReport(taskReport, includeUnparseable,
unparseableEvents);
- processed += totals.getProcessed();
- processedWithError += totals.getProcessedWithError();
- thrownAway += totals.getThrownAway();
- unparseable += totals.getUnparseable();
+
buildSegmentsRowStats.addRowIngestionMetersTotals(rowIngestionMetersTotals);
}
// Get stats from running tasks
Set<String> runningTaskIds = parallelSinglePhaseRunner.getRunningTaskIds();
+
+ return getRowStatsAndUnparseableEventsForRunningTasks(runningTaskIds,
unparseableEvents, buildSegmentsRowStats, includeUnparseable);
+ }
+
+ private Pair<Map<String, Object>, Map<String, Object>>
doGetRowStatsAndUnparseableEventsParallelMultiPhase(boolean includeUnparseable)
+ {
+ if (indexGenerateRowStats != null) {
+ return Pair.of(indexGenerateRowStats.lhs, includeUnparseable ?
indexGenerateRowStats.rhs : ImmutableMap.of());
+ } else if (currentSubTaskHolder == null || !((ParallelIndexTaskRunner)
currentSubTaskHolder.getTask()).getName().equals("partial segment generation"))
{
+ return Pair.of(ImmutableMap.of(), ImmutableMap.of());
+ } else {
+ ParallelIndexTaskRunner currentRunner = currentSubTaskHolder.getTask();
+ Set<String> runningTaskIds = currentRunner.getRunningTaskIds();
+ Map<String, GeneratedPartitionsReport> completedSubtaskReports =
(Map<String, GeneratedPartitionsReport>) currentRunner.getReports();
+
+ final MutableRowIngestionMeters buildSegmentsRowStats = new
MutableRowIngestionMeters();
+
Review comment:
Nit: extra newline does not seem necessary here.
```suggestion
```
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
##########
@@ -1492,31 +1496,57 @@ private RowIngestionMetersTotals
getTotalsFromBuildSegmentsRowStats(Object build
LOG.warn("Got an empty task report from subtask: " +
pushedSegmentsReport.getTaskId());
continue;
}
- IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport =
(IngestionStatsAndErrorsTaskReport) taskReport.get(
- IngestionStatsAndErrorsTaskReport.REPORT_KEY);
- IngestionStatsAndErrorsTaskReportData reportData =
- (IngestionStatsAndErrorsTaskReportData)
ingestionStatsAndErrorsReport.getPayload();
- RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
- reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
- );
-
- if (includeUnparseable) {
- List<ParseExceptionReport> taskUnparsebleEvents =
(List<ParseExceptionReport>) reportData.getUnparseableEvents()
-
.get(RowIngestionMeters.BUILD_SEGMENTS);
- unparseableEvents.addAll(taskUnparsebleEvents);
- }
+ RowIngestionMetersTotals rowIngestionMetersTotals =
+ getBuildSegmentsStatsFromTaskReport(taskReport, includeUnparseable,
unparseableEvents);
- processed += totals.getProcessed();
- processedWithError += totals.getProcessedWithError();
- thrownAway += totals.getThrownAway();
- unparseable += totals.getUnparseable();
+
buildSegmentsRowStats.addRowIngestionMetersTotals(rowIngestionMetersTotals);
}
// Get stats from running tasks
Set<String> runningTaskIds = parallelSinglePhaseRunner.getRunningTaskIds();
+
+ return getRowStatsAndUnparseableEventsForRunningTasks(runningTaskIds,
unparseableEvents, buildSegmentsRowStats, includeUnparseable);
+ }
+
+ private Pair<Map<String, Object>, Map<String, Object>>
doGetRowStatsAndUnparseableEventsParallelMultiPhase(boolean includeUnparseable)
+ {
+ if (indexGenerateRowStats != null) {
+ return Pair.of(indexGenerateRowStats.lhs, includeUnparseable ?
indexGenerateRowStats.rhs : ImmutableMap.of());
+ } else if (currentSubTaskHolder == null || !((ParallelIndexTaskRunner)
currentSubTaskHolder.getTask()).getName().equals("partial segment generation"))
{
+ return Pair.of(ImmutableMap.of(), ImmutableMap.of());
+ } else {
+ ParallelIndexTaskRunner currentRunner = currentSubTaskHolder.getTask();
+ Set<String> runningTaskIds = currentRunner.getRunningTaskIds();
+ Map<String, GeneratedPartitionsReport> completedSubtaskReports =
(Map<String, GeneratedPartitionsReport>) currentRunner.getReports();
+
+ final MutableRowIngestionMeters buildSegmentsRowStats = new
MutableRowIngestionMeters();
+
+ List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
+ for (GeneratedPartitionsReport generatedPartitionsReport :
completedSubtaskReports.values()) {
+ Map<String, TaskReport> taskReport =
generatedPartitionsReport.getTaskReport();
+ if (taskReport == null || taskReport.isEmpty()) {
+ LOG.warn("Got an empty task report from subtask: " +
generatedPartitionsReport.getTaskId());
+ continue;
+ }
+ RowIngestionMetersTotals rowStats =
+ getBuildSegmentsStatsFromTaskReport(taskReport, true,
unparseableEvents);
+
+ buildSegmentsRowStats.addRowIngestionMetersTotals(rowStats);
+ }
+ return getRowStatsAndUnparseableEventsForRunningTasks(runningTaskIds,
unparseableEvents, buildSegmentsRowStats, includeUnparseable);
+ }
+ }
+
+ private Pair<Map<String, Object>, Map<String, Object>>
getRowStatsAndUnparseableEventsForRunningTasks(
Review comment:
I feel it would be cleaner if this method just returned a
`RowIngestionMetersTotals`.
We could then have a separate method that accepts a `List` of unparseable events
and a `MutableRowIngestionMeters` and builds the
`Pair<Map<String, Object>, Map<String, Object>>`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]