jihoonson commented on a change in pull request #10336:
URL: https://github.com/apache/druid/pull/10336#discussion_r482313598
##########
File path:
core/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java
##########
@@ -93,23 +94,32 @@ public static InputRow parse(
final DateTime timestamp;
try {
timestamp = timestampSpec.extractTimestamp(theMap);
- if (timestamp == null) {
- final String input = theMap.toString();
- throw new NullPointerException(
- StringUtils.format(
- "Null timestamp in input: %s",
- input.length() < 100 ? input : input.substring(0, 100) + "..."
- )
- );
- }
}
catch (Exception e) {
-        throw new ParseException(e, "Unparseable timestamp found! Event: %s", theMap);
+        throw new ParseException(e, "Unparseable timestamp found! Event: %s", rawMapToPrint(theMap));
+ }
+     if (timestamp == null) {
+       throw new ParseException("Unparseable timestamp found! Event: %s", rawMapToPrint(theMap));
Review comment:
It is a good idea to print the timestamp string, but I would like to
keep the current behavior as well (this logging is not something I added in
this PR). I modified the error message to include the timestamp.
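For context on what the removed code did: it bounded the size of the error message by printing at most the first 100 characters of the raw event. A minimal standalone sketch of that truncation pattern (class and method names here are hypothetical, not actual Druid code):

```java
// Hypothetical helper mirroring the removed truncation logic: bound the
// event string printed in an error message to 100 characters.
public class EventTruncation
{
  private static final int MAX_LEN = 100;

  public static String truncateForLog(String input)
  {
    // Same ternary as the removed code: short inputs pass through unchanged,
    // long inputs are cut at MAX_LEN and marked with an ellipsis.
    return input.length() < MAX_LEN ? input : input.substring(0, MAX_LEN) + "...";
  }

  public static void main(String[] args)
  {
    System.out.println(truncateForLog("{\"time\": \"2020-01-01\"}"));
    System.out.println(truncateForLog("x".repeat(200)).length()); // 103: 100 chars plus "..."
  }
}
```

Keeping log output bounded like this matters in ingestion paths, where a single malformed row can otherwise dump an arbitrarily large payload into task logs.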
##########
File path:
core/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java
##########
@@ -93,23 +94,32 @@ public static InputRow parse(
final DateTime timestamp;
try {
timestamp = timestampSpec.extractTimestamp(theMap);
- if (timestamp == null) {
- final String input = theMap.toString();
- throw new NullPointerException(
- StringUtils.format(
- "Null timestamp in input: %s",
- input.length() < 100 ? input : input.substring(0, 100) + "..."
- )
- );
- }
}
catch (Exception e) {
-        throw new ParseException(e, "Unparseable timestamp found! Event: %s", theMap);
+        throw new ParseException(e, "Unparseable timestamp found! Event: %s", rawMapToPrint(theMap));
+ }
+     if (timestamp == null) {
+       throw new ParseException("Unparseable timestamp found! Event: %s", rawMapToPrint(theMap));
+ }
+ if (!Intervals.ETERNITY.contains(timestamp)) {
+       throw new ParseException(
+           "Encountered row with timestamp that cannot be represented as a long: [%s]",
+           rawMapToPrint(theMap)
+       );
Review comment:
Done.
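As background on the `Intervals.ETERNITY` guard in the diff above: it rejects timestamps whose millisecond instant falls outside the range Druid can safely handle in segment-interval arithmetic. A minimal sketch of that kind of bounds check, using illustrative constants as assumptions rather than Druid's actual `Intervals`/`JodaUtils` classes:

```java
// Sketch of an eternity-style bounds check. The MIN/MAX constants below are
// illustrative stand-ins for Druid's internal instant limits, not the real values.
public class EternityCheck
{
  // Assumed bounds: half the long range in each direction, leaving headroom
  // so interval arithmetic on the endpoints cannot overflow a long.
  static final long MIN_INSTANT = Long.MIN_VALUE / 2;
  static final long MAX_INSTANT = Long.MAX_VALUE / 2;

  public static boolean withinEternity(long millis)
  {
    return millis >= MIN_INSTANT && millis <= MAX_INSTANT;
  }

  public static void main(String[] args)
  {
    // Any realistic event time is comfortably inside the bounds.
    System.out.println(withinEternity(System.currentTimeMillis()));
    // An extreme value is rejected before it can corrupt interval math.
    System.out.println(withinEternity(Long.MAX_VALUE));
  }
}
```

Rejecting such rows with a `ParseException` (as the diff does) lets them be counted as unparseable by the ingestion metrics instead of failing the task later with an obscure overflow.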
##########
File path:
extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
##########
@@ -106,6 +103,9 @@
import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
Review comment:
That might be nice. I think we probably need new classes for the metrics of
native batch ingestion. I will reorganize the package in a follow-up PR if
that sounds good.
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
##########
@@ -135,6 +148,49 @@ public void stopGracefully(TaskConfig taskConfig)
}
}
+ public static FilteringCloseableInputRowIterator inputSourceReader(
Review comment:
Added.
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
##########
@@ -151,7 +151,7 @@ private static String makeTaskId(RealtimeAppenderatorIngestionSpec spec)
private volatile Thread runThread = null;
@JsonIgnore
- private CircularBuffer<Throwable> savedParseExceptions;
+ private ParseExceptionHandler parseExceptionHandler;
Review comment:
Added.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]