Repository: apex-malhar
Updated Branches:
  refs/heads/master 255bc11c5 -> ef42c52a1
APEXMALHAR-2169 Removed the stuff related to dynamic partition based on load from AbstractKafkaInputOperator.

Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/e4285484
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/e4285484
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/e4285484

Branch: refs/heads/master
Commit: e4285484f98714436ed29210032c17ffd5a7d639
Parents: 1700725
Author: chaitanya <[email protected]>
Authored: Sat Aug 20 18:10:02 2016 +0530
Committer: chaitanya <[email protected]>
Committed: Sat Aug 20 18:44:40 2016 +0530

----------------------------------------------------------------------
 .../kafka/AbstractKafkaInputOperator.java | 180 +------------------
 1 file changed, 5 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e4285484/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
index d4945ec..9d2e664 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
@@ -155,10 +155,12 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
   // By default the partition policy is 1:1
   public PartitionStrategy strategy = PartitionStrategy.ONE_TO_ONE;
 
-  // default resource is unlimited in terms of msgs per second
+  // Deprecated: Please don't use this property.
+  @Deprecated
   private long msgRateUpperBound = Long.MAX_VALUE;
 
-  // default resource is unlimited in terms of bytes per second
+  // Deprecated: Please don't use this property.
+  @Deprecated
   private long byteRateUpperBound = Long.MAX_VALUE;
 
   // Store the current operator partition topology
@@ -601,46 +603,6 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
         windowDataManager.partitioned(newManagers, deletedOperators);
         return partitions;
       }
-      else {
-
-        logger.info("[ONE_TO_MANY]: Repartition the operator(s) under " + msgRateUpperBound + " msgs/s and " + byteRateUpperBound + " bytes/s hard limit");
-        // size of the list depends on the load and capacity of each operator
-        newPartitions = new LinkedList<Partitioner.Partition<AbstractKafkaInputOperator<K>>>();
-
-        // Use first-fit decreasing algorithm to minimize the container number and somewhat balance the partition
-        // try to balance the load and minimize the number of containers with each container's load under the threshold
-        // the partition based on the latest 1 minute moving average
-        Map<KafkaPartition, long[]> kPIntakeRate = new HashMap<KafkaPartition, long[]>();
-        // get the offset for all partitions of each consumer
-        Map<KafkaPartition, Long> offsetTrack = new HashMap<KafkaPartition, Long>();
-        for (Partitioner.Partition<AbstractKafkaInputOperator<K>> partition : partitions) {
-          List<Stats.OperatorStats> opss = partition.getStats().getLastWindowedStats();
-          if (opss == null || opss.size() == 0) {
-            continue;
-          }
-          offsetTrack.putAll(partition.getPartitionedInstance().consumer.getCurrentOffsets());
-          // Get the latest stats
-
-          Stats.OperatorStats stat = partition.getStats().getLastWindowedStats().get(partition.getStats().getLastWindowedStats().size() - 1);
-          if (stat.counters instanceof KafkaConsumer.KafkaMeterStats) {
-            KafkaConsumer.KafkaMeterStats kms = (KafkaConsumer.KafkaMeterStats) stat.counters;
-            kPIntakeRate.putAll(get_1minMovingAvgParMap(kms));
-          }
-        }
-
-        List<PartitionInfo> partitionInfos = firstFitDecreasingAlgo(kPIntakeRate);
-
-        // Add the existing partition Ids to the deleted operators
-        for(Partitioner.Partition<AbstractKafkaInputOperator<K>> op : partitions)
-        {
-          deletedOperators.add(op.getPartitionedInstance().operatorId);
-        }
-        for (PartitionInfo r : partitionInfos) {
-          logger.info("[ONE_TO_MANY]: Create operator partition for kafka partition(s): " + StringUtils.join(r.kpids, ", ") + ", topic: " + this.getConsumer().topic);
-          newPartitions.add(createPartition(r.kpids, offsetTrack, newManagers));
-        }
-        currentPartitionInfo.addAll(partitionInfos);
-      }
       break;
 
     case ONE_TO_MANY_HEURISTIC:
@@ -674,54 +636,6 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
     return p;
   }
 
-  private List<PartitionInfo> firstFitDecreasingAlgo(final Map<KafkaPartition, long[]> kPIntakeRate)
-  {
-    // (Decreasing) Sort the map by msgs/s and bytes/s in descending order
-    List<Map.Entry<KafkaPartition, long[]>> sortedMapEntry = new LinkedList<Map.Entry<KafkaPartition, long[]>>(kPIntakeRate.entrySet());
-    Collections.sort(sortedMapEntry, new Comparator<Map.Entry<KafkaPartition, long[]>>()
-    {
-      @Override
-      public int compare(Map.Entry<KafkaPartition, long[]> firstEntry, Map.Entry<KafkaPartition, long[]> secondEntry)
-      {
-        long[] firstPair = firstEntry.getValue();
-        long[] secondPair = secondEntry.getValue();
-        if (msgRateUpperBound == Long.MAX_VALUE || firstPair[0] == secondPair[0]) {
-          return (int) (secondPair[1] - firstPair[1]);
-        } else {
-          return (int) (secondPair[0] - firstPair[0]);
-        }
-      }
-    });
-
-    // (First-fit) Look for first fit operator to assign the consumer
-    // Go over all the kafka partitions and look for the right operator to assign to
-    // Each record has a set of kafka partition ids and the resource left for that operator after assigned the consumers for those partitions
-    List<PartitionInfo> pif = new LinkedList<PartitionInfo>();
-    outer:
-    for (Map.Entry<KafkaPartition, long[]> entry : sortedMapEntry) {
-      long[] resourceRequired = entry.getValue();
-      for (PartitionInfo r : pif) {
-        if (r.msgRateLeft > resourceRequired[0] && r.byteRateLeft > resourceRequired[1]) {
-          // found first fit operator partition that has enough resource for this consumer
-          // add consumer to the operator partition
-          r.kpids.add(entry.getKey());
-          // update the resource left in this partition
-          r.msgRateLeft -= r.msgRateLeft == Long.MAX_VALUE ? 0 : resourceRequired[0];
-          r.byteRateLeft -= r.byteRateLeft == Long.MAX_VALUE ? 0 : resourceRequired[1];
-          continue outer;
-        }
-      }
-      // didn't find the existing "operator" to assign this consumer
-      PartitionInfo nr = new PartitionInfo();
-      nr.kpids = Sets.newHashSet(entry.getKey());
-      nr.msgRateLeft = msgRateUpperBound == Long.MAX_VALUE ? msgRateUpperBound : msgRateUpperBound - resourceRequired[0];
-      nr.byteRateLeft = byteRateUpperBound == Long.MAX_VALUE ? byteRateUpperBound : byteRateUpperBound - resourceRequired[1];
-      pif.add(nr);
-    }
-
-    return pif;
-  }
-
   @Override
   public StatsListener.Response processStats(StatsListener.BatchedOperatorStats stats)
   {
@@ -830,97 +744,13 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
         }
       }
-      if (strategy == PartitionStrategy.ONE_TO_ONE) {
-        return false;
-      }
-
-      // This is expensive part and only every repartitionCheckInterval it will check existing the overall partitions
-      // and see if there is more optimal solution
-      // The decision is made by 2 constraint
-      // Hard constraint which is upper bound overall msgs/s or bytes/s
-      // Soft constraint which is more optimal solution
-
-      boolean b = breakHardConstraint(kstats) || breakSoftConstraint();
-      if (b) {
-        currentPartitionInfo.clear();
-        kafkaStatsHolder.clear();
-      }
-      return b;
+      return false;
     } finally {
       // update last check time
       lastCheckTime = System.currentTimeMillis();
     }
   }
 
-  /**
-   * Check to see if there is other more optimal(less partition) partition assignment based on current statistics
-   *
-   * @return True if all windowed stats indicate different partition size we need to adjust the partition.
-   */
-  private boolean breakSoftConstraint()
-  {
-    if (kafkaStatsHolder.size() != currentPartitionInfo.size()) {
-      return false;
-    }
-    int length = kafkaStatsHolder.get(kafkaStatsHolder.keySet().iterator().next()).size();
-    for (int j = 0; j < length; j++) {
-      Map<KafkaPartition, long[]> kPIntakeRate = new HashMap<KafkaPartition, long[]>();
-      for (Integer pid : kafkaStatsHolder.keySet()) {
-        if(kafkaStatsHolder.get(pid).size() <= j)
-          continue;
-        kPIntakeRate.putAll(get_1minMovingAvgParMap(kafkaStatsHolder.get(pid).get(j)));
-      }
-      if (kPIntakeRate.size() == 0) {
-        return false;
-      }
-      List<PartitionInfo> partitionInfo = firstFitDecreasingAlgo(kPIntakeRate);
-      if (partitionInfo.size() == 0 || partitionInfo.size() == currentPartitionInfo.size()) {
-        return false;
-      }
-    }
-    // if all windowed stats indicate different partition size we need to adjust the partition
-    return true;
-  }
-
-  /**
-   * Check if all the statistics within the windows break the upper bound hard limit in msgs/s or bytes/s
-   *
-   * @return True if all the statistics within the windows break the upper bound hard limit in msgs/s or bytes/s.
-   */
-  private boolean breakHardConstraint(List<KafkaConsumer.KafkaMeterStats> kmss)
-  {
-    // Only care about the KafkaMeterStats
-
-    // if there is no kafka meter stats at all, don't repartition
-    if (kmss == null || kmss.size() == 0) {
-      return false;
-    }
-    // if all the stats within the window have msgs/s above the upper bound threshold (hard limit)
-    boolean needRP = Iterators.all(kmss.iterator(), new Predicate<KafkaConsumer.KafkaMeterStats>()
-    {
-      @Override
-      public boolean apply(KafkaConsumer.KafkaMeterStats kms)
-      {
-        // If there are more than 1 kafka partition and the total msg/s reach the limit
-        return kms.partitionStats.size() > 1 && kms.totalMsgPerSec > msgRateUpperBound;
-      }
-    });
-
-    // or all the stats within the window have bytes/s above the upper bound threshold (hard limit)
-    needRP = needRP || Iterators.all(kmss.iterator(), new Predicate<KafkaConsumer.KafkaMeterStats>()
-    {
-      @Override
-      public boolean apply(KafkaConsumer.KafkaMeterStats kms)
-      {
-        //If there are more than 1 kafka partition and the total bytes/s reach the limit
-        return kms.partitionStats.size() > 1 && kms.totalBytesPerSec > byteRateUpperBound;
-      }
-    });
-
-    return needRP;
-
-  }
-
   public static enum PartitionStrategy
   {
     /**
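
Editor's note: the removed firstFitDecreasingAlgo packed Kafka partitions onto operator partitions with a first-fit-decreasing heuristic under the msgs/s and bytes/s bounds; as the diff shows, processStats() now simply returns false and the two rate properties are deprecated. For readers unfamiliar with the heuristic, below is a minimal, self-contained sketch of the bin-packing idea only, assuming a single msgs/s rate per Kafka partition and one capacity bound; the class and method names are illustrative and are not part of the Malhar API, and it is not the operator's exact code (which tracked both rates and used strict comparisons).

// Illustrative sketch of first-fit decreasing packing (hypothetical names).
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FirstFitDecreasingSketch
{
  // Returns groups of Kafka partition ids; each group would map to one operator partition.
  static List<List<String>> pack(Map<String, Long> msgRatePerPartition, long msgRateUpperBound)
  {
    // "Decreasing": sort partitions by their rate, highest first.
    List<Map.Entry<String, Long>> sorted = new ArrayList<>(msgRatePerPartition.entrySet());
    sorted.sort(Comparator.comparingLong((Map.Entry<String, Long> e) -> e.getValue()).reversed());

    List<List<String>> bins = new ArrayList<>();  // one bin per operator partition
    List<Long> rateLeft = new ArrayList<>();      // remaining headroom per bin

    for (Map.Entry<String, Long> e : sorted) {
      boolean placed = false;
      // "First fit": put the partition into the first bin that still has headroom.
      for (int i = 0; i < bins.size(); i++) {
        if (rateLeft.get(i) >= e.getValue()) {
          bins.get(i).add(e.getKey());
          rateLeft.set(i, rateLeft.get(i) - e.getValue());
          placed = true;
          break;
        }
      }
      if (!placed) {
        // No existing bin has room; open a new operator partition for it.
        List<String> bin = new ArrayList<>();
        bin.add(e.getKey());
        bins.add(bin);
        rateLeft.add(msgRateUpperBound - e.getValue());
      }
    }
    return bins;
  }

  public static void main(String[] args)
  {
    Map<String, Long> rates = new LinkedHashMap<>();
    rates.put("topic-0", 700L);
    rates.put("topic-1", 400L);
    rates.put("topic-2", 300L);
    // Prints [[topic-0, topic-2], [topic-1]] for a 1000 msgs/s bound.
    System.out.println(pack(rates, 1000L));
  }
}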
