This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a38b4f0491 Add topic name as a column in the Kafka Input format (#14857)
a38b4f0491 is described below

commit a38b4f04919c04bef2aaa9d6ffd4bcb3a653653a
Author: Abhishek Agarwal <[email protected]>
AuthorDate: Mon Aug 21 21:32:34 2023 +0530

    Add topic name as a column in the Kafka Input format (#14857)
    
    This PR adds a way to store the topic name in a column. Such a column can
    be used to distinguish messages coming from different topics in multi-topic
    ingestion.
---
 .../data/input/kafkainput/KafkaInputFormat.java    | 21 +++++++--
 .../data/input/kafkainput/KafkaInputReader.java    |  8 +++-
 .../input/kafkainput/KafkaInputFormatTest.java     | 50 +++++++++++++++++-----
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   |  2 +-
 .../druid/indexing/kafka/KafkaSamplerSpecTest.java |  1 +
 5 files changed, 65 insertions(+), 17 deletions(-)

diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java
index 8f86ff07de..1299667057 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java
@@ -40,6 +40,7 @@ public class KafkaInputFormat implements InputFormat
 {
   private static final String DEFAULT_HEADER_COLUMN_PREFIX = "kafka.header.";
   private static final String DEFAULT_TIMESTAMP_COLUMN_NAME = 
"kafka.timestamp";
+  private static final String DEFAULT_TOPIC_COLUMN_NAME = "kafka.topic";
   private static final String DEFAULT_KEY_COLUMN_NAME = "kafka.key";
   public static final String DEFAULT_AUTO_TIMESTAMP_STRING = 
"__kif_auto_timestamp";
 
@@ -54,6 +55,7 @@ public class KafkaInputFormat implements InputFormat
   private final String headerColumnPrefix;
   private final String keyColumnName;
   private final String timestampColumnName;
+  private final String topicColumnName;
 
   public KafkaInputFormat(
       @JsonProperty("headerFormat") @Nullable KafkaHeaderFormat headerFormat,
@@ -61,7 +63,8 @@ public class KafkaInputFormat implements InputFormat
       @JsonProperty("valueFormat") InputFormat valueFormat,
       @JsonProperty("headerColumnPrefix") @Nullable String headerColumnPrefix,
       @JsonProperty("keyColumnName") @Nullable String keyColumnName,
-      @JsonProperty("timestampColumnName") @Nullable String timestampColumnName
+      @JsonProperty("timestampColumnName") @Nullable String 
timestampColumnName,
+      @JsonProperty("topicColumnName") @Nullable String topicColumnName
   )
   {
     this.headerFormat = headerFormat;
@@ -70,6 +73,7 @@ public class KafkaInputFormat implements InputFormat
     this.headerColumnPrefix = headerColumnPrefix != null ? headerColumnPrefix 
: DEFAULT_HEADER_COLUMN_PREFIX;
     this.keyColumnName = keyColumnName != null ? keyColumnName : 
DEFAULT_KEY_COLUMN_NAME;
     this.timestampColumnName = timestampColumnName != null ? 
timestampColumnName : DEFAULT_TIMESTAMP_COLUMN_NAME;
+    this.topicColumnName = topicColumnName != null ? topicColumnName : 
DEFAULT_TOPIC_COLUMN_NAME;
   }
 
   @Override
@@ -116,7 +120,8 @@ public class KafkaInputFormat implements InputFormat
             temporaryDirectory
         ),
         keyColumnName,
-        timestampColumnName
+        timestampColumnName,
+        topicColumnName
     );
   }
 
@@ -161,6 +166,13 @@ public class KafkaInputFormat implements InputFormat
     return timestampColumnName;
   }
 
+  @Nullable
+  @JsonProperty
+  public String getTopicColumnName()
+  {
+    return topicColumnName;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -176,14 +188,15 @@ public class KafkaInputFormat implements InputFormat
            && Objects.equals(keyFormat, that.keyFormat)
            && Objects.equals(headerColumnPrefix, that.headerColumnPrefix)
            && Objects.equals(keyColumnName, that.keyColumnName)
-           && Objects.equals(timestampColumnName, that.timestampColumnName);
+           && Objects.equals(timestampColumnName, that.timestampColumnName)
+           && Objects.equals(topicColumnName, that.topicColumnName);
   }
 
   @Override
   public int hashCode()
   {
     return Objects.hash(headerFormat, valueFormat, keyFormat,
-                        headerColumnPrefix, keyColumnName, timestampColumnName
+                        headerColumnPrefix, keyColumnName, 
timestampColumnName, topicColumnName
     );
   }
 }
diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
index 6d43a2e96f..31b7cf66be 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
@@ -56,6 +56,7 @@ public class KafkaInputReader implements InputEntityReader
   private final InputEntityReader valueParser;
   private final String keyColumnName;
   private final String timestampColumnName;
+  private final String topicColumnName;
 
   /**
    *
@@ -74,7 +75,8 @@ public class KafkaInputReader implements InputEntityReader
       @Nullable Function<KafkaRecordEntity, InputEntityReader> 
keyParserSupplier,
       InputEntityReader valueParser,
       String keyColumnName,
-      String timestampColumnName
+      String timestampColumnName,
+      String topicColumnName
   )
   {
     this.inputRowSchema = inputRowSchema;
@@ -84,6 +86,7 @@ public class KafkaInputReader implements InputEntityReader
     this.valueParser = valueParser;
     this.keyColumnName = keyColumnName;
     this.timestampColumnName = timestampColumnName;
+    this.topicColumnName = topicColumnName;
   }
 
   @Override
@@ -128,6 +131,9 @@ public class KafkaInputReader implements InputEntityReader
     // the header list
     mergedHeaderMap.putIfAbsent(timestampColumnName, 
record.getRecord().timestamp());
 
+    // Add kafka record topic to the mergelist, only if the key doesn't 
already exist
+    mergedHeaderMap.putIfAbsent(topicColumnName, record.getRecord().topic());
+
     return mergedHeaderMap;
   }
 
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java
index f65f335df9..21a0550f53 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java
@@ -59,6 +59,7 @@ public class KafkaInputFormatTest
 {
   private KafkaRecordEntity inputEntity;
   private final long timestamp = DateTimes.of("2021-06-24").getMillis();
+  private static final String TOPIC = "sample";
   private static final Iterable<Header> SAMPLE_HEADERS = ImmutableList.of(
       new Header()
       {
@@ -126,7 +127,8 @@ public class KafkaInputFormatTest
         ),
         "kafka.newheader.",
         "kafka.newkey.key",
-        "kafka.newts.timestamp"
+        "kafka.newts.timestamp",
+        "kafka.newtopic.topic"
     );
   }
 
@@ -166,7 +168,8 @@ public class KafkaInputFormatTest
         ),
         "kafka.newheader.",
         "kafka.newkey.key",
-        "kafka.newts.timestamp"
+        "kafka.newts.timestamp",
+        "kafka.newtopic.topic"
     );
     Assert.assertEquals(format, kif);
 
@@ -209,7 +212,8 @@ public class KafkaInputFormatTest
                         "foo",
                         "kafka.newheader.encoding",
                         "kafka.newheader.kafkapkc",
-                        "kafka.newts.timestamp"
+                        "kafka.newts.timestamp",
+                        "kafka.newtopic.topic"
                     )
                 )
             ),
@@ -231,7 +235,8 @@ public class KafkaInputFormatTest
                 "foo",
                 "kafka.newheader.encoding",
                 "kafka.newheader.kafkapkc",
-                "kafka.newts.timestamp"
+                "kafka.newts.timestamp",
+                "kafka.newtopic.topic"
             ),
             row.getDimensions()
         );
@@ -254,6 +259,10 @@ public class KafkaInputFormatTest
             String.valueOf(DateTimes.of("2021-06-24").getMillis()),
             Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp"))
         );
+        Assert.assertEquals(
+            TOPIC,
+            Iterables.getOnlyElement(row.getDimension("kafka.newtopic.topic"))
+        );
         Assert.assertEquals(
             "2021-06-25",
             Iterables.getOnlyElement(row.getDimension("timestamp"))
@@ -302,7 +311,8 @@ public class KafkaInputFormatTest
                         "foo",
                         "kafka.newheader.encoding",
                         "kafka.newheader.kafkapkc",
-                        "kafka.newts.timestamp"
+                        "kafka.newts.timestamp",
+                        "kafka.newtopic.topic"
                     )
                 )
             ),
@@ -478,7 +488,7 @@ public class KafkaInputFormatTest
             null, null, false, //make sure JsonReader is used
             false, false
         ),
-        "kafka.newheader.", "kafka.newkey.", "kafka.newts."
+        "kafka.newheader.", "kafka.newkey.", "kafka.newts.", "kafka.newtopic."
     );
 
     final InputEntityReader reader = localFormat.createReader(
@@ -489,7 +499,8 @@ public class KafkaInputFormatTest
                     ImmutableList.of(
                         "bar",
                         "foo",
-                        "kafka.newts.timestamp"
+                        "kafka.newts.timestamp",
+                        "kafka.newtopic.topic"
                     )
                 )
             ),
@@ -567,7 +578,8 @@ public class KafkaInputFormatTest
                         "foo",
                         "kafka.newheader.encoding",
                         "kafka.newheader.kafkapkc",
-                        "kafka.newts.timestamp"
+                        "kafka.newts.timestamp",
+                        "kafka.newtopic.topic"
                     )
                 )
             ),
@@ -613,6 +625,10 @@ public class KafkaInputFormatTest
               String.valueOf(DateTimes.of("2021-06-24").getMillis()),
               
Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp"))
           );
+          Assert.assertEquals(
+              TOPIC,
+              
Iterables.getOnlyElement(row.getDimension("kafka.newtopic.topic"))
+          );
           Assert.assertEquals(String.valueOf(i), 
Iterables.getOnlyElement(row.getDimension("kafka.newheader.indexH")));
 
 
@@ -669,7 +685,8 @@ public class KafkaInputFormatTest
                         "foo",
                         "kafka.newheader.encoding",
                         "kafka.newheader.kafkapkc",
-                        "kafka.newts.timestamp"
+                        "kafka.newts.timestamp",
+                        "kafka.newtopic.topic"
                     )
                 )
             ),
@@ -683,7 +700,8 @@ public class KafkaInputFormatTest
       while (iterator.hasNext()) {
         Throwable t = Assert.assertThrows(ParseException.class, () -> 
iterator.next());
         Assert.assertEquals(
-            "Timestamp[null] is unparseable! Event: {foo=x, 
kafka.newts.timestamp=1624492800000, kafka.newkey.key=sampleKey, root_baz=4, 
bar=null, kafka...",
+            "Timestamp[null] is unparseable! Event: 
{kafka.newtopic.topic=sample, foo=x, kafka.newts"
+            + ".timestamp=1624492800000, kafka.newkey.key=sampleKey...",
             t.getMessage()
         );
       }
@@ -733,6 +751,7 @@ public class KafkaInputFormatTest
         final InputRow row = iterator.next();
         Assert.assertEquals(
             Arrays.asList(
+                "kafka.newtopic.topic",
                 "foo",
                 "kafka.newts.timestamp",
                 "kafka.newkey.key",
@@ -767,6 +786,10 @@ public class KafkaInputFormatTest
             String.valueOf(DateTimes.of("2021-06-24").getMillis()),
             Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp"))
         );
+        Assert.assertEquals(
+            TOPIC,
+            Iterables.getOnlyElement(row.getDimension("kafka.newtopic.topic"))
+        );
         Assert.assertEquals(
             "2021-06-25",
             Iterables.getOnlyElement(row.getDimension("timestamp"))
@@ -834,6 +857,7 @@ public class KafkaInputFormatTest
             Arrays.asList(
                 "bar",
                 "kafka.newheader.kafkapkc",
+                "kafka.newtopic.topic",
                 "foo",
                 "kafka.newts.timestamp",
                 "kafka.newkey.key",
@@ -866,6 +890,10 @@ public class KafkaInputFormatTest
             String.valueOf(DateTimes.of("2021-06-24").getMillis()),
             Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp"))
         );
+        Assert.assertEquals(
+            TOPIC,
+            Iterables.getOnlyElement(row.getDimension("kafka.newtopic.topic"))
+        );
         Assert.assertEquals(
             "2021-06-25",
             Iterables.getOnlyElement(row.getDimension("timestamp"))
@@ -889,7 +917,7 @@ public class KafkaInputFormatTest
   {
     return new KafkaRecordEntity(
         new ConsumerRecord<>(
-            "sample",
+            TOPIC,
             0,
             0,
             timestamp,
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 135c87c4e1..04393cb914 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -171,7 +171,7 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
       new KafkaStringHeaderFormat(null),
       INPUT_FORMAT,
       INPUT_FORMAT,
-      "kafka.testheader.", "kafka.key", "kafka.timestamp"
+      "kafka.testheader.", "kafka.key", "kafka.timestamp", "kafka.topic"
   );
 
   private static TestingCluster zkServer;
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
index 9a6fd03726..0a0b64396a 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
@@ -277,6 +277,7 @@ public class KafkaSamplerSpecTest extends 
InitializedNullHandlingTest
                 new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, 
null),
                 null,
                 null,
+                null,
                 null
             ),
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to