APEXMALHAR-2180 Kafka partitions to be unchanged in the case of dynamic scaling of ONE_TO_MANY strategy
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/37fa01e7
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/37fa01e7
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/37fa01e7

Branch: refs/heads/master
Commit: 37fa01e721da2fc1067f8e762ac703a0eff48ea2
Parents: e428548
Author: chaitanya <[email protected]>
Authored: Tue Aug 23 00:07:51 2016 +0530
Committer: chaitanya <[email protected]>
Committed: Tue Aug 23 00:07:51 2016 +0530

----------------------------------------------------------------------
 .../kafka/AbstractKafkaInputOperator.java       | 52 +++++++++++---------
 1 file changed, 28 insertions(+), 24 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/37fa01e7/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
index 9d2e664..21ff181 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
@@ -21,8 +21,8 @@ package com.datatorrent.contrib.kafka;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultPartition;
 import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator;
 import com.datatorrent.api.Operator.ActivationListener;
-import com.datatorrent.api.Operator.CheckpointListener;
 import com.datatorrent.api.Partitioner;
 import com.datatorrent.api.Stats;
 import com.datatorrent.api.StatsListener;
@@ -30,8 +30,6 @@ import com.datatorrent.api.annotation.OperatorAnnotation;
 import com.datatorrent.api.annotation.Stateless;
 import com.datatorrent.lib.util.KryoCloneUtils;
 import com.google.common.base.Joiner;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
@@ -59,8 +57,6 @@ import java.io.IOException;
 import java.lang.reflect.Array;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -70,7 +66,6 @@ import java.util.Map;
 import java.util.Set;
 
 import static com.datatorrent.contrib.kafka.KafkaConsumer.KafkaMeterStatsUtil.getOffsetsForPartitions;
-import static com.datatorrent.contrib.kafka.KafkaConsumer.KafkaMeterStatsUtil.get_1minMovingAvgParMap;
 
 /**
  * This is a base implementation of a Kafka input operator, which consumes data from Kafka message bus.
@@ -129,7 +124,7 @@ import static com.datatorrent.contrib.kafka.KafkaConsumer.KafkaMeterStatsUtil.ge
  */
 @OperatorAnnotation(partitionable = true)
-public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implements InputOperator, ActivationListener<OperatorContext>, CheckpointListener, Partitioner<AbstractKafkaInputOperator<K>>, StatsListener
+public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implements InputOperator, ActivationListener<OperatorContext>, Operator.CheckpointNotificationListener, Partitioner<AbstractKafkaInputOperator<K>>, StatsListener
 {
   private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaInputOperator.class);
@@ -375,6 +370,12 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
   }
 
   @Override
+  public void beforeCheckpoint(long windowId)
+  {
+
+  }
+
+  @Override
   public void committed(long windowId)
   {
     if ((getConsumer() instanceof SimpleKafkaConsumer)) {
@@ -510,11 +511,6 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
       isInitialParitition = partitions.iterator().next().getStats() == null;
     }
 
-    // get partition metadata for topics.
-    // Whatever operator is using high-level or simple kafka consumer, the operator always create a temporary simple kafka consumer to get the metadata of the topic
-    // The initial value of brokerList of the KafkaConsumer is used to retrieve the topic metadata
-    Map<String, List<PartitionMetadata>> kafkaPartitions = KafkaMetadataUtil.getPartitionsForTopic(getConsumer().brokers, getConsumer().getTopic());
-
     // Operator partitions
     List<Partitioner.Partition<AbstractKafkaInputOperator<K>>> newPartitions = null;
@@ -537,6 +533,10 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
         if (isInitialParitition) {
           lastRepartitionTime = System.currentTimeMillis();
           logger.info("[ONE_TO_ONE]: Initializing partition(s)");
+          // get partition metadata for topics.
+          // Whatever operator is using high-level or simple kafka consumer, the operator always create a temporary simple kafka consumer to get the metadata of the topic
+          // The initial value of brokerList of the KafkaConsumer is used to retrieve the topic metadata
+          Map<String, List<PartitionMetadata>> kafkaPartitions = KafkaMetadataUtil.getPartitionsForTopic(getConsumer().brokers, getConsumer().getTopic());
 
           // initialize the number of operator partitions according to number of kafka partitions
@@ -548,7 +548,8 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
             newPartitions.add(createPartition(Sets.newHashSet(new KafkaPartition(clusterId, consumer.topic, pm.partitionId())), initOffset, newManagers));
           }
         }
-
+        windowDataManager.partitioned(newManagers, deletedOperators);
+        return newPartitions;
       } else if (newWaitingPartition.size() != 0) {
         // add partition for new kafka partition
@@ -557,9 +558,6 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
           partitions.add(createPartition(Sets.newHashSet(newPartition), null, newManagers));
         }
         newWaitingPartition.clear();
-        windowDataManager.partitioned(newManagers, deletedOperators);
-        return partitions;
-
       }
       break;
 
       // For the 1 to N mapping The initial partition number is defined by stream application
@@ -571,9 +569,14 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
           throw new UnsupportedOperationException("[ONE_TO_MANY]: The high-level consumer is not supported for ONE_TO_MANY partition strategy.");
         }
 
-        if (isInitialParitition) {
+        if (isInitialParitition || newWaitingPartition.size() != 0) {
           lastRepartitionTime = System.currentTimeMillis();
           logger.info("[ONE_TO_MANY]: Initializing partition(s)");
+          // get partition metadata for topics.
+          // Whatever operator is using high-level or simple kafka consumer, the operator always create a temporary simple kafka consumer to get the metadata of the topic
+          // The initial value of brokerList of the KafkaConsumer is used to retrieve the topic metadata
+          Map<String, List<PartitionMetadata>> kafkaPartitions = KafkaMetadataUtil.getPartitionsForTopic(getConsumer().brokers, getConsumer().getTopic());
+
           int size = initialPartitionCount;
           @SuppressWarnings("unchecked")
           Set<KafkaPartition>[] kps = (Set<KafkaPartition>[]) Array.newInstance((new HashSet<KafkaPartition>()).getClass(), size);
@@ -594,14 +597,15 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
           logger.info("[ONE_TO_MANY]: Create operator partition for kafka partition(s): {} ", StringUtils.join(kps[i], ", "));
           newPartitions.add(createPartition(kps[i], initOffset, newManagers));
         }
+        // Add the existing partition Ids to the deleted operators
+        for (Partition<AbstractKafkaInputOperator<K>> op : partitions)
+        {
+          deletedOperators.add(op.getPartitionedInstance().operatorId);
+        }
-      }
-      else if (newWaitingPartition.size() != 0) {
-
-        logger.info("[ONE_TO_MANY]: Add operator partition for kafka partition(s): {} ", StringUtils.join(newWaitingPartition, ", "));
-        partitions.add(createPartition(Sets.newHashSet(newWaitingPartition), null, newManagers));
+        newWaitingPartition.clear();
         windowDataManager.partitioned(newManagers, deletedOperators);
-        return partitions;
+        return newPartitions;
       }
       break;
@@ -612,7 +616,7 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
     }
 
     windowDataManager.partitioned(newManagers, deletedOperators);
-    return newPartitions;
+    return partitions;
   }
 
   // Create a new partition with the partition Ids and initial offset positions
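----------------------------------------------------------------------

Note on the listener change: the operator now implements Operator.CheckpointNotificationListener in place of Operator.CheckpointListener, which adds a beforeCheckpoint(long) hook ahead of the existing checkpointed(long) and committed(long) callbacks; the patch leaves the new hook empty. The minimal sketch below illustrates that contract; the interfaces are declared locally so the snippet compiles on its own (the real ones are nested in com.datatorrent.api.Operator), and the class name CheckpointAwareSketch is illustrative only.

interface CheckpointListener
{
  // invoked after the given window has been checkpointed
  void checkpointed(long windowId);

  // invoked once the given window is committed across the DAG
  void committed(long windowId);
}

interface CheckpointNotificationListener extends CheckpointListener
{
  // invoked just before the operator state is checkpointed
  void beforeCheckpoint(long windowId);
}

class CheckpointAwareSketch implements CheckpointNotificationListener
{
  @Override
  public void beforeCheckpoint(long windowId)
  {
    // nothing to flush before a checkpoint; the patch also leaves this empty
  }

  @Override
  public void checkpointed(long windowId)
  {
  }

  @Override
  public void committed(long windowId)
  {
    // the Kafka operator commits consumer offsets for the committed window here
  }
}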
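----------------------------------------------------------------------

Note on the ONE_TO_MANY fix: both on initial partitioning and when newWaitingPartition reports newly discovered Kafka partitions, the operator now refetches the topic metadata, round-robins every Kafka partition over the configured initialPartitionCount buckets (kps[i % size]), retires the previous operator instances through deletedOperators, and returns the rebuilt set, so the operator partition count stays unchanged. The standalone sketch below demonstrates just that assignment step; assign() and the bare integer partition ids are illustrative stand-ins, not operator API.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public final class OneToManyAssignmentSketch
{
  // Round-robin kafka partition ids over a fixed operator count, mirroring
  // the kps[i % size] distribution loop in definePartitions() above.
  static List<Set<Integer>> assign(List<Integer> kafkaPartitionIds, int operatorCount)
  {
    List<Set<Integer>> buckets = new ArrayList<Set<Integer>>();
    for (int i = 0; i < operatorCount; i++) {
      buckets.add(new HashSet<Integer>());
    }
    int i = 0;
    for (int pid : kafkaPartitionIds) {
      buckets.get(i++ % operatorCount).add(pid);
    }
    return buckets;
  }

  public static void main(String[] args)
  {
    // 5 kafka partitions on 2 operator partitions: [[0, 2, 4], [1, 3]]
    System.out.println(assign(Arrays.asList(0, 1, 2, 3, 4), 2));
    // a 6th kafka partition triggers the same redistribution and still
    // yields exactly 2 operator partitions: [[0, 2, 4], [1, 3, 5]]
    System.out.println(assign(Arrays.asList(0, 1, 2, 3, 4, 5), 2));
  }
}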
