abhishekrb19 commented on code in PR #14424: URL: https://github.com/apache/druid/pull/14424#discussion_r1240581189
########## extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaTopicPartition.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.kafka; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.KeyDeserializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.apache.kafka.common.TopicPartition; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Objects; +import java.util.Optional; + +/** + * This class represents the partition id for kafka ingestion. This partition id includes topic name along with an + * integer partition. The topic name is required because the same partition id can be used for different topics. 
+ * This class is used as a key in {@link org.apache.druid.indexing.kafka.KafkaDataSourceMetadata} to store the offsets + * for each partition. + * + */ +@JsonSerialize(using = KafkaTopicPartition.KafkaTopicPartitionSerializer.class, + keyUsing = KafkaTopicPartition.KafkaTopicPartitionSerializer.class) +@JsonDeserialize(using = KafkaTopicPartition.KafkaTopicPartitionDeserializer.class, keyUsing = + KafkaTopicPartition.KafkaTopicPartitionKeyDeserializer.class) +public class KafkaTopicPartition +{ + private final int partition; + @Nullable + private final String topic; + + // This flag is used to maintain backward incompatibilty with older versions of kafka indexing. If this flag Review Comment: typo: incompatibilty -> incompatibility Also, I think this could straight-up be a javadoc for the property instead of a multi-line comment. ########## extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaTopicPartition.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.kafka; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.KeyDeserializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.apache.kafka.common.TopicPartition; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Objects; +import java.util.Optional; + +/** + * This class represents the partition id for kafka ingestion. This partition id includes topic name along with an + * integer partition. The topic name is required because the same partition id can be used for different topics. + * This class is used as a key in {@link org.apache.druid.indexing.kafka.KafkaDataSourceMetadata} to store the offsets + * for each partition. + * + */ +@JsonSerialize(using = KafkaTopicPartition.KafkaTopicPartitionSerializer.class, + keyUsing = KafkaTopicPartition.KafkaTopicPartitionSerializer.class) +@JsonDeserialize(using = KafkaTopicPartition.KafkaTopicPartitionDeserializer.class, keyUsing = + KafkaTopicPartition.KafkaTopicPartitionKeyDeserializer.class) +public class KafkaTopicPartition +{ + private final int partition; + @Nullable + private final String topic; + + // This flag is used to maintain backward incompatibilty with older versions of kafka indexing. If this flag + // is set to false, + // - KafkaTopicPartition will be serialized as an integer and can be read back by older version. + // - topic field is ignored while comparing two KafkaTopicPartition objects and calculating hashcode. + // This flag must be explicitly passed while constructing KafkaTopicPartition object. 
That way, we can ensure that + // a particular supervisor is always running in multi topic mode or single topic mode. + private final boolean multiTopicPartition; + + public KafkaTopicPartition(boolean multiTopicPartition, @Nullable String topic, int partition) + { + this.partition = partition; + this.topic = topic; + this.multiTopicPartition = multiTopicPartition; + } + + public int partition() + { + return partition; + } + + public Optional<String> topic() + { + return Optional.ofNullable(topic); + } + + public boolean isMultiTopicPartition() + { + return multiTopicPartition; + } + + public TopicPartition asTopicPartition(String fallbackTopic) + { + return new TopicPartition(topic != null ? topic : fallbackTopic, partition); + } + + @Override + public String toString() + { + // TODO - fix this so toString is not used for serialization + if (null != topic && multiTopicPartition) { + return partition + ":" + topic; Review Comment: Since the standard convention is `topic:partition`, I'm wondering why we have the inverse here? ########## extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaTopicPartition.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.kafka; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.KeyDeserializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.apache.kafka.common.TopicPartition; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Objects; +import java.util.Optional; + +/** + * This class represents the partition id for kafka ingestion. This partition id includes topic name along with an + * integer partition. The topic name is required because the same partition id can be used for different topics. Review Comment: I think "The topic name is required" holds true only for the new mode, and in the old mode, the topic name can be optional? I'd suggest clarifying the javadoc and perhaps adding a check to validate this constraint. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
