This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 61120dc49a fix Kafka input format to throw ParseException if timestamp is missing (#14413)
61120dc49a is described below

commit 61120dc49a2c7a94f7421c9cbf68bfe88844131f
Author: Clint Wylie <[email protected]>
AuthorDate: Tue Jun 13 09:00:11 2023 -0700

    fix Kafka input format to throw ParseException if timestamp is missing (#14413)
---
 .../data/input/kafkainput/KafkaInputReader.java    |  5 +-
 .../input/kafkainput/KafkaInputFormatTest.java     | 63 ++++++++++++++++++++++
 .../druid/data/input/impl/MapInputRowParser.java   |  8 ++-
 3 files changed, 74 insertions(+), 2 deletions(-)

diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
index 6e6ac8fa10..0f3cefb4c3 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
@@ -26,12 +26,14 @@ import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowListPlusRawValues;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.data.input.impl.MapInputRowParser;
 import org.apache.druid.data.input.kafka.KafkaRecordEntity;
 import org.apache.druid.indexing.seekablestream.SettableByteEntity;
 import org.apache.druid.java.util.common.CloseableIterators;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.java.util.common.parsers.ParseException;
+import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
@@ -195,8 +197,9 @@ public class KafkaInputReader implements InputEntityReader
           // Remove the dummy timestamp added in KafkaInputFormat
           newDimensions.remove(KafkaInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING);
 
+          final DateTime timestamp = MapInputRowParser.parseTimestamp(inputRowSchema.getTimestampSpec(), event);
           return new MapBasedInputRow(
-              inputRowSchema.getTimestampSpec().extractTimestamp(event),
+              timestamp,
               getFinalDimensionList(newDimensions),
               event
           );
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java
index a45730005a..fc33852d54 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java
@@ -39,6 +39,7 @@ import 
org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
 import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.java.util.common.parsers.ParseException;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
@@ -554,6 +555,68 @@ public class KafkaInputFormatTest
     }
   }
 
+  @Test
+  public void testMissingTimestampThrowsException() throws IOException
+  {
+    final byte[] key = StringUtils.toUtf8(
+        "{\n"
+        + "    \"key\": \"sampleKey\"\n"
+        + "}");
+
+    final byte[] payload = StringUtils.toUtf8(
+        "{\n"
+        + "    \"timestamp\": \"2021-06-25\",\n"
+        + "    \"bar\": null,\n"
+        + "    \"foo\": \"x\",\n"
+        + "    \"baz\": 4,\n"
+        + "    \"o\": {\n"
+        + "        \"mg\": 1\n"
+        + "    }\n"
+        + "}");
+
+    Headers headers = new RecordHeaders(SAMPLE_HEADERS);
+    inputEntity = new KafkaRecordEntity(
+        new ConsumerRecord<>(
+        "sample",
+        0,
+        0,
+        timestamp,
+        null,
+        null,
+        0,
+        0,
+        key,
+        payload,
+        headers
+        )
+    );
+
+    final InputEntityReader reader = format.createReader(
+        new InputRowSchema(
+            new TimestampSpec("time", "iso", null),
+            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of(
+                "bar", "foo",
+                "kafka.newheader.encoding",
+                "kafka.newheader.kafkapkc",
+                "kafka.newts.timestamp"
+            ))),
+            ColumnsFilter.all()
+        ),
+        newSettableByteEntity(inputEntity),
+        null
+    );
+
+    try (CloseableIterator<InputRow> iterator = reader.read()) {
+      while (iterator.hasNext()) {
+        Throwable t = Assert.assertThrows(ParseException.class, () -> iterator.next());
+        Assert.assertEquals(
+            "Timestamp[null] is unparseable! Event: {foo=x, kafka.newts.timestamp=1624492800000, kafka.newkey.key=sampleKey, root_baz=4, bar=null, kafka...",
+            t.getMessage()
+        );
+      }
+    }
+  }
+
  private SettableByteEntity<KafkaRecordEntity> newSettableByteEntity(KafkaRecordEntity kafkaRecordEntity)
   {
    SettableByteEntity<KafkaRecordEntity> settableByteEntity = new SettableByteEntity<>();
diff --git 
a/processing/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java
 
b/processing/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java
index c069e3e37c..ca2dddecfd 100644
--- 
a/processing/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java
+++ 
b/processing/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java
@@ -122,6 +122,12 @@ public class MapInputRowParser implements 
InputRowParser<Map<String, Object>>
   {
    final List<String> dimensionsToUse = findDimensions(timestampSpec, dimensionsSpec, theMap);
 
+    final DateTime timestamp = parseTimestamp(timestampSpec, theMap);
+    return new MapBasedInputRow(timestamp, dimensionsToUse, theMap);
+  }
+
+  public static DateTime parseTimestamp(TimestampSpec timestampSpec, Map<String, Object> theMap)
+  {
     final DateTime timestamp;
     try {
       timestamp = timestampSpec.extractTimestamp(theMap);
@@ -154,7 +160,7 @@ public class MapInputRowParser implements 
InputRowParser<Map<String, Object>>
           rawMap
       );
     }
-    return new MapBasedInputRow(timestamp, dimensionsToUse, theMap);
+    return timestamp;
   }
 
   @Nullable


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to