paul-rogers commented on code in PR #13758:
URL: https://github.com/apache/druid/pull/13758#discussion_r1115163884


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java:
##########
@@ -534,7 +536,7 @@ public TaskStatus runTask(final TaskToolbox toolbox)
     catch (Exception e) {
       log.error(e, "Encountered exception in %s.", ingestionState);
       errorMsg = Throwables.getStackTraceAsString(e);
-      toolbox.getTaskReportFileWriter().write(getId(), 
getTaskCompletionReports());
+      toolbox.getTaskReportFileWriter().write(getId(), 
getTaskCompletionReports(Collections.emptyList()));

Review Comment:
   Given the number of times that this empty list pattern occurs, should we 
just define an overload of `getTaskCompletionReports()` that takes no arguments 
and calls the one-argument version with an empty list?



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexStatsReporter.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.TaskReport;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.incremental.ParseExceptionReport;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public abstract class ParallelIndexStatsReporter
+{
+  private static final Logger LOG = new 
Logger(ParallelIndexStatsReporter.class);
+
+  abstract ParallelIndexStats report(
+      ParallelIndexSupervisorTask task,
+      Object runner,
+      boolean includeUnparseable,
+      String full
+  );
+
+  protected RowIngestionMetersTotals getBuildSegmentsStatsFromTaskReport(
+      Map<String, TaskReport> taskReport,
+      boolean includeUnparseable,
+      List<ParseExceptionReport> unparseableEvents
+  )
+  {
+    IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport =
+        (IngestionStatsAndErrorsTaskReport) taskReport.get(
+            IngestionStatsAndErrorsTaskReport.REPORT_KEY);
+    IngestionStatsAndErrorsTaskReportData reportData =
+        (IngestionStatsAndErrorsTaskReportData) 
ingestionStatsAndErrorsReport.getPayload();
+    RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
+        reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
+    );
+    if (includeUnparseable) {
+      List<ParseExceptionReport> taskUnparsebleEvents =
+          (List<ParseExceptionReport>) 
reportData.getUnparseableEvents().get(RowIngestionMeters.BUILD_SEGMENTS);
+      unparseableEvents.addAll(taskUnparsebleEvents);
+    }
+    return totals;
+  }
+
+  private RowIngestionMetersTotals getTotalsFromBuildSegmentsRowStats(Object 
buildSegmentsRowStats)
+  {
+    if (buildSegmentsRowStats instanceof RowIngestionMetersTotals) {
+      // This case is for unit tests. Normally when deserialized the row stats 
will apppear as a Map<String, Object>.
+      return (RowIngestionMetersTotals) buildSegmentsRowStats;
+    } else if (buildSegmentsRowStats instanceof Map) {
+      Map<String, Object> buildSegmentsRowStatsMap = (Map<String, Object>) 
buildSegmentsRowStats;
+      return new RowIngestionMetersTotals(
+          ((Number) buildSegmentsRowStatsMap.get("processed")).longValue(),
+          ((Number) 
buildSegmentsRowStatsMap.get("processedBytes")).longValue(),
+          ((Number) 
buildSegmentsRowStatsMap.get("processedWithError")).longValue(),
+          ((Number) buildSegmentsRowStatsMap.get("thrownAway")).longValue(),
+          ((Number) buildSegmentsRowStatsMap.get("unparseable")).longValue()
+      );
+    } else {
+      // should never happen
+      throw new RuntimeException("Unrecognized buildSegmentsRowStats type: " + 
buildSegmentsRowStats.getClass());
+    }
+  }
+
+  protected RowIngestionMetersTotals 
getRowStatsAndUnparseableEventsForRunningTasks(
+      ParallelIndexSupervisorTask task,
+      Set<String> runningTaskIds,
+      List<ParseExceptionReport> unparseableEvents,
+      boolean includeUnparseable
+  )
+  {
+    final SimpleRowIngestionMeters buildSegmentsRowStats = new 
SimpleRowIngestionMeters();
+    for (String runningTaskId : runningTaskIds) {
+      try {
+        final Map<String, Object> report = task.fetchTaskReport(runningTaskId);
+        if (report == null || report.isEmpty()) {
+          // task does not have a running report yet
+          continue;
+        }
+
+        Map<String, Object> ingestionStatsAndErrors = (Map<String, Object>) 
report.get("ingestionStatsAndErrors");
+        Map<String, Object> payload = (Map<String, Object>) 
ingestionStatsAndErrors.get("payload");
+        Map<String, Object> rowStats = (Map<String, Object>) 
payload.get("rowStats");
+        Map<String, Object> totals = (Map<String, Object>) 
rowStats.get("totals");

Review Comment:
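   More constants? `"ingestionStatsAndErrors"`, `"payload"`, `"rowStats"` and 
`"totals"` are all string literals here.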



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexStatsReporter.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.TaskReport;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.incremental.ParseExceptionReport;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public abstract class ParallelIndexStatsReporter
+{
+  private static final Logger LOG = new 
Logger(ParallelIndexStatsReporter.class);
+
+  abstract ParallelIndexStats report(
+      ParallelIndexSupervisorTask task,
+      Object runner,
+      boolean includeUnparseable,
+      String full
+  );
+
+  protected RowIngestionMetersTotals getBuildSegmentsStatsFromTaskReport(
+      Map<String, TaskReport> taskReport,
+      boolean includeUnparseable,
+      List<ParseExceptionReport> unparseableEvents
+  )
+  {
+    IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport =
+        (IngestionStatsAndErrorsTaskReport) taskReport.get(
+            IngestionStatsAndErrorsTaskReport.REPORT_KEY);
+    IngestionStatsAndErrorsTaskReportData reportData =
+        (IngestionStatsAndErrorsTaskReportData) 
ingestionStatsAndErrorsReport.getPayload();
+    RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
+        reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
+    );
+    if (includeUnparseable) {
+      List<ParseExceptionReport> taskUnparsebleEvents =
+          (List<ParseExceptionReport>) 
reportData.getUnparseableEvents().get(RowIngestionMeters.BUILD_SEGMENTS);
+      unparseableEvents.addAll(taskUnparsebleEvents);
+    }
+    return totals;
+  }
+
+  private RowIngestionMetersTotals getTotalsFromBuildSegmentsRowStats(Object 
buildSegmentsRowStats)
+  {
+    if (buildSegmentsRowStats instanceof RowIngestionMetersTotals) {
+      // This case is for unit tests. Normally when deserialized the row stats 
will apppear as a Map<String, Object>.
+      return (RowIngestionMetersTotals) buildSegmentsRowStats;
+    } else if (buildSegmentsRowStats instanceof Map) {
+      Map<String, Object> buildSegmentsRowStatsMap = (Map<String, Object>) 
buildSegmentsRowStats;
+      return new RowIngestionMetersTotals(
+          ((Number) buildSegmentsRowStatsMap.get("processed")).longValue(),
+          ((Number) 
buildSegmentsRowStatsMap.get("processedBytes")).longValue(),
+          ((Number) 
buildSegmentsRowStatsMap.get("processedWithError")).longValue(),
+          ((Number) buildSegmentsRowStatsMap.get("thrownAway")).longValue(),
+          ((Number) buildSegmentsRowStatsMap.get("unparseable")).longValue()

Review Comment:
   Can we define constants for these names?



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexStatsReporter.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.TaskReport;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.incremental.ParseExceptionReport;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class MultiPhaseParallelIndexStatsReporter extends 
ParallelIndexStatsReporter
+{
+  private static final Logger LOG = new 
Logger(MultiPhaseParallelIndexStatsReporter.class);
+
+  @Override
+  ParallelIndexStats report(
+      ParallelIndexSupervisorTask task,
+      Object runner,
+      boolean includeUnparseable,
+      String full
+  )
+  {
+    // use cached version if available
+    ParallelIndexStats cached = task.getIndexGenerateRowStats();
+    if (null != cached) {
+      return cached;
+    }
+
+    ParallelIndexTaskRunner<?, ?> currentRunner = (ParallelIndexTaskRunner<?, 
?>) runner;
+    if (!currentRunner.getName().equals("partial segment generation")) {

Review Comment:
   Better, should we declare a constant and use it where we first set the name 
and here? Will save grief later if someone decides to change the text.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexStatsReporter.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.TaskReport;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.incremental.ParseExceptionReport;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public abstract class ParallelIndexStatsReporter
+{
+  private static final Logger LOG = new 
Logger(ParallelIndexStatsReporter.class);
+
+  abstract ParallelIndexStats report(
+      ParallelIndexSupervisorTask task,
+      Object runner,
+      boolean includeUnparseable,
+      String full
+  );
+
+  protected RowIngestionMetersTotals getBuildSegmentsStatsFromTaskReport(
+      Map<String, TaskReport> taskReport,
+      boolean includeUnparseable,
+      List<ParseExceptionReport> unparseableEvents
+  )
+  {
+    IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport =
+        (IngestionStatsAndErrorsTaskReport) taskReport.get(
+            IngestionStatsAndErrorsTaskReport.REPORT_KEY);
+    IngestionStatsAndErrorsTaskReportData reportData =
+        (IngestionStatsAndErrorsTaskReportData) 
ingestionStatsAndErrorsReport.getPayload();
+    RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
+        reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
+    );
+    if (includeUnparseable) {
+      List<ParseExceptionReport> taskUnparsebleEvents =
+          (List<ParseExceptionReport>) 
reportData.getUnparseableEvents().get(RowIngestionMeters.BUILD_SEGMENTS);
+      unparseableEvents.addAll(taskUnparsebleEvents);
+    }
+    return totals;
+  }
+
+  private RowIngestionMetersTotals getTotalsFromBuildSegmentsRowStats(Object 
buildSegmentsRowStats)
+  {
+    if (buildSegmentsRowStats instanceof RowIngestionMetersTotals) {
+      // This case is for unit tests. Normally when deserialized the row stats 
will apppear as a Map<String, Object>.
+      return (RowIngestionMetersTotals) buildSegmentsRowStats;
+    } else if (buildSegmentsRowStats instanceof Map) {
+      Map<String, Object> buildSegmentsRowStatsMap = (Map<String, Object>) 
buildSegmentsRowStats;
+      return new RowIngestionMetersTotals(
+          ((Number) buildSegmentsRowStatsMap.get("processed")).longValue(),
+          ((Number) 
buildSegmentsRowStatsMap.get("processedBytes")).longValue(),
+          ((Number) 
buildSegmentsRowStatsMap.get("processedWithError")).longValue(),
+          ((Number) buildSegmentsRowStatsMap.get("thrownAway")).longValue(),
+          ((Number) buildSegmentsRowStatsMap.get("unparseable")).longValue()
+      );
+    } else {
+      // should never happen
+      throw new RuntimeException("Unrecognized buildSegmentsRowStats type: " + 
buildSegmentsRowStats.getClass());

Review Comment:
   Nit: 
   
   ```java
       throw new ISE(
             "Unrecognized buildSegmentsRowStats type: [%s]",
             buildSegmentsRowStats.getClass().getName()
       );
   ```
   
   `ISE` = Illegal State Exception. It has a formatter built in. Note that 
Druid likes brackets around interpolated values.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexStatsReporter.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.TaskReport;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.incremental.ParseExceptionReport;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class MultiPhaseParallelIndexStatsReporter extends 
ParallelIndexStatsReporter
+{
+  private static final Logger LOG = new 
Logger(MultiPhaseParallelIndexStatsReporter.class);
+
+  @Override
+  ParallelIndexStats report(
+      ParallelIndexSupervisorTask task,
+      Object runner,
+      boolean includeUnparseable,
+      String full
+  )
+  {
+    // use cached version if available
+    ParallelIndexStats cached = task.getIndexGenerateRowStats();
+    if (null != cached) {
+      return cached;
+    }
+
+    ParallelIndexTaskRunner<?, ?> currentRunner = (ParallelIndexTaskRunner<?, 
?>) runner;
+    if (!currentRunner.getName().equals("partial segment generation")) {

Review Comment:
   Druid prefers to put the constant first in these comparisons for null protection:
   
   ```java
     if (!"constant".equals(variable)) {
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -767,7 +770,8 @@ TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox 
toolbox) throws Except
       );
       return TaskStatus.failure(getId(), errMsg);
     }
-    indexGenerateRowStats = 
doGetRowStatsAndUnparseableEventsParallelMultiPhase(indexingRunner, true);
+
+    indexGenerateRowStats = new 
MultiPhaseParallelIndexStatsReporter().report(this, indexingRunner, true, 
"full");

Review Comment:
   Yet another constant, for the `"full"` literal?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to