Repository: storm Updated Branches: refs/heads/1.x-branch 1deb9ab95 -> 262029b15
STORM-2407: KafkaTridentSpoutOpaque Doesn't Poll Data From All Topic-Partitions When Parallelism Hint Not a Multiple Total Topic-Partitions - Introduce logic to poll data from the topic partitions assigned to each task Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b8885411 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b8885411 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b8885411 Branch: refs/heads/1.x-branch Commit: b88854110099edb778d5a906ff1f838737b673a3 Parents: 2a99f61 Author: Hugo Louro <hmclo...@gmail.com> Authored: Fri Mar 10 15:13:31 2017 -0600 Committer: Hugo Louro <hmclo...@gmail.com> Committed: Fri Mar 10 17:55:02 2017 -0600 ---------------------------------------------------------------------- .../trident/OpaqueTridentEventHubEmitter.java | 20 ++++- .../spout/trident/KafkaTridentSpoutEmitter.java | 94 +++++++------------- .../spout/trident/KafkaTridentSpoutOpaque.java | 8 +- .../kafka/trident/TridentKafkaEmitter.java | 19 ++-- .../spout/IOpaquePartitionedTridentSpout.java | 19 +++- .../OpaquePartitionedTridentSpoutExecutor.java | 15 ++-- .../topology/state/TransactionalState.java | 4 + 7 files changed, 89 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/b8885411/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java index ae21ab3..20375a2 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java +++ 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java @@ -17,16 +17,16 @@ *******************************************************************************/ package org.apache.storm.eventhubs.trident; -import java.util.List; -import java.util.Map; - import org.apache.storm.eventhubs.spout.EventHubSpoutConfig; import org.apache.storm.eventhubs.spout.IEventHubReceiverFactory; - import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout; import org.apache.storm.trident.topology.TransactionAttempt; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + /** * A thin wrapper of TransactionalTridentEventHubEmitter for OpaqueTridentEventHubSpout */ @@ -63,6 +63,18 @@ public class OpaqueTridentEventHubEmitter implements IOpaquePartitionedTridentSp } @Override + public List<Partition> getPartitionsForTask(int taskId, int numTasks, Partitions allPartitionInfo) { + final List<Partition> orderedPartitions = getOrderedPartitions(allPartitionInfo); + final List<Partition> taskPartitions = new ArrayList<>(orderedPartitions == null ? 
0 : orderedPartitions.size()); + if (orderedPartitions != null) { + for (int i = taskId; i < orderedPartitions.size(); i += numTasks) { + taskPartitions.add(orderedPartitions.get(i)); + } + } + return taskPartitions; + } + + @Override public void refreshPartitions(List<Partition> partitionList) { transactionalEmitter.refreshPartitions(partitionList); } http://git-wip-us.apache.org/repos/asf/storm/blob/b8885411/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java index 79dfc60..8607853 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java @@ -35,7 +35,6 @@ import org.slf4j.LoggerFactory; import java.io.Serializable; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -71,7 +70,7 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident private TopologyContext topologyContext; - public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K,V> kafkaManager, TopologyContext topologyContext, Timer refreshSubscriptionTimer) { + public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> kafkaManager, TopologyContext topologyContext, Timer refreshSubscriptionTimer) { this.kafkaConsumer = kafkaManager.createAndSubscribeKafkaConsumer(topologyContext); this.kafkaManager = kafkaManager; this.topologyContext = topologyContext; @@ -87,14 +86,14 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident /** 
* Creates instance of this class with default 500 millisecond refresh subscription timer */ - public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K,V> kafkaManager, TopologyContext topologyContext) { + public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> kafkaManager, TopologyContext topologyContext) { this(kafkaManager, topologyContext, new Timer(500, kafkaManager.getKafkaSpoutConfig().getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS)); } @Override public KafkaTridentSpoutBatchMetadata<K, V> emitPartitionBatch(TransactionAttempt tx, TridentCollector collector, - KafkaTridentSpoutTopicPartition currBatchPartition, KafkaTridentSpoutBatchMetadata<K, V> lastBatch) { + KafkaTridentSpoutTopicPartition currBatchPartition, KafkaTridentSpoutBatchMetadata<K, V> lastBatch) { LOG.debug("Processing batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, lastBatch, collector); @@ -105,10 +104,10 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident Collection<TopicPartition> pausedTopicPartitions = Collections.emptySet(); if (assignments == null || !assignments.contains(currBatchPartition.getTopicPartition())) { - LOG.warn("SKIPPING processing batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], " + - "[collector = {}] because it is not assigned {} to consumer instance [{}] of consumer group [{}]", - tx, currBatchPartition, lastBatch, collector, assignments, kafkaConsumer, - kafkaManager.getKafkaSpoutConfig().getConsumerGroupId()); + LOG.warn("SKIPPING processing batch [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], " + + "[collector = {}] because it is not part of the assignments {} of consumer instance [{}] " + + "of consumer group [{}]", tx, currBatchPartition, lastBatch, collector, assignments, + kafkaConsumer, kafkaManager.getKafkaSpoutConfig().getConsumerGroupId()); } else { try { // pause other 
topic-partitions to only poll from current topic-partition @@ -205,67 +204,40 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident /** * Computes ordered list of topic-partitions for this task taking into consideration that topic-partitions * for this task must be assigned to the Kafka consumer running on this task. + * * @param allPartitionInfo list of all partitions as returned by {@link KafkaTridentSpoutOpaqueCoordinator} * @return ordered list of topic partitions for this task */ @Override public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(final List<TopicPartition> allPartitionInfo) { - final int numTopicPartitions = allPartitionInfo == null ? 0 : allPartitionInfo.size(); - final int taskIndex = topologyContext.getThisTaskIndex(); - final int numTasks = topologyContext.getComponentTasks(topologyContext.getThisComponentId()).size(); - - LOG.debug("Computing task ordered list of topic-partitions from all partitions list {}, " + - "for task with index [{}] of total tasks [{}] ", allPartitionInfo, taskIndex, numTasks); - - final Set<TopicPartition> assignment = kafkaConsumer.assignment(); - LOG.debug("Consumer [{}] has assigned topic-partitions {}", kafkaConsumer, assignment); - - List<KafkaTridentSpoutTopicPartition> taskOrderedTps = new ArrayList<>(numTopicPartitions); - - if (numTopicPartitions > 0) { - final KafkaTridentSpoutTopicPartition[] tps = new KafkaTridentSpoutTopicPartition[numTopicPartitions]; - int tpTaskComputedIdx = taskIndex; - /* - * Put this task's Kafka consumer assigned topic-partitions in the right index locations such - * that distribution by OpaquePartitionedTridentSpoutExecutor can be done correctly. 
This algorithm - * does the distribution in exactly the same way as the one used in OpaquePartitionedTridentSpoutExecutor - */ - for (TopicPartition assignedTp : assignment) { - if (tpTaskComputedIdx >= numTopicPartitions) { - LOG.warn("Ignoring attempt to add consumer [{}] assigned topic-partition [{}] to index [{}], " + - "out of bounds [{}]. ", kafkaConsumer, assignedTp, tpTaskComputedIdx, numTopicPartitions); - break; - } - tps[tpTaskComputedIdx] = new KafkaTridentSpoutTopicPartition(assignedTp); - LOG.debug("Added consumer assigned topic-partition [{}] to position [{}] for task with index [{}]", - assignedTp, tpTaskComputedIdx, taskIndex); - tpTaskComputedIdx += numTasks; - } + final List<KafkaTridentSpoutTopicPartition> allPartitions = newKafkaTridentSpoutTopicPartitions(allPartitionInfo); + LOG.debug("Returning all topic-partitions {} across all tasks. Current task index [{}]. Total tasks [{}] ", + allPartitions, topologyContext.getThisTaskIndex(), getNumTasks()); + return allPartitions; + } - // Put topic-partitions assigned to consumer instances running in different tasks in the empty slots - int i = 0; - for (TopicPartition tp : allPartitionInfo) { - /* - * Topic-partition not assigned to the Kafka consumer associated with this emitter task, hence not yet - * added to the list of task ordered partitions. To be processed next. 
- */ - if (!assignment.contains(tp)) { - for (; i < numTopicPartitions; i++) { - if (tps[i] == null) { // find empty slot to put the topic-partition - tps[i] = new KafkaTridentSpoutTopicPartition(tp); - LOG.debug("Added to position [{}] topic-partition [{}], which is assigned to a consumer " + - "running on a task other than task with index [{}] ", i, tp, taskIndex); - i++; - break; - } - } - } + @Override + public List<KafkaTridentSpoutTopicPartition> getPartitionsForTask(int taskId, int numTasks, List<TopicPartition> allPartitionInfo) { + final Set<TopicPartition> assignedTps = kafkaConsumer.assignment(); + LOG.debug("Consumer [{}], running on task with index [{}], has assigned topic-partitions {}", kafkaConsumer, taskId, assignedTps); + final List<KafkaTridentSpoutTopicPartition> taskTps = newKafkaTridentSpoutTopicPartitions(assignedTps); + LOG.debug("Returning topic-partitions {} for task with index [{}]", taskTps, taskId); + return taskTps; + } + + private List<KafkaTridentSpoutTopicPartition> newKafkaTridentSpoutTopicPartitions(Collection<TopicPartition> tps) { + final List<KafkaTridentSpoutTopicPartition> kttp = new ArrayList<>(tps == null ? 
0 : tps.size()); + if (tps != null) { + for (TopicPartition tp : tps) { + LOG.trace("Added topic-partition [{}]", tp); + kttp.add(new KafkaTridentSpoutTopicPartition(tp)); } - taskOrderedTps = Arrays.asList(tps); } - LOG.debug("Returning ordered list of topic-partitions {} for task with index [{}], of total tasks [{}] ", - taskOrderedTps, taskIndex, numTasks); - return taskOrderedTps; + return kttp; + } + + private int getNumTasks() { + return topologyContext.getComponentTasks(topologyContext.getThisComponentId()).size(); } @Override http://git-wip-us.apache.org/repos/asf/storm/blob/b8885411/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java index 18d37d9..0f7f0af 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java @@ -35,10 +35,7 @@ public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSp private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaque.class); private final KafkaTridentSpoutManager<K, V> kafkaManager; - private KafkaTridentSpoutEmitter<K, V> kafkaTridentSpoutEmitter; - private KafkaTridentSpoutOpaqueCoordinator<K, V> coordinator; - public KafkaTridentSpoutOpaque(KafkaSpoutConfig<K, V> conf) { this(new KafkaTridentSpoutManager<>(conf)); } @@ -73,9 +70,6 @@ public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSp @Override public String toString() { return super.toString() + - "{kafkaManager=" + kafkaManager + - ", kafkaTridentSpoutEmitter=" + 
kafkaTridentSpoutEmitter + - ", coordinator=" + coordinator + - '}'; + "{kafkaManager=" + kafkaManager + '}'; } } http://git-wip-us.apache.org/repos/asf/storm/blob/b8885411/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java index 136eb0b..2a407ca 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java @@ -161,11 +161,6 @@ public class TridentKafkaEmitter { /** * re-emit the batch described by the meta data provided - * - * @param attempt - * @param collector - * @param partition - * @param meta */ private void reEmitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map meta) { LOG.info("re-emitting batch, attempt " + attempt); @@ -177,7 +172,7 @@ public class TridentKafkaEmitter { ByteBufferMessageSet msgs = null; msgs = fetchMessages(consumer, partition, offset); - if(msgs != null) { + if (msgs != null) { for (MessageAndOffset msg : msgs) { if (offset == nextOffset) { break; @@ -253,6 +248,18 @@ public class TridentKafkaEmitter { } @Override + public List<Partition> getPartitionsForTask(int taskId, int numTasks, List<GlobalPartitionInformation> allPartitionInfo) { + final List<Partition> orderedPartitions = getOrderedPartitions(allPartitionInfo); + final List<Partition> taskPartitions = new ArrayList<>(orderedPartitions == null ? 
0 : orderedPartitions.size()); + if (orderedPartitions != null) { + for (int i = taskId; i < orderedPartitions.size(); i += numTasks) { + taskPartitions.add(orderedPartitions.get(i)); + } + } + return taskPartitions; + } + + @Override public void close() { clear(); } http://git-wip-us.apache.org/repos/asf/storm/blob/b8885411/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java b/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java index fc4d5ea..67cb361 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java +++ b/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java @@ -52,13 +52,26 @@ public interface IOpaquePartitionedTridentSpout<Partitions, Partition extends IS * This method is called when this task is responsible for a new set of partitions. Should be used * to manage things like connections to brokers. 
*/ - void refreshPartitions(List<Partition> partitionResponsibilities); + void refreshPartitions(List<Partition> partitionResponsibilities); + + /** + * @return The ordered list of partitions being processed by all the tasks + */ List<Partition> getOrderedPartitions(Partitions allPartitionInfo); + + /** + * @return The list of partitions that are to be processed by the task with id {@code taskId} + */ + List<Partition> getPartitionsForTask(int taskId, int numTasks, Partitions allPartitionInfo); + void close(); } - Emitter<Partitions, Partition, M> getEmitter(Map conf, TopologyContext context); - Coordinator getCoordinator(Map conf, TopologyContext context); + Emitter<Partitions, Partition, M> getEmitter(Map conf, TopologyContext context); + + Coordinator getCoordinator(Map conf, TopologyContext context); + Map<String, Object> getComponentConfiguration(); + Fields getOutputFields(); } http://git-wip-us.apache.org/repos/asf/storm/blob/b8885411/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java b/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java index ea66acd..a6eeff0 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java +++ b/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java @@ -27,7 +27,6 @@ import org.apache.storm.tuple.Fields; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -111,16 +110,14 @@ public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentS tx, coordinatorMeta, collector, this); if(_savedCoordinatorMeta==null || !_savedCoordinatorMeta.equals(coordinatorMeta)) { -
List<ISpoutPartition> partitions = _emitter.getOrderedPartitions(coordinatorMeta); _partitionStates.clear(); - List<ISpoutPartition> myPartitions = new ArrayList<>(); - for(int i=_index; i < partitions.size(); i+=_numTasks) { - ISpoutPartition p = partitions.get(i); - String id = p.getId(); - myPartitions.add(p); - _partitionStates.put(id, new EmitterPartitionState(new RotatingTransactionalState(_state, id), p)); + final List<ISpoutPartition> taskPartitions = _emitter.getPartitionsForTask(_index, _numTasks, coordinatorMeta); + for (ISpoutPartition partition : taskPartitions) { + _partitionStates.put(partition.getId(), new EmitterPartitionState(new RotatingTransactionalState(_state, partition.getId()), partition)); } - _emitter.refreshPartitions(myPartitions); + + // refresh all partitions for backwards compatibility with old spout + _emitter.refreshPartitions(_emitter.getOrderedPartitions(coordinatorMeta)); _savedCoordinatorMeta = coordinatorMeta; _changedMeta = true; } http://git-wip-us.apache.org/repos/asf/storm/blob/b8885411/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java b/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java index 8b63547..26ac404 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java +++ b/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java @@ -37,6 +37,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +/** + * Class that contains the logic to extract the transactional state info from zookeeper. All transactional state + * is kept in zookeeper. This class only contains references to Curator, which is used to get all info from zookeeper. 
+ */ public class TransactionalState { private static final Logger LOG = LoggerFactory.getLogger(TransactionalState.class);