suneet-s commented on a change in pull request #10336:
URL: https://github.com/apache/druid/pull/10336#discussion_r482150156



##########
File path: 
core/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java
##########
@@ -93,23 +94,32 @@ public static InputRow parse(
     final DateTime timestamp;
     try {
       timestamp = timestampSpec.extractTimestamp(theMap);
-      if (timestamp == null) {
-        final String input = theMap.toString();
-        throw new NullPointerException(
-            StringUtils.format(
-                "Null timestamp in input: %s",
-                input.length() < 100 ? input : input.substring(0, 100) + "..."
-            )
-        );
-      }
     }
     catch (Exception e) {
-      throw new ParseException(e, "Unparseable timestamp found! Event: %s", 
theMap);
+      throw new ParseException(e, "Unparseable timestamp found! Event: %s", 
rawMapToPrint(theMap));
+    }
+    if (timestamp == null) {
+      throw new ParseException("Unparseable timestamp found! Event: %s", 
rawMapToPrint(theMap));
+    }
+    if (!Intervals.ETERNITY.contains(timestamp)) {

Review comment:
       wow, I never thought of this case

##########
File path: 
core/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java
##########
@@ -93,23 +94,32 @@ public static InputRow parse(
     final DateTime timestamp;
     try {
       timestamp = timestampSpec.extractTimestamp(theMap);
-      if (timestamp == null) {
-        final String input = theMap.toString();
-        throw new NullPointerException(
-            StringUtils.format(
-                "Null timestamp in input: %s",
-                input.length() < 100 ? input : input.substring(0, 100) + "..."
-            )
-        );
-      }
     }
     catch (Exception e) {
-      throw new ParseException(e, "Unparseable timestamp found! Event: %s", 
theMap);
+      throw new ParseException(e, "Unparseable timestamp found! Event: %s", 
rawMapToPrint(theMap));
+    }
+    if (timestamp == null) {
+      throw new ParseException("Unparseable timestamp found! Event: %s", 
rawMapToPrint(theMap));

Review comment:
       Instead of printing the whole map, I think it would be better to just 
print the timestamp string that was unparsable.
   Similar comment on line 99, this would get rid of the need of 
`rawMapToPrint` and guarantees that the invalid timestamp is always logged 
regardless of where in the event it is.
   
   ```suggestion
         throw new ParseException("Unparseable timestamp found! Timestamp 
column (%s): %s", timestampSpec.getTimestampColumn(), 
theMap.get(timestampSpec.getTimestampColumn()));
   ```

##########
File path: 
extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
##########
@@ -106,6 +103,9 @@
 import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
 import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
 import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;

Review comment:
       nit: Maybe these 3 classes should be moved into their own sub package 
`org.apache.druid.segment.incremental.stats`

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
##########
@@ -135,6 +148,49 @@ public void stopGracefully(TaskConfig taskConfig)
     }
   }
 
+  public static FilteringCloseableInputRowIterator inputSourceReader(
+      File tmpDir,
+      DataSchema dataSchema,
+      InputSource inputSource,
+      @Nullable InputFormat inputFormat,
+      Predicate<InputRow> rowFilter,
+      RowIngestionMeters ingestionMeters,
+      ParseExceptionHandler parseExceptionHandler
+  ) throws IOException
+  {
+    final List<String> metricsNames = 
Arrays.stream(dataSchema.getAggregators())
+                                            .map(AggregatorFactory::getName)
+                                            .collect(Collectors.toList());
+    final InputSourceReader inputSourceReader = 
dataSchema.getTransformSpec().decorate(
+        inputSource.reader(
+            new InputRowSchema(
+                dataSchema.getTimestampSpec(),
+                dataSchema.getDimensionsSpec(),
+                metricsNames
+            ),

Review comment:
       This is calculated in 3 places in the code - `AbstractBatchIndexTask`, 
`InputSourceSampler` and `SeekableStreamIndexTaskRunner`. I think it would be a 
good idea to consolidate these methods, so that metricNames is always 
calculated the same way given a dataSchema

##########
File path: 
core/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java
##########
@@ -93,23 +94,32 @@ public static InputRow parse(
     final DateTime timestamp;
     try {
       timestamp = timestampSpec.extractTimestamp(theMap);
-      if (timestamp == null) {
-        final String input = theMap.toString();
-        throw new NullPointerException(
-            StringUtils.format(
-                "Null timestamp in input: %s",
-                input.length() < 100 ? input : input.substring(0, 100) + "..."
-            )
-        );
-      }
     }
     catch (Exception e) {
-      throw new ParseException(e, "Unparseable timestamp found! Event: %s", 
theMap);
+      throw new ParseException(e, "Unparseable timestamp found! Event: %s", 
rawMapToPrint(theMap));
+    }
+    if (timestamp == null) {
+      throw new ParseException("Unparseable timestamp found! Event: %s", 
rawMapToPrint(theMap));
+    }
+    if (!Intervals.ETERNITY.contains(timestamp)) {
+      throw new ParseException(
+          "Encountered row with timestamp that cannot be represented as a 
long: [%s]",
+          rawMapToPrint(theMap)
+      );

Review comment:
       After your refactoring in this change, the `parse` functions on line 65 
and 70 can be made private and package private (VisibleForTesting) respectively

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
##########
@@ -135,6 +148,49 @@ public void stopGracefully(TaskConfig taskConfig)
     }
   }
 
+  public static FilteringCloseableInputRowIterator inputSourceReader(

Review comment:
       javadocs please

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
##########
@@ -151,7 +151,7 @@ private static String 
makeTaskId(RealtimeAppenderatorIngestionSpec spec)
   private volatile Thread runThread = null;
 
   @JsonIgnore
-  private CircularBuffer<Throwable> savedParseExceptions;
+  private ParseExceptionHandler parseExceptionHandler;

Review comment:
       Should this also be annotated with `@MonotonicNonNull`

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
##########
@@ -550,7 +546,9 @@ public Response getUnparseableEvents(
   )
   {
     IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
-    List<String> events = 
IndexTaskUtils.getMessagesFromSavedParseExceptions(savedParseExceptions);
+    List<String> events = IndexTaskUtils.getMessagesFromSavedParseExceptions(
+        parseExceptionHandler.getSavedParseExceptions()

Review comment:
       parseExceptionHandler can be null if the `task.run()` hasn't been 
called. This is probably unlikely?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to