kfaraz commented on code in PR #14424:
URL: https://github.com/apache/druid/pull/14424#discussion_r1281337181


##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java:
##########
@@ -94,8 +99,8 @@ public KafkaIndexTaskIOConfig(
   public KafkaIndexTaskIOConfig(
       int taskGroupId,
       String baseSequenceName,
-      SeekableStreamStartSequenceNumbers<Integer, Long> startSequenceNumbers,
-      SeekableStreamEndSequenceNumbers<Integer, Long> endSequenceNumbers,
+      SeekableStreamStartSequenceNumbers<KafkaTopicPartition, Long> startSequenceNumbers,
+        SeekableStreamEndSequenceNumbers<KafkaTopicPartition, Long> endSequenceNumbers,

Review Comment:
   nit: extra space



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaTopicPartition.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.kafka;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.KeyDeserializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.kafka.common.TopicPartition;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This class represents the partition id for kafka ingestion. This partition id includes topic name along with an
+ * integer partition. The topic name is required because the same partition id can be used for different topics.
+ * This class is used as a key in {@link org.apache.druid.indexing.kafka.KafkaDataSourceMetadata} to store the offsets
+ * for each partition.
+ */
+@JsonSerialize(using = KafkaTopicPartition.KafkaTopicPartitionSerializer.class,
+    keyUsing = KafkaTopicPartition.KafkaTopicPartitionSerializer.class)
+@JsonDeserialize(using = KafkaTopicPartition.KafkaTopicPartitionDeserializer.class, keyUsing =
+    KafkaTopicPartition.KafkaTopicPartitionKeyDeserializer.class)
+public class KafkaTopicPartition
+{
+  private final int partition;
+  @Nullable
+  private final String topic;
+
+  // This flag is used to maintain backward compatibility with older versions of kafka indexing. If this flag
+  // is set to false,
+  // - KafkaTopicPartition will be serialized as an integer and can be read back by older versions.
+  // - topic field is ignored while comparing two KafkaTopicPartition objects and calculating hashcode.
+  // This flag must be explicitly passed while constructing a KafkaTopicPartition object. That way, we can ensure that
+  // a particular supervisor is always running in multi topic mode or single topic mode.
+  private final boolean multiTopicPartition;
+
+  public KafkaTopicPartition(boolean multiTopicPartition, @Nullable String topic, int partition)
+  {
+    this.partition = partition;
+    this.topic = topic;
+    this.multiTopicPartition = multiTopicPartition;
+  }
+
+  public int partition()
+  {
+    return partition;
+  }
+
+  public Optional<String> topic()
+  {
+    return Optional.ofNullable(topic);
+  }
+
+  public boolean isMultiTopicPartition()
+  {
+    return multiTopicPartition;
+  }
+
+  public TopicPartition asTopicPartition(String fallbackTopic)
+  {
+    return new TopicPartition(topic != null ? topic : fallbackTopic, partition);
+  }
+
+  @Override
+  public String toString()
+  {
+    // TODO - fix this so toString is not used for serialization
+    if (null != topic && multiTopicPartition) {
+      return topic + ":" + partition;
+    } else {
+      return Integer.toString(partition);
+    }
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    KafkaTopicPartition that = (KafkaTopicPartition) o;
+    return partition == that.partition
+           && multiTopicPartition == that.multiTopicPartition
+           && (!multiTopicPartition || Objects.equals(topic, that.topic));
+  }
+
+  @Override
+  public int hashCode()
+  {
+    if (multiTopicPartition) {

Review Comment:
   I am a little confused about excluding the topic from equals and hashCode if `multiTopic` is false.
   
   In the impl here, `new KafkaTopicPartition(false, "topicA", 1)` would be considered equal to `new KafkaTopicPartition(false, "topicB", 1)`.
   
   The argument against this possibility could be that we would never have a scenario where we are required to compare two such partitions. In single-topic mode, the supervisor (or a task) would be dealing with either `topicA` or `topicB`, never both.
   
   But I guess a similar argument would apply when comparing `new KafkaTopicPartition(false, "topicA", 1)` and `new KafkaTopicPartition(false, null, 1)`. In other words, the supervisor would never have to compare two such objects (because, from the supervisor's point of view, either all tasks would be running the new Druid version or all tasks would be running the old one due to the order of upgrades), and a task would never have to compare two such objects either, because in its lifecycle a task would only ever see one format of partitions.
   
   In short, the hashCode and equals methods can be simplified to just check all 3 fields.
   But maybe I am missing some cases.
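   For illustration, the simplified value semantics suggested here could look like the following sketch (a hypothetical standalone class, not the actual Druid implementation):

```java
import java.util.Objects;

// Hypothetical sketch: equals/hashCode always consider all three fields,
// regardless of the multiTopicPartition flag.
class SimpleKafkaTopicPartition
{
  final boolean multiTopicPartition;
  final String topic; // may be null in single-topic mode
  final int partition;

  SimpleKafkaTopicPartition(boolean multiTopicPartition, String topic, int partition)
  {
    this.multiTopicPartition = multiTopicPartition;
    this.topic = topic;
    this.partition = partition;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (!(o instanceof SimpleKafkaTopicPartition)) {
      return false;
    }
    SimpleKafkaTopicPartition that = (SimpleKafkaTopicPartition) o;
    return partition == that.partition
           && multiTopicPartition == that.multiTopicPartition
           && Objects.equals(topic, that.topic);
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(multiTopicPartition, topic, partition);
  }
}
```

   Under these semantics, `new SimpleKafkaTopicPartition(false, "topicA", 1)` would no longer be equal to `new SimpleKafkaTopicPartition(false, "topicB", 1)`.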



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -126,19 +126,20 @@ public KafkaSupervisor(
 
 
   @Override
-  protected RecordSupplier<Integer, Long, KafkaRecordEntity> setupRecordSupplier()
+  protected RecordSupplier<KafkaTopicPartition, Long, KafkaRecordEntity> setupRecordSupplier()
   {
     return new KafkaRecordSupplier(
         spec.getIoConfig().getConsumerProperties(),
         sortingMapper,
-        spec.getIoConfig().getConfigOverrides()
+        spec.getIoConfig().getConfigOverrides(),
+        spec.getIoConfig().isMultiTopic()
     );
   }
 
   @Override
-  protected int getTaskGroupIdForPartition(Integer partitionId)
+  protected int getTaskGroupIdForPartition(KafkaTopicPartition partitionId)
   {
-    return partitionId % spec.getIoConfig().getTaskCount();
+    return partitionId.partition() % spec.getIoConfig().getTaskCount();

Review Comment:
   Hmm, this seems a little tricky. 
   
   This logic would always lead to a skew even with single topic ingestion, but the difference between the most assigned taskGroupId and the least assigned taskGroupId would have been only 1. With multiple topics and no limit on the number of topics, this difference can be anything.
   
   Say there are 5 topics A, B, C, D, E with 6 partitions each and the task count is 4.
   Then taskGroupIds 0 and 1 would get 10 partitions each
   whereas taskGroupIds 2 and 3 would get 5 partitions each.
   
   The skew can be reduced in two ways:
   - Get the total partition count of each topic and then assign them new integer ids, e.g. A:1 -> 1, A:2 -> 2 ... A:5 -> 5, then B:1 -> 6, B:2 -> 7 and so on. Then do a modulo of these new overall IDs instead of the `partitionId.partition()`. This is very cumbersome though.
   - A simpler approach could be to offset the assignment for every topic. So A starts assignment at taskGroupId 0, but B starts assignment at taskGroupId 1 and so on.
     - While constructing this class, we just need to get the list of topics from the stream.
     - Then the code here could become
   ```
   topics = List of topics
   return (partitionId.partition() + topics.indexOf(partitionId.topic())) % taskCount;
   ```
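The arithmetic behind this example can be checked with a small sketch (hypothetical helper names; it only counts how many partitions land on each task group under the two schemes):

```java
import java.util.Arrays;
import java.util.List;

// Demonstrates the skew described above: 5 topics x 6 partitions, taskCount = 4.
class TaskGroupSkewDemo
{
  // Count how many partitions land on each task group under the given scheme.
  static int[] assign(List<String> topics, int partitionsPerTopic, int taskCount, boolean offsetByTopic)
  {
    int[] counts = new int[taskCount];
    for (int topicIndex = 0; topicIndex < topics.size(); topicIndex++) {
      for (int partition = 0; partition < partitionsPerTopic; partition++) {
        int taskGroupId = offsetByTopic
                          ? (partition + topicIndex) % taskCount
                          : partition % taskCount;
        counts[taskGroupId]++;
      }
    }
    return counts;
  }

  public static void main(String[] args)
  {
    List<String> topics = Arrays.asList("A", "B", "C", "D", "E");
    // Current logic, partition % taskCount: [10, 10, 5, 5]
    System.out.println(Arrays.toString(assign(topics, 6, 4, false)));
    // Offset by topic index: [8, 8, 7, 7]
    System.out.println(Arrays.toString(assign(topics, 6, 4, true)));
  }
}
```

With the topic-index offset, the max/min gap for this configuration drops from 10 vs 5 to 8 vs 7.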
     



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java:
##########
@@ -78,9 +82,10 @@ public KafkaIndexTaskIOConfig(
     this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
     this.pollTimeout = pollTimeout != null ? pollTimeout : KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS;
     this.configOverrides = configOverrides;
+    this.multiTopic = multiTopic != null ? multiTopic : KafkaSupervisorIOConfig.DEFAULT_IS_MULTI_TOPIC;

Review Comment:
   Nit: can use utility for readability
   ```suggestion
       this.multiTopic = Configs.valueOrDefault(multiTopic, KafkaSupervisorIOConfig.DEFAULT_IS_MULTI_TOPIC);
   ```
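For readers unfamiliar with the utility referenced in the suggestion, its null-coalescing semantics are along these lines (a minimal stand-in sketch, not Druid's actual `Configs` class):

```java
// Minimal stand-in for a valueOrDefault-style utility: return the value if
// present, otherwise fall back to the default.
class ConfigsSketch
{
  static <T> T valueOrDefault(T value, T defaultValue)
  {
    return value != null ? value : defaultValue;
  }
}
```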



##########
extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafka/KafkaTopicPartitionTest.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.kafka;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class KafkaTopicPartitionTest
+{
+  @Test
+  public void testEquals()
+  {
+    KafkaTopicPartition partition1 = new KafkaTopicPartition(false, "topic", 0);
+    KafkaTopicPartition partition2 = new KafkaTopicPartition(false, "topic", 0);
+    KafkaTopicPartition partition3 = new KafkaTopicPartition(false, "topic", 1);
+    KafkaTopicPartition partition4 = new KafkaTopicPartition(false, "topic2", 0);
+    KafkaTopicPartition partition5 = new KafkaTopicPartition(false, null, 0);
+    KafkaTopicPartition partition6 = new KafkaTopicPartition(false, null, 0);
+    KafkaTopicPartition partition7 = new KafkaTopicPartition(true, "topic", 0);
+    KafkaTopicPartition partition8 = new KafkaTopicPartition(true, "topic2", 0);
+
+    Assert.assertEquals(partition1, partition2);
+    Assert.assertNotEquals(partition1, partition3);
+    Assert.assertEquals(partition1, partition4);
+    Assert.assertEquals(partition5, partition6);
+    Assert.assertEquals(partition1, partition5);
+    Assert.assertNotEquals(partition1, partition7);
+    Assert.assertNotEquals(partition7, partition8);
+  }
+
+  @Test
+  public void testHashCode()
+  {
+    KafkaTopicPartition partition1 = new KafkaTopicPartition(false, "topic", 0);
+    KafkaTopicPartition partition2 = new KafkaTopicPartition(false, "topic", 0);
+    KafkaTopicPartition partition3 = new KafkaTopicPartition(false, "topic", 1);
+    KafkaTopicPartition partition4 = new KafkaTopicPartition(false, "topic2", 0);
+    KafkaTopicPartition partition5 = new KafkaTopicPartition(false, null, 0);
+    KafkaTopicPartition partition6 = new KafkaTopicPartition(false, null, 0);
+    KafkaTopicPartition partition7 = new KafkaTopicPartition(true, "topic", 0);
+    KafkaTopicPartition partition8 = new KafkaTopicPartition(true, "topic2", 0);
+
+    Assert.assertEquals(partition1.hashCode(), partition2.hashCode());
+    Assert.assertNotEquals(partition1.hashCode(), partition3.hashCode());
+    Assert.assertEquals(partition1.hashCode(), partition4.hashCode());
+    Assert.assertEquals(partition5.hashCode(), partition6.hashCode());
+    Assert.assertEquals(partition1.hashCode(), partition5.hashCode());
+    Assert.assertNotEquals(partition1.hashCode(), partition7.hashCode());
+    Assert.assertNotEquals(partition7.hashCode(), partition8.hashCode());
+  }
+
+  @Test
+  public void testMultiTopicDeserialization() throws JsonProcessingException
+  {
+    ObjectMapper objectMapper = new ObjectMapper();
+    KafkaTopicPartition partition = objectMapper.readerFor(KafkaTopicPartition.class).readValue("\"topic:0\"");
+    Assert.assertEquals(0, partition.partition());
+    Assert.assertEquals("topic", partition.topic().orElse(null));
+    Assert.assertTrue(partition.isMultiTopicPartition());
+  }
+
+  @Test
+  public void testSingleTopicDeserialization() throws JsonProcessingException
+  {
+    ObjectMapper objectMapper = new ObjectMapper();

Review Comment:
   Maybe instantiate just one mapper at the class level.



##########
docs/development/extensions-core/kafka-supervisor-reference.md:
##########
@@ -53,6 +53,7 @@ This topic contains configuration reference information for the Apache Kafka sup
 |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
 |`autoScalerConfig`|Object|Defines auto scaling behavior for Kafka ingest tasks. See [Tasks Autoscaler Properties](#task-autoscaler-properties).|no (default == null)|
 |`idleConfig`|Object|Defines how and when Kafka Supervisor can become idle. See [Idle Supervisor Configuration](#idle-supervisor-configuration) for more details.|no (default == null)|
+|`multiTopic`|Boolean|Set this to true if you want to ingest data from multiple topics from single supervisor. See [Ingesting from multiple topics](#ingesting-from-multi-topics) for more details.|no (default == false)|

Review Comment:
   ```suggestion
   |`multiTopic`|Boolean|Set this to true if you want to ingest data from multiple Kafka topics using a single supervisor. See [Ingesting from multiple topics](#ingesting-from-multi-topics) for more details.|no (default == false)|
   ```



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaTopicPartition.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.kafka;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.KeyDeserializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.kafka.common.TopicPartition;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This class represents the partition id for kafka ingestion. This partition id includes topic name along with an
+ * integer partition. The topic name is required because the same partition id can be used for different topics.
+ * This class is used as a key in {@link org.apache.druid.indexing.kafka.KafkaDataSourceMetadata} to store the offsets
+ * for each partition.
+ */
+@JsonSerialize(using = KafkaTopicPartition.KafkaTopicPartitionSerializer.class,
+    keyUsing = KafkaTopicPartition.KafkaTopicPartitionSerializer.class)
+@JsonDeserialize(using = KafkaTopicPartition.KafkaTopicPartitionDeserializer.class, keyUsing =
+    KafkaTopicPartition.KafkaTopicPartitionKeyDeserializer.class)
+public class KafkaTopicPartition
+{
+  private final int partition;
+  @Nullable
+  private final String topic;
+
+  // This flag is used to maintain backward compatibility with older versions of kafka indexing. If this flag
+  // is set to false,
+  // - KafkaTopicPartition will be serialized as an integer and can be read back by older versions.
+  // - topic field is ignored while comparing two KafkaTopicPartition objects and calculating hashcode.
+  // This flag must be explicitly passed while constructing a KafkaTopicPartition object. That way, we can ensure that
+  // a particular supervisor is always running in multi topic mode or single topic mode.
+  private final boolean multiTopicPartition;
+
+  public KafkaTopicPartition(boolean multiTopicPartition, @Nullable String topic, int partition)
+  {
+    this.partition = partition;

Review Comment:
   It would be nice to have some validation here that `topic` is not null when 
`multiTopic` is true.
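A sketch of what that validation could look like (standalone illustration; a plain `IllegalArgumentException` stands in for whatever exception type the project prefers):

```java
// Hypothetical variant of the constructor with the suggested validation:
// a multi-topic partition must always carry a topic name.
class ValidatedKafkaTopicPartition
{
  final boolean multiTopicPartition;
  final String topic;
  final int partition;

  ValidatedKafkaTopicPartition(boolean multiTopicPartition, String topic, int partition)
  {
    if (multiTopicPartition && topic == null) {
      throw new IllegalArgumentException("topic must be non-null when multiTopicPartition is true");
    }
    this.multiTopicPartition = multiTopicPartition;
    this.topic = topic;
    this.partition = partition;
  }
}
```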



##########
docs/development/extensions-core/kafka-supervisor-reference.md:
##########
@@ -53,6 +53,7 @@ This topic contains configuration reference information for the Apache Kafka sup
 |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
 |`autoScalerConfig`|Object|Defines auto scaling behavior for Kafka ingest tasks. See [Tasks Autoscaler Properties](#task-autoscaler-properties).|no (default == null)|
 |`idleConfig`|Object|Defines how and when Kafka Supervisor can become idle. See [Idle Supervisor Configuration](#idle-supervisor-configuration) for more details.|no (default == null)|
+|`multiTopic`|Boolean|Set this to true if you want to ingest data from multiple topics from single supervisor. See [Ingesting from multiple topics](#ingesting-from-multi-topics) for more details.|no (default == false)|

Review Comment:
   If we can pass a comma-separated list in the `topic` field, why do we need 
this flag?
   Also, the description for `topic` field in this doc needs to be updated.



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java:
##########
@@ -162,31 +189,39 @@ public Long getEarliestSequenceNumber(StreamPartition<Integer> partition)
   }
 
   @Override
-  public boolean isOffsetAvailable(StreamPartition<Integer> partition, OrderedSequenceNumber<Long> offset)
+  public boolean isOffsetAvailable(StreamPartition<KafkaTopicPartition> partition, OrderedSequenceNumber<Long> offset)
   {
     final Long earliestOffset = getEarliestSequenceNumber(partition);
     return earliestOffset != null
            && offset.isAvailableWithEarliest(KafkaSequenceNumber.of(earliestOffset));
   }
 
   @Override
-  public Long getPosition(StreamPartition<Integer> partition)
+  public Long getPosition(StreamPartition<KafkaTopicPartition> partition)
   {
-    return wrapExceptions(() -> consumer.position(new TopicPartition(
-        partition.getStream(),
-        partition.getPartitionId()
-    )));
+    return wrapExceptions(() -> consumer.position(partition.getPartitionId().asTopicPartition(partition.getStream())));
   }
 
   @Override
-  public Set<Integer> getPartitionIds(String stream)
+  public Set<KafkaTopicPartition> getPartitionIds(String stream)
   {
     return wrapExceptions(() -> {
-      List<PartitionInfo> partitions = consumer.partitionsFor(stream);
-      if (partitions == null) {
-        throw new ISE("Topic [%s] is not found in KafkaConsumer's list of topics", stream);
+      List<PartitionInfo> allPartitions = new ArrayList<>();
+      for (String topic : stream.split(",")) {
+        if (!multiTopic && !allPartitions.isEmpty()) {
+          throw InvalidInput.exception("Comma separated list of topics [%s] is not supported unless you enabled "
+                                       + "multiTopic in the KafkaSupervisorSpec.", stream);
+        }
+        List<PartitionInfo> partitions = consumer.partitionsFor(topic.trim());
+        if (partitions == null) {
+          throw new ISE("Topic [%s] is not found in KafkaConsumer's list of topics", topic.trim());

Review Comment:
   Throw some kind of `DruidException` instead?



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaTopicPartition.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.kafka;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.KeyDeserializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.kafka.common.TopicPartition;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This class represents the partition id for kafka ingestion. This partition id includes topic name along with an
+ * integer partition. The topic name is required because the same partition id can be used for different topics.
+ * This class is used as a key in {@link org.apache.druid.indexing.kafka.KafkaDataSourceMetadata} to store the offsets
+ * for each partition.
+ */
+@JsonSerialize(using = KafkaTopicPartition.KafkaTopicPartitionSerializer.class,
+    keyUsing = KafkaTopicPartition.KafkaTopicPartitionSerializer.class)
+@JsonDeserialize(using = KafkaTopicPartition.KafkaTopicPartitionDeserializer.class, keyUsing =
+    KafkaTopicPartition.KafkaTopicPartitionKeyDeserializer.class)
+public class KafkaTopicPartition
+{
+  private final int partition;
+  @Nullable
+  private final String topic;
+
+  // This flag is used to maintain backward compatibility with older versions of kafka indexing. If this flag
+  // is set to false,
+  // - KafkaTopicPartition will be serialized as an integer and can be read back by older versions.
+  // - topic field is ignored while comparing two KafkaTopicPartition objects and calculating hashcode.
+  // This flag must be explicitly passed while constructing a KafkaTopicPartition object. That way, we can ensure that
+  // a particular supervisor is always running in multi topic mode or single topic mode.
+  private final boolean multiTopicPartition;
+
+  public KafkaTopicPartition(boolean multiTopicPartition, @Nullable String topic, int partition)
+  {
+    this.partition = partition;
+    this.topic = topic;
+    this.multiTopicPartition = multiTopicPartition;
+  }
+
+  public int partition()
+  {
+    return partition;
+  }
+
+  public Optional<String> topic()
+  {
+    return Optional.ofNullable(topic);
+  }
+
+  public boolean isMultiTopicPartition()
+  {
+    return multiTopicPartition;
+  }
+
+  public TopicPartition asTopicPartition(String fallbackTopic)

Review Comment:
   We should mention that the fallback is applicable/required only when 
`multiTopic` is false.



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java:
##########
@@ -162,31 +189,39 @@ public Long getEarliestSequenceNumber(StreamPartition<Integer> partition)
   }
 
   @Override
-  public boolean isOffsetAvailable(StreamPartition<Integer> partition, OrderedSequenceNumber<Long> offset)
+  public boolean isOffsetAvailable(StreamPartition<KafkaTopicPartition> partition, OrderedSequenceNumber<Long> offset)
   {
     final Long earliestOffset = getEarliestSequenceNumber(partition);
     return earliestOffset != null
            && offset.isAvailableWithEarliest(KafkaSequenceNumber.of(earliestOffset));
   }
 
   @Override
-  public Long getPosition(StreamPartition<Integer> partition)
+  public Long getPosition(StreamPartition<KafkaTopicPartition> partition)
   {
-    return wrapExceptions(() -> consumer.position(new TopicPartition(
-        partition.getStream(),
-        partition.getPartitionId()
-    )));
+    return wrapExceptions(() -> consumer.position(partition.getPartitionId().asTopicPartition(partition.getStream())));
   }
 
   @Override
-  public Set<Integer> getPartitionIds(String stream)
+  public Set<KafkaTopicPartition> getPartitionIds(String stream)
   {
     return wrapExceptions(() -> {
-      List<PartitionInfo> partitions = consumer.partitionsFor(stream);
-      if (partitions == null) {
-        throw new ISE("Topic [%s] is not found in KafkaConsumer's list of topics", stream);
+      List<PartitionInfo> allPartitions = new ArrayList<>();
+      for (String topic : stream.split(",")) {
+        if (!multiTopic && !allPartitions.isEmpty()) {
+          throw InvalidInput.exception("Comma separated list of topics [%s] is not supported unless you enabled "

Review Comment:
   Nit: this validation should be done before the loop with `!isMultiTopic && stream.contains(",")`.
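A sketch of the suggested restructuring (standalone illustration with a hypothetical helper name; `IllegalArgumentException` stands in for Druid's `InvalidInput.exception`):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the up-front check: reject a comma-separated topic list before
// looping, instead of detecting it after the first iteration.
class TopicStreamParser
{
  static List<String> parseTopics(String stream, boolean multiTopic)
  {
    if (!multiTopic && stream.contains(",")) {
      throw new IllegalArgumentException(
          "Comma separated list of topics [" + stream + "] is not supported unless multiTopic is enabled");
    }
    List<String> topics = new ArrayList<>();
    for (String topic : stream.split(",")) {
      topics.add(topic.trim());
    }
    return topics;
  }
}
```

This way the single-topic case never enters the loop with an invalid stream, and the error is raised regardless of whether any partitions were fetched yet.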



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java:
##########
@@ -55,20 +57,29 @@
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
-public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaRecordEntity>
+public class KafkaRecordSupplier implements RecordSupplier<KafkaTopicPartition, Long, KafkaRecordEntity>
 {
   private final KafkaConsumer<byte[], byte[]> consumer;
   private boolean closed;
 
+  private boolean multiTopic;
+
+  // Store the stream information when partitions get assigned. This is required because the consumer does not
+  // know about the parent stream which could be a list of topics.
+  private String stream;

Review Comment:
   Does this need to be `volatile`?
   Also, maybe convert the comment into a javadoc. It would probably be nice to 
call out that this would have the same value as the `topic` field in the 
supervisor spec. 



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java:
##########
@@ -162,31 +189,39 @@ public Long getEarliestSequenceNumber(StreamPartition<Integer> partition)
   }
 
   @Override
-  public boolean isOffsetAvailable(StreamPartition<Integer> partition, OrderedSequenceNumber<Long> offset)
+  public boolean isOffsetAvailable(StreamPartition<KafkaTopicPartition> partition, OrderedSequenceNumber<Long> offset)
   {
     final Long earliestOffset = getEarliestSequenceNumber(partition);
     return earliestOffset != null
            && offset.isAvailableWithEarliest(KafkaSequenceNumber.of(earliestOffset));
   }
 
   @Override
-  public Long getPosition(StreamPartition<Integer> partition)
+  public Long getPosition(StreamPartition<KafkaTopicPartition> partition)
   {
-    return wrapExceptions(() -> consumer.position(new TopicPartition(
-        partition.getStream(),
-        partition.getPartitionId()
-    )));
+    return wrapExceptions(() -> consumer.position(partition.getPartitionId().asTopicPartition(partition.getStream())));
   }
 
   @Override
-  public Set<Integer> getPartitionIds(String stream)
+  public Set<KafkaTopicPartition> getPartitionIds(String stream)
   {
     return wrapExceptions(() -> {
-      List<PartitionInfo> partitions = consumer.partitionsFor(stream);
-      if (partitions == null) {
-        throw new ISE("Topic [%s] is not found in KafkaConsumer's list of topics", stream);
+      List<PartitionInfo> allPartitions = new ArrayList<>();
+      for (String topic : stream.split(",")) {
+        if (!multiTopic && !allPartitions.isEmpty()) {
+          throw InvalidInput.exception("Comma separated list of topics [%s] is not supported unless you enabled "
+                                       + "multiTopic in the KafkaSupervisorSpec.", stream);
+        }
+        List<PartitionInfo> partitions = consumer.partitionsFor(topic.trim());
+        if (partitions == null) {
+          throw new ISE("Topic [%s] is not found in KafkaConsumer's list of topics", topic.trim());

Review Comment:
   Nit: rephrase
   ```suggestion
             throw new ISE("Could not find topic[%s] using Kafka consumer", topic.trim());
   ```



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java:
##########
@@ -55,20 +57,29 @@
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
-public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaRecordEntity>
+public class KafkaRecordSupplier implements RecordSupplier<KafkaTopicPartition, Long, KafkaRecordEntity>
 {
   private final KafkaConsumer<byte[], byte[]> consumer;
   private boolean closed;
 
+  private boolean multiTopic;

Review Comment:
   can be final.



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaTopicPartition.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.kafka;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.KeyDeserializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.kafka.common.TopicPartition;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This class represents the partition id for kafka ingestion. This partition id includes topic name along with an
+ * integer partition. The topic name is required because the same partition id can be used for different topics.
+ * This class is used as a key in {@link org.apache.druid.indexing.kafka.KafkaDataSourceMetadata} to store the offsets
+ * for each partition.
+ *
+ */
+@JsonSerialize(using = KafkaTopicPartition.KafkaTopicPartitionSerializer.class,
+    keyUsing = KafkaTopicPartition.KafkaTopicPartitionSerializer.class)
+@JsonDeserialize(using = KafkaTopicPartition.KafkaTopicPartitionDeserializer.class, keyUsing =
+    KafkaTopicPartition.KafkaTopicPartitionKeyDeserializer.class)
+public class KafkaTopicPartition
+{
+  private final int partition;
+  @Nullable
+  private final String topic;
+
+  // This flag is used to maintain backward compatibility with older versions of kafka indexing. If this flag
+  // is set to false,
+  // - KafkaTopicPartition will be serialized as an integer and can be read back by older versions.
+  // - topic field is ignored while comparing two KafkaTopicPartition objects and calculating hashcode.
+  // This flag must be explicitly passed while constructing a KafkaTopicPartition object. That way, we can ensure that
+  // a particular supervisor is always running in multi topic mode or single topic mode.
+  private final boolean multiTopicPartition;
+
+  public KafkaTopicPartition(boolean multiTopicPartition, @Nullable String topic, int partition)
+  {
+    this.partition = partition;
+    this.topic = topic;
+    this.multiTopicPartition = multiTopicPartition;
+  }
+
+  public int partition()
+  {
+    return partition;
+  }
+
+  public Optional<String> topic()
+  {
+    return Optional.ofNullable(topic);
+  }
+
+  public boolean isMultiTopicPartition()
+  {
+    return multiTopicPartition;
+  }
+
+  public TopicPartition asTopicPartition(String fallbackTopic)
+  {
+    return new TopicPartition(topic != null ? topic : fallbackTopic, partition);

Review Comment:
   Might be nice to have an if on `multiTopicPartition`:
   ```suggestion
       if (multiTopicPartition) {
         return new TopicPartition(topic, partition);
       } else {
         return new TopicPartition(topic != null ? topic : fallbackTopic, partition);
       }
   ```



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaTopicPartition.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.kafka;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.KeyDeserializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.kafka.common.TopicPartition;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This class represents the partition id for kafka ingestion. This partition id includes topic name along with an
+ * integer partition. The topic name is required because the same partition id can be used for different topics.
+ * This class is used as a key in {@link org.apache.druid.indexing.kafka.KafkaDataSourceMetadata} to store the offsets
+ * for each partition.
+ *
+ */
+@JsonSerialize(using = KafkaTopicPartition.KafkaTopicPartitionSerializer.class,
+    keyUsing = KafkaTopicPartition.KafkaTopicPartitionSerializer.class)
+@JsonDeserialize(using = KafkaTopicPartition.KafkaTopicPartitionDeserializer.class, keyUsing =
+    KafkaTopicPartition.KafkaTopicPartitionKeyDeserializer.class)
+public class KafkaTopicPartition
+{
+  private final int partition;
+  @Nullable
+  private final String topic;
+
+  // This flag is used to maintain backward compatibility with older versions of kafka indexing. If this flag

Review Comment:
   +1 for making it a javadoc. Also the content needs updating given the current code.
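
   The backward-compatibility scheme that comment describes could be illustrated roughly as below. This is a hypothetical sketch, not the PR's implementation: the actual class uses Jackson (de)serializers, the `PartitionEncodingSketch` class and its methods are invented here, and the `"topic:partition"` string form for multi-topic mode is an assumption:

   ```java
   // Hypothetical sketch of a backward-compatible partition-id encoding:
   // single-topic mode keeps the bare integer form older versions can read,
   // while multi-topic mode (assumed here) carries the topic name as well.
   class PartitionEncodingSketch
   {
     static String encode(boolean multiTopicPartition, String topic, int partition)
     {
       // Older versions stored partition ids as plain integers, so the
       // single-topic form must stay a bare number.
       return multiTopicPartition ? topic + ":" + partition : String.valueOf(partition);
     }

     static int decodePartition(String encoded)
     {
       // Accept both the legacy bare-integer form and the assumed "topic:partition" form.
       int idx = encoded.indexOf(':');
       return Integer.parseInt(idx < 0 ? encoded : encoded.substring(idx + 1));
     }
   }
   ```

   Making the mode an explicit constructor argument (as the PR does) keeps a supervisor pinned to one encoding for its lifetime instead of inferring it per record.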



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

