This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 61120dc49a fix Kafka input format to throw ParseException if timestamp is missing (#14413)
61120dc49a is described below
commit 61120dc49a2c7a94f7421c9cbf68bfe88844131f
Author: Clint Wylie <[email protected]>
AuthorDate: Tue Jun 13 09:00:11 2023 -0700
fix Kafka input format to throw ParseException if timestamp is missing (#14413)
---
.../data/input/kafkainput/KafkaInputReader.java | 5 +-
.../input/kafkainput/KafkaInputFormatTest.java | 63 ++++++++++++++++++++++
.../druid/data/input/impl/MapInputRowParser.java | 8 ++-
3 files changed, 74 insertions(+), 2 deletions(-)
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
index 6e6ac8fa10..0f3cefb4c3 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
@@ -26,12 +26,14 @@ import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.indexing.seekablestream.SettableByteEntity;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;
+import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.io.IOException;
@@ -195,8 +197,9 @@ public class KafkaInputReader implements InputEntityReader
// Remove the dummy timestamp added in KafkaInputFormat
newDimensions.remove(KafkaInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING);
+ final DateTime timestamp =
MapInputRowParser.parseTimestamp(inputRowSchema.getTimestampSpec(), event);
return new MapBasedInputRow(
- inputRowSchema.getTimestampSpec().extractTimestamp(event),
+ timestamp,
getFinalDimensionList(newDimensions),
event
);
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java
index a45730005a..fc33852d54 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java
@@ -39,6 +39,7 @@ import
org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
@@ -554,6 +555,68 @@ public class KafkaInputFormatTest
}
}
+ @Test
+ public void testMissingTimestampThrowsException() throws IOException
+ {
+ final byte[] key = StringUtils.toUtf8(
+ "{\n"
+ + " \"key\": \"sampleKey\"\n"
+ + "}");
+
+ final byte[] payload = StringUtils.toUtf8(
+ "{\n"
+ + " \"timestamp\": \"2021-06-25\",\n"
+ + " \"bar\": null,\n"
+ + " \"foo\": \"x\",\n"
+ + " \"baz\": 4,\n"
+ + " \"o\": {\n"
+ + " \"mg\": 1\n"
+ + " }\n"
+ + "}");
+
+ Headers headers = new RecordHeaders(SAMPLE_HEADERS);
+ inputEntity = new KafkaRecordEntity(
+ new ConsumerRecord<>(
+ "sample",
+ 0,
+ 0,
+ timestamp,
+ null,
+ null,
+ 0,
+ 0,
+ key,
+ payload,
+ headers
+ )
+ );
+
+ final InputEntityReader reader = format.createReader(
+ new InputRowSchema(
+ new TimestampSpec("time", "iso", null),
+ new
DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of(
+ "bar", "foo",
+ "kafka.newheader.encoding",
+ "kafka.newheader.kafkapkc",
+ "kafka.newts.timestamp"
+ ))),
+ ColumnsFilter.all()
+ ),
+ newSettableByteEntity(inputEntity),
+ null
+ );
+
+ try (CloseableIterator<InputRow> iterator = reader.read()) {
+ while (iterator.hasNext()) {
+ Throwable t = Assert.assertThrows(ParseException.class, () ->
iterator.next());
+ Assert.assertEquals(
+ "Timestamp[null] is unparseable! Event: {foo=x,
kafka.newts.timestamp=1624492800000, kafka.newkey.key=sampleKey, root_baz=4,
bar=null, kafka...",
+ t.getMessage()
+ );
+ }
+ }
+ }
+
private SettableByteEntity<KafkaRecordEntity>
newSettableByteEntity(KafkaRecordEntity kafkaRecordEntity)
{
SettableByteEntity<KafkaRecordEntity> settableByteEntity = new
SettableByteEntity<>();
diff --git
a/processing/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java
b/processing/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java
index c069e3e37c..ca2dddecfd 100644
---
a/processing/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java
+++
b/processing/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java
@@ -122,6 +122,12 @@ public class MapInputRowParser implements
InputRowParser<Map<String, Object>>
{
final List<String> dimensionsToUse = findDimensions(timestampSpec,
dimensionsSpec, theMap);
+ final DateTime timestamp = parseTimestamp(timestampSpec, theMap);
+ return new MapBasedInputRow(timestamp, dimensionsToUse, theMap);
+ }
+
+ public static DateTime parseTimestamp(TimestampSpec timestampSpec,
Map<String, Object> theMap)
+ {
final DateTime timestamp;
try {
timestamp = timestampSpec.extractTimestamp(theMap);
@@ -154,7 +160,7 @@ public class MapInputRowParser implements
InputRowParser<Map<String, Object>>
rawMap
);
}
- return new MapBasedInputRow(timestamp, dimensionsToUse, theMap);
+ return timestamp;
}
@Nullable
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]