b-slim commented on a change in pull request #6431: Add Kinesis Indexing
Service to core Druid
URL: https://github.com/apache/incubator-druid/pull/6431#discussion_r243471474
##########
File path:
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java
##########
@@ -21,133 +21,28 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.indexing.overlord.DataSourceMetadata;
-import org.apache.druid.java.util.common.IAE;
+import
org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
+import org.apache.druid.indexing.seekablestream.SeekableStreamPartitions;
-import java.util.HashMap;
import java.util.Map;
-import java.util.Objects;
-public class KafkaDataSourceMetadata implements DataSourceMetadata
+public class KafkaDataSourceMetadata extends
SeekableStreamDataSourceMetadata<Integer, Long>
{
- private final KafkaPartitions kafkaPartitions;
@JsonCreator
public KafkaDataSourceMetadata(
- @JsonProperty("partitions") KafkaPartitions kafkaPartitions
+ @JsonProperty("partitions") SeekableStreamPartitions<Integer, Long>
kafkaPartitions
)
{
- this.kafkaPartitions = kafkaPartitions;
- }
-
- @JsonProperty("partitions")
- public KafkaPartitions getKafkaPartitions()
- {
- return kafkaPartitions;
- }
-
- @Override
- public boolean isValidStart()
- {
- return true;
- }
-
- @Override
- public boolean matches(DataSourceMetadata other)
- {
- if (getClass() != other.getClass()) {
- return false;
- }
-
- return plus(other).equals(other.plus(this));
- }
-
- @Override
- public DataSourceMetadata plus(DataSourceMetadata other)
- {
- if (!(other instanceof KafkaDataSourceMetadata)) {
- throw new IAE(
- "Expected instance of %s, got %s",
- KafkaDataSourceMetadata.class.getCanonicalName(),
- other.getClass().getCanonicalName()
- );
- }
-
- final KafkaDataSourceMetadata that = (KafkaDataSourceMetadata) other;
-
- if
(that.getKafkaPartitions().getTopic().equals(kafkaPartitions.getTopic())) {
- // Same topic, merge offsets.
- final Map<Integer, Long> newMap = new HashMap<>();
-
- for (Map.Entry<Integer, Long> entry :
kafkaPartitions.getPartitionOffsetMap().entrySet()) {
- newMap.put(entry.getKey(), entry.getValue());
- }
-
- for (Map.Entry<Integer, Long> entry :
that.getKafkaPartitions().getPartitionOffsetMap().entrySet()) {
- newMap.put(entry.getKey(), entry.getValue());
- }
-
- return new KafkaDataSourceMetadata(new
KafkaPartitions(kafkaPartitions.getTopic(), newMap));
- } else {
- // Different topic, prefer "other".
- return other;
- }
- }
-
- @Override
- public DataSourceMetadata minus(DataSourceMetadata other)
- {
- if (!(other instanceof KafkaDataSourceMetadata)) {
- throw new IAE(
- "Expected instance of %s, got %s",
- KafkaDataSourceMetadata.class.getCanonicalName(),
- other.getClass().getCanonicalName()
- );
- }
-
- final KafkaDataSourceMetadata that = (KafkaDataSourceMetadata) other;
-
- if
(that.getKafkaPartitions().getTopic().equals(kafkaPartitions.getTopic())) {
- // Same topic, remove partitions present in "that" from "this"
- final Map<Integer, Long> newMap = new HashMap<>();
-
- for (Map.Entry<Integer, Long> entry :
kafkaPartitions.getPartitionOffsetMap().entrySet()) {
- if
(!that.getKafkaPartitions().getPartitionOffsetMap().containsKey(entry.getKey()))
{
- newMap.put(entry.getKey(), entry.getValue());
- }
- }
-
- return new KafkaDataSourceMetadata(new
KafkaPartitions(kafkaPartitions.getTopic(), newMap));
- } else {
- // Different topic, prefer "this".
- return this;
- }
+ super(kafkaPartitions);
}
@Override
- public boolean equals(Object o)
- {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- KafkaDataSourceMetadata that = (KafkaDataSourceMetadata) o;
- return Objects.equals(kafkaPartitions, that.kafkaPartitions);
- }
-
- @Override
- public int hashCode()
- {
- return Objects.hash(kafkaPartitions);
- }
-
- @Override
- public String toString()
+ protected SeekableStreamDataSourceMetadata<Integer, Long>
createConcreteDataSourceMetaData(
Review comment:
What does "concrete" add to the meaning here? Is there a non-concrete
DataSourceMetadata?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]