kfaraz commented on code in PR #14424:
URL: https://github.com/apache/druid/pull/14424#discussion_r1281337181
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java:
##########
@@ -94,8 +99,8 @@ public KafkaIndexTaskIOConfig(
public KafkaIndexTaskIOConfig(
int taskGroupId,
String baseSequenceName,
- SeekableStreamStartSequenceNumbers<Integer, Long> startSequenceNumbers,
- SeekableStreamEndSequenceNumbers<Integer, Long> endSequenceNumbers,
+      SeekableStreamStartSequenceNumbers<KafkaTopicPartition, Long>  startSequenceNumbers,
+      SeekableStreamEndSequenceNumbers<KafkaTopicPartition, Long>  endSequenceNumbers,
Review Comment:
nit: extra space
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaTopicPartition.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.kafka;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.KeyDeserializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.kafka.common.TopicPartition;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This class represents the partition id for kafka ingestion. This partition id includes topic name along with an
+ * integer partition. The topic name is required because the same partition id can be used for different topics.
+ * This class is used as a key in {@link org.apache.druid.indexing.kafka.KafkaDataSourceMetadata} to store the offsets
+ * for each partition.
+ */
+@JsonSerialize(using = KafkaTopicPartition.KafkaTopicPartitionSerializer.class,
+    keyUsing = KafkaTopicPartition.KafkaTopicPartitionSerializer.class)
+@JsonDeserialize(using = KafkaTopicPartition.KafkaTopicPartitionDeserializer.class,
+    keyUsing = KafkaTopicPartition.KafkaTopicPartitionKeyDeserializer.class)
+public class KafkaTopicPartition
+{
+ private final int partition;
+ @Nullable
+ private final String topic;
+
+  // This flag is used to maintain backward incompatibilty with older versions of kafka indexing. If this flag
+  // is set to false,
+  //  - KafkaTopicPartition will be serialized as an integer and can be read back by older version.
+  //  - topic field is ignored while comparing two KafkaTopicPartition objects and calculating hashcode.
+  // This flag must be explicitly passed while constructing KafkaTopicPartition object. That way, we can ensure that
+  // a particular supervisor is always running in multi topic mode or single topic mode.
+ private final boolean multiTopicPartition;
+
+  public KafkaTopicPartition(boolean multiTopicPartition, @Nullable String topic, int partition)
+ {
+ this.partition = partition;
+ this.topic = topic;
+ this.multiTopicPartition = multiTopicPartition;
+ }
+
+ public int partition()
+ {
+ return partition;
+ }
+
+ public Optional<String> topic()
+ {
+ return Optional.ofNullable(topic);
+ }
+
+ public boolean isMultiTopicPartition()
+ {
+ return multiTopicPartition;
+ }
+
+ public TopicPartition asTopicPartition(String fallbackTopic)
+ {
+    return new TopicPartition(topic != null ? topic : fallbackTopic, partition);
+ }
+
+ @Override
+ public String toString()
+ {
+ // TODO - fix this so toString is not used for serialization
+ if (null != topic && multiTopicPartition) {
+ return topic + ":" + partition;
+ } else {
+ return Integer.toString(partition);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ KafkaTopicPartition that = (KafkaTopicPartition) o;
+    return partition == that.partition && multiTopicPartition == that.multiTopicPartition
+           && (!multiTopicPartition || Objects.equals(topic, that.topic));
+ }
+
+ @Override
+ public int hashCode()
+ {
+ if (multiTopicPartition) {
Review Comment:
I am a little confused about excluding the topic from equals and hashCode if `multiTopic` is false.
In the impl here, `new KafkaTopicPartition(false, "topicA", 1)` would be considered equal to `new KafkaTopicPartition(false, "topicB", 1)`.
The argument against this possibility could be that we would never have a scenario where we are required to compare two such partitions. In single-topic mode, the supervisor (or a task) would be dealing with either `topicA` or `topicB`, never both.
But I guess a similar argument would apply when comparing `new KafkaTopicPartition(false, "topicA", 1)` and `new KafkaTopicPartition(false, null, 1)`. In other words, the supervisor would never have to compare two such objects (because, due to the order of upgrades, from the supervisor's point of view either all tasks would be running the new Druid version or all tasks would be running the old Druid version), and a task would never have to compare two such objects either, because in its lifecycle a task would only ever see one format of partitions.
In short, the hashCode and equals methods can be simplified to just check all 3 fields.
But maybe I am missing some cases.
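A minimal standalone sketch of the simplified version suggested above. The class and field names mirror the PR, but this is a hypothetical illustration, not the actual Druid implementation:

```java
import java.util.Objects;

// Hypothetical simplified KafkaTopicPartition where equals/hashCode always
// compare all three fields, regardless of the multiTopicPartition flag.
class SimpleKafkaTopicPartition
{
  private final boolean multiTopicPartition;
  private final String topic; // may be null in single-topic mode
  private final int partition;

  SimpleKafkaTopicPartition(boolean multiTopicPartition, String topic, int partition)
  {
    this.multiTopicPartition = multiTopicPartition;
    this.topic = topic;
    this.partition = partition;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    SimpleKafkaTopicPartition that = (SimpleKafkaTopicPartition) o;
    // All three fields participate in the comparison.
    return partition == that.partition
           && multiTopicPartition == that.multiTopicPartition
           && Objects.equals(topic, that.topic);
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(multiTopicPartition, topic, partition);
  }
}
```

With this version, `new SimpleKafkaTopicPartition(false, "topicA", 1)` and `new SimpleKafkaTopicPartition(false, "topicB", 1)` are no longer equal.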
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -126,19 +126,20 @@ public KafkaSupervisor(
@Override
-  protected RecordSupplier<Integer, Long, KafkaRecordEntity> setupRecordSupplier()
+  protected RecordSupplier<KafkaTopicPartition, Long, KafkaRecordEntity> setupRecordSupplier()
{
return new KafkaRecordSupplier(
spec.getIoConfig().getConsumerProperties(),
sortingMapper,
- spec.getIoConfig().getConfigOverrides()
+ spec.getIoConfig().getConfigOverrides(),
+ spec.getIoConfig().isMultiTopic()
);
}
@Override
- protected int getTaskGroupIdForPartition(Integer partitionId)
+ protected int getTaskGroupIdForPartition(KafkaTopicPartition partitionId)
{
- return partitionId % spec.getIoConfig().getTaskCount();
+ return partitionId.partition() % spec.getIoConfig().getTaskCount();
Review Comment:
Hmm, this seems a little tricky.
This logic would always lead to a skew even with single-topic ingestion, but there the difference between the most assigned taskGroupId and the least assigned taskGroupId would have been only 1. With multiple topics and no limit on the number of topics, this difference can be anything.
Say there are 5 topics A, B, C, D, E with 6 partitions each and the task count is 4.
Then taskGroupIds 0 and 1 would get 10 partitions each, whereas taskGroupIds 2 and 3 would get 5 partitions each.
The skew can be reduced in two ways:
- Get the total partition count of each topic and then assign the partitions new integer ids, e.g. A:1 -> 1, A:2 -> 2 ... A:5 -> 5, then B:1 -> 6, B:2 -> 7 and so on. Then do a modulo over these new overall ids instead of `partitionId.partition()`. This is very cumbersome though.
- A simpler approach could be to offset the assignment for every topic. So A starts assignment at taskGroupId 0, but B starts assignment at taskGroupId 1, and so on.
  - While constructing this class, we just need to get the list of topics from the stream.
  - Then the code here could become
```
topics = list of topics
return (partitionId.partition() + topics.indexOf(partitionId.topic())) % taskCount;
```
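The offset-per-topic idea can be sketched as standalone code (the class name is illustrative, and the real supervisor would obtain the topic list from the stream definition rather than a constructor argument):

```java
import java.util.List;

// Sketch: offset each topic's partition -> taskGroup assignment by the topic's
// index in the topic list, so the "remainder" partitions of different topics
// land on different task groups instead of always piling onto group 0.
class TaskGroupAssigner
{
  private final List<String> topics;
  private final int taskCount;

  TaskGroupAssigner(List<String> topics, int taskCount)
  {
    this.topics = topics;
    this.taskCount = taskCount;
  }

  int getTaskGroupId(String topic, int partition)
  {
    return (partition + topics.indexOf(topic)) % taskCount;
  }
}
```

For the example above (5 topics with 6 partitions each and a task count of 4), this yields an 8/8/7/7 split across the four task groups instead of 10/10/5/5.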
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java:
##########
@@ -78,9 +82,10 @@ public KafkaIndexTaskIOConfig(
    this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
    this.pollTimeout = pollTimeout != null ? pollTimeout : KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS;
    this.configOverrides = configOverrides;
+    this.multiTopic = multiTopic != null ? multiTopic : KafkaSupervisorIOConfig.DEFAULT_IS_MULTI_TOPIC;
Review Comment:
Nit: can use utility for readability
```suggestion
    this.multiTopic = Configs.valueOrDefault(multiTopic, KafkaSupervisorIOConfig.DEFAULT_IS_MULTI_TOPIC);
```
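For readers unfamiliar with the helper referenced in the suggestion, it behaves roughly like the following stand-in (a sketch, not Druid's actual `Configs` class):

```java
// Minimal stand-in for a Configs.valueOrDefault-style utility: return the
// given value when it is non-null, otherwise fall back to the default.
class ConfigsSketch
{
  static <T> T valueOrDefault(T value, T defaultValue)
  {
    return value != null ? value : defaultValue;
  }
}
```

It reads better than an inline ternary because the intent ("use the default when unset") is named.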
##########
extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafka/KafkaTopicPartitionTest.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.kafka;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class KafkaTopicPartitionTest
+{
+ @Test
+ public void testEquals()
+ {
+    KafkaTopicPartition partition1 = new KafkaTopicPartition(false, "topic", 0);
+    KafkaTopicPartition partition2 = new KafkaTopicPartition(false, "topic", 0);
+    KafkaTopicPartition partition3 = new KafkaTopicPartition(false, "topic", 1);
+    KafkaTopicPartition partition4 = new KafkaTopicPartition(false, "topic2", 0);
+    KafkaTopicPartition partition5 = new KafkaTopicPartition(false, null, 0);
+    KafkaTopicPartition partition6 = new KafkaTopicPartition(false, null, 0);
+    KafkaTopicPartition partition7 = new KafkaTopicPartition(true, "topic", 0);
+    KafkaTopicPartition partition8 = new KafkaTopicPartition(true, "topic2", 0);
+
+ Assert.assertEquals(partition1, partition2);
+ Assert.assertNotEquals(partition1, partition3);
+ Assert.assertEquals(partition1, partition4);
+ Assert.assertEquals(partition5, partition6);
+ Assert.assertEquals(partition1, partition5);
+ Assert.assertNotEquals(partition1, partition7);
+ Assert.assertNotEquals(partition7, partition8);
+ }
+
+ @Test
+ public void testHashCode()
+ {
+    KafkaTopicPartition partition1 = new KafkaTopicPartition(false, "topic", 0);
+    KafkaTopicPartition partition2 = new KafkaTopicPartition(false, "topic", 0);
+    KafkaTopicPartition partition3 = new KafkaTopicPartition(false, "topic", 1);
+    KafkaTopicPartition partition4 = new KafkaTopicPartition(false, "topic2", 0);
+    KafkaTopicPartition partition5 = new KafkaTopicPartition(false, null, 0);
+    KafkaTopicPartition partition6 = new KafkaTopicPartition(false, null, 0);
+    KafkaTopicPartition partition7 = new KafkaTopicPartition(true, "topic", 0);
+    KafkaTopicPartition partition8 = new KafkaTopicPartition(true, "topic2", 0);
+
+ Assert.assertEquals(partition1.hashCode(), partition2.hashCode());
+ Assert.assertNotEquals(partition1.hashCode(), partition3.hashCode());
+ Assert.assertEquals(partition1.hashCode(), partition4.hashCode());
+ Assert.assertEquals(partition5.hashCode(), partition6.hashCode());
+ Assert.assertEquals(partition1.hashCode(), partition5.hashCode());
+ Assert.assertNotEquals(partition1.hashCode(), partition7.hashCode());
+ Assert.assertNotEquals(partition7.hashCode(), partition8.hashCode());
+ }
+
+ @Test
+ public void testMultiTopicDeserialization() throws JsonProcessingException
+ {
+ ObjectMapper objectMapper = new ObjectMapper();
+    KafkaTopicPartition partition = objectMapper.readerFor(KafkaTopicPartition.class).readValue("\"topic:0\"");
+ Assert.assertEquals(0, partition.partition());
+ Assert.assertEquals("topic", partition.topic().orElse(null));
+ Assert.assertTrue(partition.isMultiTopicPartition());
+ }
+
+ @Test
+ public void testSingleTopicDeserialization() throws JsonProcessingException
+ {
+ ObjectMapper objectMapper = new ObjectMapper();
Review Comment:
Maybe instantiate just one mapper at the class level.
##########
docs/development/extensions-core/kafka-supervisor-reference.md:
##########
@@ -53,6 +53,7 @@ This topic contains configuration reference information for the Apache Kafka sup
|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
|`autoScalerConfig`|Object|Defines auto scaling behavior for Kafka ingest tasks. See [Tasks Autoscaler Properties](#task-autoscaler-properties).|no (default == null)|
|`idleConfig`|Object|Defines how and when Kafka Supervisor can become idle. See [Idle Supervisor Configuration](#idle-supervisor-configuration) for more details.|no (default == null)|
+|`multiTopic`|Boolean|Set this to true if you want to ingest data from multiple topics from single supervisor. See [Ingesting from multiple topics](#ingesting-from-multi-topics) for more details.|no (default == false)|
Review Comment:
```suggestion
|`multiTopic`|Boolean|Set this to true if you want to ingest data from multiple Kafka topics using a single supervisor. See [Ingesting from multiple topics](#ingesting-from-multi-topics) for more details.|no (default == false)|
```
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaTopicPartition.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.kafka;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.KeyDeserializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.kafka.common.TopicPartition;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This class represents the partition id for kafka ingestion. This partition
id includes topic name along with an
+ * integer partition. The topic name is required because the same partition id
can be used for different topics.
+ * This class is used as a key in {@link
org.apache.druid.indexing.kafka.KafkaDataSourceMetadata} to store the offsets
+ * for each partition.
+ *
+ */
+@JsonSerialize(using = KafkaTopicPartition.KafkaTopicPartitionSerializer.class,
+ keyUsing = KafkaTopicPartition.KafkaTopicPartitionSerializer.class)
+@JsonDeserialize(using =
KafkaTopicPartition.KafkaTopicPartitionDeserializer.class, keyUsing =
+ KafkaTopicPartition.KafkaTopicPartitionKeyDeserializer.class)
+public class KafkaTopicPartition
+{
+ private final int partition;
+ @Nullable
+ private final String topic;
+
+ // This flag is used to maintain backward incompatibilty with older versions
of kafka indexing. If this flag
+ // is set to false,
+ // - KafkaTopicPartition will be serialized as an integer and can be read
back by older version.
+ // - topic field is ignored while comparing two KafkaTopicPartition objects
and calculating hashcode.
+ // This flag must be explicitly passed while constructing
KafkaTopicPartition object. That way, we can ensure that
+ // a particular supervisor is always running in multi topic mode or single
topic mode.
+ private final boolean multiTopicPartition;
+
+ public KafkaTopicPartition(boolean multiTopicPartition, @Nullable String
topic, int partition)
+ {
+ this.partition = partition;
Review Comment:
It would be nice to have some validation here that `topic` is not null when `multiTopic` is true.
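A standalone sketch of the suggested constructor-time validation (hypothetical class name; the real code would likely use Druid's error utilities rather than a plain `IllegalArgumentException`):

```java
// Sketch: reject a null topic when the partition id is constructed in
// multi-topic mode, instead of silently accepting an inconsistent object.
class ValidatedKafkaTopicPartition
{
  private final boolean multiTopicPartition;
  private final String topic;
  private final int partition;

  ValidatedKafkaTopicPartition(boolean multiTopicPartition, String topic, int partition)
  {
    if (multiTopicPartition && topic == null) {
      throw new IllegalArgumentException("topic must not be null when multiTopicPartition is true");
    }
    this.multiTopicPartition = multiTopicPartition;
    this.topic = topic;
    this.partition = partition;
  }
}
```

A null topic remains legal in single-topic mode, which is what preserves compatibility with the old integer serialization.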
##########
docs/development/extensions-core/kafka-supervisor-reference.md:
##########
@@ -53,6 +53,7 @@ This topic contains configuration reference information for the Apache Kafka sup
|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
|`autoScalerConfig`|Object|Defines auto scaling behavior for Kafka ingest tasks. See [Tasks Autoscaler Properties](#task-autoscaler-properties).|no (default == null)|
|`idleConfig`|Object|Defines how and when Kafka Supervisor can become idle. See [Idle Supervisor Configuration](#idle-supervisor-configuration) for more details.|no (default == null)|
+|`multiTopic`|Boolean|Set this to true if you want to ingest data from multiple topics from single supervisor. See [Ingesting from multiple topics](#ingesting-from-multi-topics) for more details.|no (default == false)|
Review Comment:
If we can pass a comma-separated list in the `topic` field, why do we need this flag?
Also, the description for `topic` field in this doc needs to be updated.
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java:
##########
@@ -162,31 +189,39 @@ public Long
getEarliestSequenceNumber(StreamPartition<Integer> partition)
}
@Override
- public boolean isOffsetAvailable(StreamPartition<Integer> partition,
OrderedSequenceNumber<Long> offset)
+ public boolean isOffsetAvailable(StreamPartition<KafkaTopicPartition>
partition, OrderedSequenceNumber<Long> offset)
{
final Long earliestOffset = getEarliestSequenceNumber(partition);
return earliestOffset != null
&&
offset.isAvailableWithEarliest(KafkaSequenceNumber.of(earliestOffset));
}
@Override
- public Long getPosition(StreamPartition<Integer> partition)
+ public Long getPosition(StreamPartition<KafkaTopicPartition> partition)
{
- return wrapExceptions(() -> consumer.position(new TopicPartition(
- partition.getStream(),
- partition.getPartitionId()
- )));
+ return wrapExceptions(() ->
consumer.position(partition.getPartitionId().asTopicPartition(partition.getStream())));
}
@Override
- public Set<Integer> getPartitionIds(String stream)
+ public Set<KafkaTopicPartition> getPartitionIds(String stream)
{
return wrapExceptions(() -> {
- List<PartitionInfo> partitions = consumer.partitionsFor(stream);
- if (partitions == null) {
- throw new ISE("Topic [%s] is not found in KafkaConsumer's list of
topics", stream);
+ List<PartitionInfo> allPartitions = new ArrayList<>();
+ for (String topic : stream.split(",")) {
+ if (!multiTopic && !allPartitions.isEmpty()) {
+ throw InvalidInput.exception("Comma separated list of topics [%s] is
not supported unless you enabled "
+ + "multiTopic in the
KafkaSupervisorSpec.", stream);
+ }
+ List<PartitionInfo> partitions = consumer.partitionsFor(topic.trim());
+ if (partitions == null) {
+ throw new ISE("Topic [%s] is not found in KafkaConsumer's list of
topics", topic.trim());
Review Comment:
Throw some kind of `DruidException` instead?
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaTopicPartition.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.kafka;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.KeyDeserializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.kafka.common.TopicPartition;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This class represents the partition id for kafka ingestion. This partition
id includes topic name along with an
+ * integer partition. The topic name is required because the same partition id
can be used for different topics.
+ * This class is used as a key in {@link
org.apache.druid.indexing.kafka.KafkaDataSourceMetadata} to store the offsets
+ * for each partition.
+ *
+ */
+@JsonSerialize(using = KafkaTopicPartition.KafkaTopicPartitionSerializer.class,
+ keyUsing = KafkaTopicPartition.KafkaTopicPartitionSerializer.class)
+@JsonDeserialize(using =
KafkaTopicPartition.KafkaTopicPartitionDeserializer.class, keyUsing =
+ KafkaTopicPartition.KafkaTopicPartitionKeyDeserializer.class)
+public class KafkaTopicPartition
+{
+ private final int partition;
+ @Nullable
+ private final String topic;
+
+ // This flag is used to maintain backward incompatibilty with older versions
of kafka indexing. If this flag
+ // is set to false,
+ // - KafkaTopicPartition will be serialized as an integer and can be read
back by older version.
+ // - topic field is ignored while comparing two KafkaTopicPartition objects
and calculating hashcode.
+ // This flag must be explicitly passed while constructing
KafkaTopicPartition object. That way, we can ensure that
+ // a particular supervisor is always running in multi topic mode or single
topic mode.
+ private final boolean multiTopicPartition;
+
+ public KafkaTopicPartition(boolean multiTopicPartition, @Nullable String
topic, int partition)
+ {
+ this.partition = partition;
+ this.topic = topic;
+ this.multiTopicPartition = multiTopicPartition;
+ }
+
+ public int partition()
+ {
+ return partition;
+ }
+
+ public Optional<String> topic()
+ {
+ return Optional.ofNullable(topic);
+ }
+
+ public boolean isMultiTopicPartition()
+ {
+ return multiTopicPartition;
+ }
+
+ public TopicPartition asTopicPartition(String fallbackTopic)
Review Comment:
We should mention that the fallback is applicable/required only when `multiTopic` is false.
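An illustrative helper capturing the contract described above (hypothetical names; the real method returns Kafka's `TopicPartition`, which is omitted here to keep the sketch self-contained):

```java
// Sketch: the fallback topic is consulted only when no topic is stored on the
// partition id, i.e. in single-topic mode; in multi-topic mode the stored
// topic always wins.
class TopicResolver
{
  /**
   * Returns the stored topic when present (multi-topic mode); otherwise falls
   * back to {@code fallbackTopic} (single-topic mode, where no topic is stored).
   */
  static String resolve(String storedTopic, String fallbackTopic)
  {
    return storedTopic != null ? storedTopic : fallbackTopic;
  }
}
```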
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java:
##########
@@ -162,31 +189,39 @@ public Long
getEarliestSequenceNumber(StreamPartition<Integer> partition)
}
@Override
- public boolean isOffsetAvailable(StreamPartition<Integer> partition,
OrderedSequenceNumber<Long> offset)
+ public boolean isOffsetAvailable(StreamPartition<KafkaTopicPartition>
partition, OrderedSequenceNumber<Long> offset)
{
final Long earliestOffset = getEarliestSequenceNumber(partition);
return earliestOffset != null
&&
offset.isAvailableWithEarliest(KafkaSequenceNumber.of(earliestOffset));
}
@Override
- public Long getPosition(StreamPartition<Integer> partition)
+ public Long getPosition(StreamPartition<KafkaTopicPartition> partition)
{
- return wrapExceptions(() -> consumer.position(new TopicPartition(
- partition.getStream(),
- partition.getPartitionId()
- )));
+ return wrapExceptions(() ->
consumer.position(partition.getPartitionId().asTopicPartition(partition.getStream())));
}
@Override
- public Set<Integer> getPartitionIds(String stream)
+ public Set<KafkaTopicPartition> getPartitionIds(String stream)
{
return wrapExceptions(() -> {
- List<PartitionInfo> partitions = consumer.partitionsFor(stream);
- if (partitions == null) {
- throw new ISE("Topic [%s] is not found in KafkaConsumer's list of
topics", stream);
+ List<PartitionInfo> allPartitions = new ArrayList<>();
+ for (String topic : stream.split(",")) {
+ if (!multiTopic && !allPartitions.isEmpty()) {
+ throw InvalidInput.exception("Comma separated list of topics [%s] is
not supported unless you enabled "
Review Comment:
Nit: this validation should be done before the loop, with `!isMultiTopic && stream.contains(",")`.
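The hoisted check could look like this standalone sketch (hypothetical class and method names; Druid's code would throw `InvalidInput`/`DruidException` rather than `IllegalArgumentException`):

```java
// Sketch: validate the single-vs-multi-topic constraint once, up front,
// instead of detecting it partway through the partition loop.
class StreamValidator
{
  static void validateStream(String stream, boolean multiTopic)
  {
    if (!multiTopic && stream.contains(",")) {
      throw new IllegalArgumentException(
          "Comma separated list of topics [" + stream + "] is not supported unless multiTopic is enabled"
      );
    }
  }
}
```

Validating before the loop also avoids doing a `partitionsFor` call for the first topic only to fail on the second.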
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java:
##########
@@ -55,20 +57,29 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.function.Function;
import java.util.stream.Collectors;
-public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaRecordEntity>
+public class KafkaRecordSupplier implements RecordSupplier<KafkaTopicPartition, Long, KafkaRecordEntity>
{
private final KafkaConsumer<byte[], byte[]> consumer;
private boolean closed;
+ private boolean multiTopic;
+
+  // Store the stream information when partitions get assigned. This is required because the consumer does not
+  // know about the parent stream which could be a list of topics.
+  private String stream;
Review Comment:
Does this need to be `volatile`?
Also, maybe convert the comment into a javadoc. It would probably be nice to call out that this would have the same value as the `topic` field in the supervisor spec.
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java:
##########
@@ -162,31 +189,39 @@ public Long
getEarliestSequenceNumber(StreamPartition<Integer> partition)
}
@Override
- public boolean isOffsetAvailable(StreamPartition<Integer> partition,
OrderedSequenceNumber<Long> offset)
+ public boolean isOffsetAvailable(StreamPartition<KafkaTopicPartition>
partition, OrderedSequenceNumber<Long> offset)
{
final Long earliestOffset = getEarliestSequenceNumber(partition);
return earliestOffset != null
&&
offset.isAvailableWithEarliest(KafkaSequenceNumber.of(earliestOffset));
}
@Override
- public Long getPosition(StreamPartition<Integer> partition)
+ public Long getPosition(StreamPartition<KafkaTopicPartition> partition)
{
- return wrapExceptions(() -> consumer.position(new TopicPartition(
- partition.getStream(),
- partition.getPartitionId()
- )));
+ return wrapExceptions(() ->
consumer.position(partition.getPartitionId().asTopicPartition(partition.getStream())));
}
@Override
- public Set<Integer> getPartitionIds(String stream)
+ public Set<KafkaTopicPartition> getPartitionIds(String stream)
{
return wrapExceptions(() -> {
- List<PartitionInfo> partitions = consumer.partitionsFor(stream);
- if (partitions == null) {
- throw new ISE("Topic [%s] is not found in KafkaConsumer's list of
topics", stream);
+ List<PartitionInfo> allPartitions = new ArrayList<>();
+ for (String topic : stream.split(",")) {
+ if (!multiTopic && !allPartitions.isEmpty()) {
+ throw InvalidInput.exception("Comma separated list of topics [%s] is
not supported unless you enabled "
+ + "multiTopic in the
KafkaSupervisorSpec.", stream);
+ }
+ List<PartitionInfo> partitions = consumer.partitionsFor(topic.trim());
+ if (partitions == null) {
+ throw new ISE("Topic [%s] is not found in KafkaConsumer's list of
topics", topic.trim());
Review Comment:
Nit: rephrase
```suggestion
        throw new ISE("Could not find topic[%s] using Kafka consumer", topic.trim());
```
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java:
##########
@@ -55,20 +57,29 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.function.Function;
import java.util.stream.Collectors;
-public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaRecordEntity>
+public class KafkaRecordSupplier implements RecordSupplier<KafkaTopicPartition, Long, KafkaRecordEntity>
{
private final KafkaConsumer<byte[], byte[]> consumer;
private boolean closed;
+ private boolean multiTopic;
Review Comment:
can be final.
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaTopicPartition.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.kafka;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.KeyDeserializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.kafka.common.TopicPartition;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This class represents the partition id for kafka ingestion. This partition id includes topic name along with an
+ * integer partition. The topic name is required because the same partition id can be used for different topics.
+ * This class is used as a key in {@link org.apache.druid.indexing.kafka.KafkaDataSourceMetadata} to store the offsets
+ * for each partition.
+ */
+@JsonSerialize(using = KafkaTopicPartition.KafkaTopicPartitionSerializer.class,
+    keyUsing = KafkaTopicPartition.KafkaTopicPartitionSerializer.class)
+@JsonDeserialize(using = KafkaTopicPartition.KafkaTopicPartitionDeserializer.class,
+    keyUsing = KafkaTopicPartition.KafkaTopicPartitionKeyDeserializer.class)
+public class KafkaTopicPartition
+{
+ private final int partition;
+ @Nullable
+ private final String topic;
+
+  // This flag is used to maintain backward incompatibilty with older versions of kafka indexing. If this flag
+  // is set to false,
+  //   - KafkaTopicPartition will be serialized as an integer and can be read back by older version.
+  //   - topic field is ignored while comparing two KafkaTopicPartition objects and calculating hashcode.
+  // This flag must be explicitly passed while constructing KafkaTopicPartition object. That way, we can ensure that
+  // a particular supervisor is always running in multi topic mode or single topic mode.
+  private final boolean multiTopicPartition;
+
+  public KafkaTopicPartition(boolean multiTopicPartition, @Nullable String topic, int partition)
+ {
+ this.partition = partition;
+ this.topic = topic;
+ this.multiTopicPartition = multiTopicPartition;
+ }
+
+ public int partition()
+ {
+ return partition;
+ }
+
+ public Optional<String> topic()
+ {
+ return Optional.ofNullable(topic);
+ }
+
+ public boolean isMultiTopicPartition()
+ {
+ return multiTopicPartition;
+ }
+
+ public TopicPartition asTopicPartition(String fallbackTopic)
+ {
+    return new TopicPartition(topic != null ? topic : fallbackTopic, partition);
Review Comment:
  Might be nice to have an if on `multiTopicPartition`:
  ```suggestion
    if (multiTopicPartition) {
      return new TopicPartition(topic, partition);
    } else {
      return new TopicPartition(topic != null ? topic : fallbackTopic, partition);
    }
  ```
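For what it's worth, the branch-on-flag behavior the suggestion implies can be sketched standalone (class and field names here are illustrative, with a tiny stand-in for Kafka's `TopicPartition`): in multi-topic mode the topic is always carried in the partition id, so the fallback should never apply.

```java
public class TopicPartitionFallbackSketch
{
  // Tiny stand-in for org.apache.kafka.common.TopicPartition.
  static final class TopicPartition
  {
    final String topic;
    final int partition;

    TopicPartition(String topic, int partition)
    {
      this.topic = topic;
      this.partition = partition;
    }
  }

  final String topic;  // null for legacy single-topic ids
  final int partition;
  final boolean multiTopicPartition;

  TopicPartitionFallbackSketch(boolean multiTopicPartition, String topic, int partition)
  {
    this.multiTopicPartition = multiTopicPartition;
    this.topic = topic;
    this.partition = partition;
  }

  // Multi-topic ids always carry their topic; the fallback is only consulted
  // for legacy single-topic ids that omit it.
  TopicPartition asTopicPartition(String fallbackTopic)
  {
    if (multiTopicPartition) {
      return new TopicPartition(topic, partition);
    } else {
      return new TopicPartition(topic != null ? topic : fallbackTopic, partition);
    }
  }

  public static void main(String[] args)
  {
    TopicPartition tp = new TopicPartitionFallbackSketch(false, null, 2).asTopicPartition("legacy-topic");
    System.out.println(tp.topic + "-" + tp.partition);  // legacy-topic-2
  }
}
```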
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaTopicPartition.java:
##########
@@ -0,0 +1,183 @@
+public class KafkaTopicPartition
+{
+ private final int partition;
+ @Nullable
+ private final String topic;
+
+  // This flag is used to maintain backward incompatibilty with older versions of kafka indexing. If this flag
Review Comment:
  +1 for making it a javadoc. Also the content needs updating given the current code.
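To make the compatibility contract concrete, here is a hypothetical round-trip sketch. This is not Druid's actual Jackson serializer, and the `"topic:partition"` string form for multi-topic mode is an assumption for illustration; the grounded part is that with the flag off, the id serializes as a bare integer that older readers can parse.

```java
public class KafkaTopicPartitionSketch
{
  final int partition;
  final String topic;  // null in single-topic mode
  final boolean multiTopicPartition;

  KafkaTopicPartitionSketch(boolean multiTopicPartition, String topic, int partition)
  {
    this.multiTopicPartition = multiTopicPartition;
    this.topic = topic;
    this.partition = partition;
  }

  // Single-topic mode: a bare integer, readable by older Druid versions.
  // Multi-topic mode: an assumed "topic:partition" string form.
  String serialize()
  {
    return multiTopicPartition ? topic + ":" + partition : String.valueOf(partition);
  }

  static KafkaTopicPartitionSketch deserialize(String value)
  {
    int idx = value.lastIndexOf(':');
    if (idx < 0) {
      // A bare integer: a legacy single-topic id.
      return new KafkaTopicPartitionSketch(false, null, Integer.parseInt(value));
    }
    return new KafkaTopicPartitionSketch(true, value.substring(0, idx), Integer.parseInt(value.substring(idx + 1)));
  }

  public static void main(String[] args)
  {
    System.out.println(new KafkaTopicPartitionSketch(false, null, 3).serialize());     // 3
    System.out.println(new KafkaTopicPartitionSketch(true, "metrics", 3).serialize()); // metrics:3
  }
}
```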