This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a38b4f0491 Add topic name as a column in the Kafka Input format (#14857)
a38b4f0491 is described below
commit a38b4f04919c04bef2aaa9d6ffd4bcb3a653653a
Author: Abhishek Agarwal <[email protected]>
AuthorDate: Mon Aug 21 21:32:34 2023 +0530
Add topic name as a column in the Kafka Input format (#14857)
This PR adds a way to store the topic name in a column. Such a column can
be used to distinguish messages coming from different topics in multi-topic
ingestion.
---
.../data/input/kafkainput/KafkaInputFormat.java | 21 +++++++--
.../data/input/kafkainput/KafkaInputReader.java | 8 +++-
.../input/kafkainput/KafkaInputFormatTest.java | 50 +++++++++++++++++-----
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 2 +-
.../druid/indexing/kafka/KafkaSamplerSpecTest.java | 1 +
5 files changed, 65 insertions(+), 17 deletions(-)
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java
index 8f86ff07de..1299667057 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java
@@ -40,6 +40,7 @@ public class KafkaInputFormat implements InputFormat
{
private static final String DEFAULT_HEADER_COLUMN_PREFIX = "kafka.header.";
private static final String DEFAULT_TIMESTAMP_COLUMN_NAME = "kafka.timestamp";
+ private static final String DEFAULT_TOPIC_COLUMN_NAME = "kafka.topic";
private static final String DEFAULT_KEY_COLUMN_NAME = "kafka.key";
public static final String DEFAULT_AUTO_TIMESTAMP_STRING = "__kif_auto_timestamp";
@@ -54,6 +55,7 @@ public class KafkaInputFormat implements InputFormat
private final String headerColumnPrefix;
private final String keyColumnName;
private final String timestampColumnName;
+ private final String topicColumnName;
public KafkaInputFormat(
@JsonProperty("headerFormat") @Nullable KafkaHeaderFormat headerFormat,
@@ -61,7 +63,8 @@ public class KafkaInputFormat implements InputFormat
@JsonProperty("valueFormat") InputFormat valueFormat,
@JsonProperty("headerColumnPrefix") @Nullable String headerColumnPrefix,
@JsonProperty("keyColumnName") @Nullable String keyColumnName,
- @JsonProperty("timestampColumnName") @Nullable String timestampColumnName
+ @JsonProperty("timestampColumnName") @Nullable String timestampColumnName,
+ @JsonProperty("topicColumnName") @Nullable String topicColumnName
)
{
this.headerFormat = headerFormat;
@@ -70,6 +73,7 @@ public class KafkaInputFormat implements InputFormat
this.headerColumnPrefix = headerColumnPrefix != null ? headerColumnPrefix : DEFAULT_HEADER_COLUMN_PREFIX;
this.keyColumnName = keyColumnName != null ? keyColumnName : DEFAULT_KEY_COLUMN_NAME;
this.timestampColumnName = timestampColumnName != null ? timestampColumnName : DEFAULT_TIMESTAMP_COLUMN_NAME;
+ this.topicColumnName = topicColumnName != null ? topicColumnName : DEFAULT_TOPIC_COLUMN_NAME;
}
@Override
@@ -116,7 +120,8 @@ public class KafkaInputFormat implements InputFormat
temporaryDirectory
),
keyColumnName,
- timestampColumnName
+ timestampColumnName,
+ topicColumnName
);
}
@@ -161,6 +166,13 @@ public class KafkaInputFormat implements InputFormat
return timestampColumnName;
}
+ @Nullable
+ @JsonProperty
+ public String getTopicColumnName()
+ {
+ return topicColumnName;
+ }
+
@Override
public boolean equals(Object o)
{
@@ -176,14 +188,15 @@ public class KafkaInputFormat implements InputFormat
&& Objects.equals(keyFormat, that.keyFormat)
&& Objects.equals(headerColumnPrefix, that.headerColumnPrefix)
&& Objects.equals(keyColumnName, that.keyColumnName)
- && Objects.equals(timestampColumnName, that.timestampColumnName);
+ && Objects.equals(timestampColumnName, that.timestampColumnName)
+ && Objects.equals(topicColumnName, that.topicColumnName);
}
@Override
public int hashCode()
{
return Objects.hash(headerFormat, valueFormat, keyFormat,
- headerColumnPrefix, keyColumnName, timestampColumnName
+ headerColumnPrefix, keyColumnName, timestampColumnName, topicColumnName
);
}
}
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
index 6d43a2e96f..31b7cf66be 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
@@ -56,6 +56,7 @@ public class KafkaInputReader implements InputEntityReader
private final InputEntityReader valueParser;
private final String keyColumnName;
private final String timestampColumnName;
+ private final String topicColumnName;
/**
*
@@ -74,7 +75,8 @@ public class KafkaInputReader implements InputEntityReader
@Nullable Function<KafkaRecordEntity, InputEntityReader> keyParserSupplier,
InputEntityReader valueParser,
String keyColumnName,
- String timestampColumnName
+ String timestampColumnName,
+ String topicColumnName
)
{
this.inputRowSchema = inputRowSchema;
@@ -84,6 +86,7 @@ public class KafkaInputReader implements InputEntityReader
this.valueParser = valueParser;
this.keyColumnName = keyColumnName;
this.timestampColumnName = timestampColumnName;
+ this.topicColumnName = topicColumnName;
}
@Override
@@ -128,6 +131,9 @@ public class KafkaInputReader implements InputEntityReader
// the header list
mergedHeaderMap.putIfAbsent(timestampColumnName, record.getRecord().timestamp());
+ // Add kafka record topic to the mergelist, only if the key doesn't already exist
+ mergedHeaderMap.putIfAbsent(topicColumnName, record.getRecord().topic());
+
return mergedHeaderMap;
}
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java
index f65f335df9..21a0550f53 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java
@@ -59,6 +59,7 @@ public class KafkaInputFormatTest
{
private KafkaRecordEntity inputEntity;
private final long timestamp = DateTimes.of("2021-06-24").getMillis();
+ private static final String TOPIC = "sample";
private static final Iterable<Header> SAMPLE_HEADERS = ImmutableList.of(
new Header()
{
@@ -126,7 +127,8 @@ public class KafkaInputFormatTest
),
"kafka.newheader.",
"kafka.newkey.key",
- "kafka.newts.timestamp"
+ "kafka.newts.timestamp",
+ "kafka.newtopic.topic"
);
}
@@ -166,7 +168,8 @@ public class KafkaInputFormatTest
),
"kafka.newheader.",
"kafka.newkey.key",
- "kafka.newts.timestamp"
+ "kafka.newts.timestamp",
+ "kafka.newtopic.topic"
);
Assert.assertEquals(format, kif);
@@ -209,7 +212,8 @@ public class KafkaInputFormatTest
"foo",
"kafka.newheader.encoding",
"kafka.newheader.kafkapkc",
- "kafka.newts.timestamp"
+ "kafka.newts.timestamp",
+ "kafka.newtopic.topic"
)
)
),
@@ -231,7 +235,8 @@ public class KafkaInputFormatTest
"foo",
"kafka.newheader.encoding",
"kafka.newheader.kafkapkc",
- "kafka.newts.timestamp"
+ "kafka.newts.timestamp",
+ "kafka.newtopic.topic"
),
row.getDimensions()
);
@@ -254,6 +259,10 @@ public class KafkaInputFormatTest
String.valueOf(DateTimes.of("2021-06-24").getMillis()),
Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp"))
);
+ Assert.assertEquals(
+ TOPIC,
+ Iterables.getOnlyElement(row.getDimension("kafka.newtopic.topic"))
+ );
Assert.assertEquals(
"2021-06-25",
Iterables.getOnlyElement(row.getDimension("timestamp"))
@@ -302,7 +311,8 @@ public class KafkaInputFormatTest
"foo",
"kafka.newheader.encoding",
"kafka.newheader.kafkapkc",
- "kafka.newts.timestamp"
+ "kafka.newts.timestamp",
+ "kafka.newtopic.topic"
)
)
),
@@ -478,7 +488,7 @@ public class KafkaInputFormatTest
null, null, false, //make sure JsonReader is used
false, false
),
- "kafka.newheader.", "kafka.newkey.", "kafka.newts."
+ "kafka.newheader.", "kafka.newkey.", "kafka.newts.", "kafka.newtopic."
);
final InputEntityReader reader = localFormat.createReader(
@@ -489,7 +499,8 @@ public class KafkaInputFormatTest
ImmutableList.of(
"bar",
"foo",
- "kafka.newts.timestamp"
+ "kafka.newts.timestamp",
+ "kafka.newtopic.topic"
)
)
),
@@ -567,7 +578,8 @@ public class KafkaInputFormatTest
"foo",
"kafka.newheader.encoding",
"kafka.newheader.kafkapkc",
- "kafka.newts.timestamp"
+ "kafka.newts.timestamp",
+ "kafka.newtopic.topic"
)
)
),
@@ -613,6 +625,10 @@ public class KafkaInputFormatTest
String.valueOf(DateTimes.of("2021-06-24").getMillis()),
Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp"))
);
+ Assert.assertEquals(
+ TOPIC,
+ Iterables.getOnlyElement(row.getDimension("kafka.newtopic.topic"))
+ );
Assert.assertEquals(String.valueOf(i),
Iterables.getOnlyElement(row.getDimension("kafka.newheader.indexH")));
@@ -669,7 +685,8 @@ public class KafkaInputFormatTest
"foo",
"kafka.newheader.encoding",
"kafka.newheader.kafkapkc",
- "kafka.newts.timestamp"
+ "kafka.newts.timestamp",
+ "kafka.newtopic.topic"
)
)
),
@@ -683,7 +700,8 @@ public class KafkaInputFormatTest
while (iterator.hasNext()) {
Throwable t = Assert.assertThrows(ParseException.class, () ->
iterator.next());
Assert.assertEquals(
- "Timestamp[null] is unparseable! Event: {foo=x, kafka.newts.timestamp=1624492800000, kafka.newkey.key=sampleKey, root_baz=4, bar=null, kafka...",
+ "Timestamp[null] is unparseable! Event: {kafka.newtopic.topic=sample, foo=x, kafka.newts"
+ + ".timestamp=1624492800000, kafka.newkey.key=sampleKey...",
t.getMessage()
);
}
@@ -733,6 +751,7 @@ public class KafkaInputFormatTest
final InputRow row = iterator.next();
Assert.assertEquals(
Arrays.asList(
+ "kafka.newtopic.topic",
"foo",
"kafka.newts.timestamp",
"kafka.newkey.key",
@@ -767,6 +786,10 @@ public class KafkaInputFormatTest
String.valueOf(DateTimes.of("2021-06-24").getMillis()),
Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp"))
);
+ Assert.assertEquals(
+ TOPIC,
+ Iterables.getOnlyElement(row.getDimension("kafka.newtopic.topic"))
+ );
Assert.assertEquals(
"2021-06-25",
Iterables.getOnlyElement(row.getDimension("timestamp"))
@@ -834,6 +857,7 @@ public class KafkaInputFormatTest
Arrays.asList(
"bar",
"kafka.newheader.kafkapkc",
+ "kafka.newtopic.topic",
"foo",
"kafka.newts.timestamp",
"kafka.newkey.key",
@@ -866,6 +890,10 @@ public class KafkaInputFormatTest
String.valueOf(DateTimes.of("2021-06-24").getMillis()),
Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp"))
);
+ Assert.assertEquals(
+ TOPIC,
+ Iterables.getOnlyElement(row.getDimension("kafka.newtopic.topic"))
+ );
Assert.assertEquals(
"2021-06-25",
Iterables.getOnlyElement(row.getDimension("timestamp"))
@@ -889,7 +917,7 @@ public class KafkaInputFormatTest
{
return new KafkaRecordEntity(
new ConsumerRecord<>(
- "sample",
+ TOPIC,
0,
0,
timestamp,
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 135c87c4e1..04393cb914 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -171,7 +171,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
new KafkaStringHeaderFormat(null),
INPUT_FORMAT,
INPUT_FORMAT,
- "kafka.testheader.", "kafka.key", "kafka.timestamp"
+ "kafka.testheader.", "kafka.key", "kafka.timestamp", "kafka.topic"
);
private static TestingCluster zkServer;
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
index 9a6fd03726..0a0b64396a 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
@@ -277,6 +277,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
null,
null,
+ null,
null
),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]