kfaraz commented on a change in pull request #12280:
URL: https://github.com/apache/druid/pull/12280#discussion_r816511853
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
##########
@@ -1535,27 +1571,47 @@ private RowIngestionMetersTotals
getTotalsFromBuildSegmentsRowStats(Object build
unparseableEvents.addAll(buildSegmentsUnparseableEvents);
}
- processed += ((Number) buildSegments.get("processed")).longValue();
- processedWithError += ((Number)
buildSegments.get("processedWithError")).longValue();
- thrownAway += ((Number) buildSegments.get("thrownAway")).longValue();
- unparseable += ((Number) buildSegments.get("unparseable")).longValue();
+ RowIngestionMetersTotals runningTaskBuildSegmentsRowStats = new
RowIngestionMetersTotals(((Number) buildSegments.get("processed")).longValue(),
+
((Number) buildSegments.get("processedWithError")).longValue(),
+
((Number) buildSegments.get("thrownAway")).longValue(),
+
((Number) buildSegments.get("unparseable")).longValue());
+
buildSegmentsRowStats.addRowIngestionMetersTotals(runningTaskBuildSegmentsRowStats);
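Review comment:
The existing `getTotalsFromBuildSegmentsRowStats` helper can be reused here instead of building the totals inline: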
```suggestion
buildSegmentsRowStats.addRowIngestionMetersTotals(getTotalsFromBuildSegmentsRowStats(buildSegments));
```
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
##########
@@ -1477,10 +1484,7 @@ private RowIngestionMetersTotals
getTotalsFromBuildSegmentsRowStats(Object build
boolean includeUnparseable
)
{
- long processed = 0L;
- long processedWithError = 0L;
- long thrownAway = 0L;
- long unparseable = 0L;
+ MutableRowIngestionMeters buildSegmentsRowStats = new
MutableRowIngestionMeters();
Review comment:
Nit: this local can be declared `final`:
```suggestion
final MutableRowIngestionMeters buildSegmentsRowStats = new
MutableRowIngestionMeters();
```
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
##########
@@ -1492,31 +1496,63 @@ private RowIngestionMetersTotals
getTotalsFromBuildSegmentsRowStats(Object build
LOG.warn("Got an empty task report from subtask: " +
pushedSegmentsReport.getTaskId());
continue;
}
- IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport =
(IngestionStatsAndErrorsTaskReport) taskReport.get(
- IngestionStatsAndErrorsTaskReport.REPORT_KEY);
- IngestionStatsAndErrorsTaskReportData reportData =
- (IngestionStatsAndErrorsTaskReportData)
ingestionStatsAndErrorsReport.getPayload();
- RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
- reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
- );
-
- if (includeUnparseable) {
- List<ParseExceptionReport> taskUnparsebleEvents =
(List<ParseExceptionReport>) reportData.getUnparseableEvents()
-
.get(RowIngestionMeters.BUILD_SEGMENTS);
- unparseableEvents.addAll(taskUnparsebleEvents);
- }
+ RowIngestionMetersTotals rowStatsAndUnparseableEvents =
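Review comment:
The variable is a `RowIngestionMetersTotals` (the unparseable events are collected separately through the list argument), so name it after what it holds: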
```suggestion
RowIngestionMetersTotals rowIngestionMetersTotals =
```
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
##########
@@ -1492,31 +1496,63 @@ private RowIngestionMetersTotals
getTotalsFromBuildSegmentsRowStats(Object build
LOG.warn("Got an empty task report from subtask: " +
pushedSegmentsReport.getTaskId());
continue;
}
- IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport =
(IngestionStatsAndErrorsTaskReport) taskReport.get(
- IngestionStatsAndErrorsTaskReport.REPORT_KEY);
- IngestionStatsAndErrorsTaskReportData reportData =
- (IngestionStatsAndErrorsTaskReportData)
ingestionStatsAndErrorsReport.getPayload();
- RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
- reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
- );
-
- if (includeUnparseable) {
- List<ParseExceptionReport> taskUnparsebleEvents =
(List<ParseExceptionReport>) reportData.getUnparseableEvents()
-
.get(RowIngestionMeters.BUILD_SEGMENTS);
- unparseableEvents.addAll(taskUnparsebleEvents);
- }
+ RowIngestionMetersTotals rowStatsAndUnparseableEvents =
+ getBuildSegmentsStatsFromTaskReport(taskReport, includeUnparseable,
unparseableEvents);
- processed += totals.getProcessed();
- processedWithError += totals.getProcessedWithError();
- thrownAway += totals.getThrownAway();
- unparseable += totals.getUnparseable();
+
buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsAndUnparseableEvents);
}
// Get stats from running tasks
Set<String> runningTaskIds = parallelSinglePhaseRunner.getRunningTaskIds();
+
+ return
getRunningTaskReportsAndCreateRowStatsAndUnparseableEvents(runningTaskIds,
+
unparseableEvents,
+
buildSegmentsRowStats,
+
includeUnparseable);
+ }
+
+ private Pair<Map<String, Object>, Map<String, Object>>
doGetRowStatsAndUnparseableEventsParallelMultiPhase(boolean includeUnparseable)
+ {
+ if (indexGenerateRowStats != null) {
+ return Pair.of(indexGenerateRowStats.lhs, includeUnparseable ?
indexGenerateRowStats.rhs : ImmutableMap.of());
+ } else if (currentSubTaskHolder == null || !((ParallelIndexTaskRunner)
currentSubTaskHolder.getTask()).getName().equals("partial segment generation"))
{
+ return Pair.of(ImmutableMap.of(), ImmutableMap.of());
+ } else {
+ ParallelIndexTaskRunner currentRunner = currentSubTaskHolder.getTask();
+ Set<String> runningTaskIds = currentRunner.getRunningTaskIds();
+ Map<String, GeneratedPartitionsReport> completedSubtaskReports =
(Map<String, GeneratedPartitionsReport>) currentRunner.getReports();
+
+ MutableRowIngestionMeters buildSegmentsRowStats = new
MutableRowIngestionMeters();
+
+ List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
+ for (GeneratedPartitionsReport generatedPartitionsReport :
completedSubtaskReports.values()) {
+ Map<String, TaskReport> taskReport =
generatedPartitionsReport.getTaskReport();
+ if (taskReport == null || taskReport.isEmpty()) {
+ LOG.warn("Got an empty task report from subtask: " +
generatedPartitionsReport.getTaskId());
+ continue;
+ }
+ RowIngestionMetersTotals rowStatsAndUnparseableEvents =
+ getBuildSegmentsStatsFromTaskReport(taskReport, true,
unparseableEvents);
+
+
buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsAndUnparseableEvents);
+ }
+ return
getRunningTaskReportsAndCreateRowStatsAndUnparseableEvents(runningTaskIds,
+
unparseableEvents,
+
buildSegmentsRowStats,
+
includeUnparseable);
+ }
+ }
+
+ private Pair<Map<String, Object>, Map<String, Object>>
getRunningTaskReportsAndCreateRowStatsAndUnparseableEvents(
Review comment:
A shorter method name would read better:
```suggestion
private Pair<Map<String, Object>, Map<String, Object>>
getRowStatsAndUnparseableEventsForRunningTasks(
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]