kfaraz commented on code in PR #15947:
URL: https://github.com/apache/druid/pull/15947#discussion_r1513743509


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java:
##########
@@ -113,6 +123,22 @@ public Map<String, Long> getRecordsProcessed()
     return recordsProcessed;
   }
 
+  @JsonProperty
+  @Nullable

Review Comment:
   Optional: Consider also marking these new fields as `@Nullable` in the constructor.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java:
##########
@@ -125,9 +126,13 @@ public final TaskStatus runTask(TaskToolbox toolbox) throws Exception
         toolbox.getIndexingTmpDir()
     );
 
-    Map<String, TaskReport> taskReport = getTaskCompletionReports();
+    Map<String, TaskReport> taskReport = getTaskCompletionReports(getSegementsSize(inputSource));
 
-    taskClient.report(createGeneratedPartitionsReport(toolbox, segments, taskReport));
+    taskClient.report(createGeneratedPartitionsReport(
+        toolbox,
+        segments,
+        taskReport
+    ));

Review Comment:
   The original code was more readable. If you do want to break the line, however, this style would be better.
   
   ```suggestion
       taskClient.report(
           createGeneratedPartitionsReport(toolbox, segments, taskReport)
       );
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -1643,6 +1660,13 @@ private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseab
             getBuildSegmentsStatsFromTaskReport(taskReport, true, unparseableEvents);
 
         buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForCompletedTask);
+
+        Long segmentsRead = ((IngestionStatsAndErrorsTaskReport)

Review Comment:
   Rename to `segmentsReadFromPartition` to keep CodeQL from complaining.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java:
##########
@@ -149,6 +154,15 @@ abstract T createGeneratedPartitionsReport(
       Map<String, TaskReport> taskReport
   );
 
+  private Long getSegementsSize(InputSource inputSource)
+  {
+    if (inputSource instanceof DruidInputSource) {
+      return (long) ((DruidInputSource) inputSource).getSegmentIds().size();

Review Comment:
   I guess after the `DruidInputSource` has been split, the `segmentIds` would always be non-null. But it is probably still better to add a null check here just in case.
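
A minimal sketch of such a null check, for illustration only. The nested `InputSource` and `DruidInputSource` types below are hypothetical stand-ins so the example compiles on its own (the real `getSegmentIds()` does not return a list of strings), and returning `null` in the fallback case is an assumption, since the rest of the method is not shown in this hunk.

```java
import java.util.Arrays;
import java.util.List;

public class NumSegmentsReadSketch
{
  // Hypothetical stand-ins for Druid's InputSource and DruidInputSource,
  // defined here only so that the sketch is self-contained.
  interface InputSource {}

  static class DruidInputSource implements InputSource
  {
    private final List<String> segmentIds; // may be null in this sketch

    DruidInputSource(List<String> segmentIds)
    {
      this.segmentIds = segmentIds;
    }

    List<String> getSegmentIds()
    {
      return segmentIds;
    }
  }

  // Returns the number of segment IDs, or null when the input source is not
  // a DruidInputSource or its segment IDs are null (assumed fallback).
  static Long getNumSegmentsRead(InputSource inputSource)
  {
    if (inputSource instanceof DruidInputSource) {
      List<String> segmentIds = ((DruidInputSource) inputSource).getSegmentIds();
      if (segmentIds != null) {
        return (long) segmentIds.size();
      }
    }
    return null;
  }

  public static void main(String[] args)
  {
    System.out.println(getNumSegmentsRead(new DruidInputSource(Arrays.asList("a", "b")))); // 2
    System.out.println(getNumSegmentsRead(new DruidInputSource(null)));                    // null
    System.out.println(getNumSegmentsRead(new InputSource() {}));                          // null
  }
}
```

With the check in place, a `DruidInputSource` whose segment IDs are unexpectedly null yields `null` instead of a `NullPointerException`.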



##########
integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java:
##########
@@ -138,6 +140,54 @@ public void testCompactionWithQueryGranularityInGranularitySpec() throws Excepti
     }
   }
 
+  @Test
+  public void testParallelHashedCompaction() throws Exception

Review Comment:
   Thanks for adding the IT.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java:
##########
@@ -149,6 +154,15 @@ abstract T createGeneratedPartitionsReport(
       Map<String, TaskReport> taskReport
   );
 
+  private Long getSegementsSize(InputSource inputSource)

Review Comment:
   ```suggestion
     private Long getNumSegmentsRead(InputSource inputSource)
   ```



##########
docs/ingestion/tasks.md:
##########
@@ -155,6 +155,12 @@ For some task types, the indexing task can wait for the newly ingested segments
 |`segmentAvailabilityWaitTimeMs`|Milliseconds waited by the ingestion task for the newly ingested segments to be available for query after completing ingestion was completed.|
 |`recordsProcessed`| Partitions that were processed by an ingestion task and includes count of records processed from each partition.|
 
+
+|Field| Description|

Review Comment:
   Please add a heading for this section and explain when these stats are populated.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -650,7 +652,17 @@ private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception
     TaskStatus taskStatus;
     if (state.isSuccess()) {
       //noinspection ConstantConditions
-      publishSegments(toolbox, parallelSinglePhaseRunner.getReports());
+      segmentsPublished = publishSegments(toolbox, parallelSinglePhaseRunner.getReports());
+      if (isCompactionTask) {
+        // segements are only read for compactiont tasks. For `index_parallel`
+        // tasks this would result to 0, but we want to rather have it as null
+        // because segmentsRead is not applicable for such tasks.

Review Comment:
   We can simplify this comment:
   
   ```suggestion
           // Populate segmentsRead only for compaction tasks
   ```
   
   Also, it might not be entirely true, as segments are also read during a re-index task, which still qualifies as `index_parallel`.
   
   In the future, you could even consider:
   1. Getting rid of the if condition altogether. The only drawback would be that in batch ingestion jobs where the input source is not Druid, we would report the `segmentsRead` as `0` instead of `null`.
   OR
   2. Populating `segmentsRead` if the input source is `DruidInputSource`.
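
The difference between the two options can be sketched roughly as follows. All names here (`SegmentsReadOptionsSketch`, `OtherInputSource`, the two helper methods, and the nested stand-in types) are hypothetical and exist only for illustration; they are not part of the PR or of Druid.

```java
public class SegmentsReadOptionsSketch
{
  // Hypothetical stand-ins for Druid's input source types.
  interface InputSource {}

  static class DruidInputSource implements InputSource {}

  static class OtherInputSource implements InputSource {}

  // Option 1: no condition at all. Whatever was counted is reported, so a
  // job with a non-Druid input source reports 0 rather than null.
  static Long withoutCondition(long segmentsCounted)
  {
    return segmentsCounted;
  }

  // Option 2: report a value only when the input source is a
  // DruidInputSource; otherwise leave segmentsRead as null.
  static Long onlyForDruidInputSource(InputSource inputSource, long segmentsCounted)
  {
    if (inputSource instanceof DruidInputSource) {
      return segmentsCounted;
    }
    return null;
  }

  public static void main(String[] args)
  {
    // A non-Druid input source reads no segments.
    System.out.println(withoutCondition(0L));                                // 0
    System.out.println(onlyForDruidInputSource(new OtherInputSource(), 0L)); // null
    // A re-index task reading from Druid is covered by option 2 as well.
    System.out.println(onlyForDruidInputSource(new DruidInputSource(), 3L)); // 3
  }
}
```

Option 2 keeps `null` for tasks where `segmentsRead` is not applicable while still covering re-index tasks that run as `index_parallel`.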



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -1200,6 +1212,8 @@ private void publishSegments(
     } else {
       throw new ISE("Failed to publish segments");
     }
+
+    return newSegments.size();

Review Comment:
   Nit: If the result of `publishSegments` is always assigned to `segmentsPublished`, we might as well just assign it here.
   
   ```suggestion
       segmentsPublished = newSegments.size();
   ```


