Repository: storm
Updated Branches:
  refs/heads/1.x-branch 1deb9ab95 -> 262029b15


STORM-2407: KafkaTridentSpoutOpaque Doesn't Poll Data From All Topic-Partitions 
When Parallelism Hint Not a Multiple Total Topic-Partitions
 - Introduce logic to poll data from the topic partitions assigned to each task


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b8885411
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b8885411
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b8885411

Branch: refs/heads/1.x-branch
Commit: b88854110099edb778d5a906ff1f838737b673a3
Parents: 2a99f61
Author: Hugo Louro <hmclo...@gmail.com>
Authored: Fri Mar 10 15:13:31 2017 -0600
Committer: Hugo Louro <hmclo...@gmail.com>
Committed: Fri Mar 10 17:55:02 2017 -0600

----------------------------------------------------------------------
 .../trident/OpaqueTridentEventHubEmitter.java   | 20 ++++-
 .../spout/trident/KafkaTridentSpoutEmitter.java | 94 +++++++-------------
 .../spout/trident/KafkaTridentSpoutOpaque.java  |  8 +-
 .../kafka/trident/TridentKafkaEmitter.java      | 19 ++--
 .../spout/IOpaquePartitionedTridentSpout.java   | 19 +++-
 .../OpaquePartitionedTridentSpoutExecutor.java  | 15 ++--
 .../topology/state/TransactionalState.java      |  4 +
 7 files changed, 89 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b8885411/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
index ae21ab3..20375a2 100755
--- 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
+++ 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
@@ -17,16 +17,16 @@
  
*******************************************************************************/
 package org.apache.storm.eventhubs.trident;
 
-import java.util.List;
-import java.util.Map;
-
 import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
 import org.apache.storm.eventhubs.spout.IEventHubReceiverFactory;
-
 import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
 import org.apache.storm.trident.topology.TransactionAttempt;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
 /**
  * A thin wrapper of TransactionalTridentEventHubEmitter for 
OpaqueTridentEventHubSpout
  */
@@ -63,6 +63,18 @@ public class OpaqueTridentEventHubEmitter implements 
IOpaquePartitionedTridentSp
   }
 
   @Override
+  public List<Partition> getPartitionsForTask(int taskId, int numTasks, 
Partitions allPartitionInfo) {
+    final List<Partition> orderedPartitions = 
getOrderedPartitions(allPartitionInfo);
+    final List<Partition> taskPartitions = new ArrayList<>(orderedPartitions 
== null ? 0 : orderedPartitions.size());
+    if (orderedPartitions != null) {
+      for (int i = taskId; i < orderedPartitions.size(); i += numTasks) {
+        taskPartitions.add(orderedPartitions.get(i));
+      }
+    }
+    return taskPartitions;
+  }
+
+  @Override
   public void refreshPartitions(List<Partition> partitionList) {
     transactionalEmitter.refreshPartitions(partitionList);
   }

http://git-wip-us.apache.org/repos/asf/storm/blob/b8885411/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
index 79dfc60..8607853 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
@@ -35,7 +35,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -71,7 +70,7 @@ public class KafkaTridentSpoutEmitter<K, V> implements 
IOpaquePartitionedTrident
 
     private TopologyContext topologyContext;
 
-    public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K,V> 
kafkaManager, TopologyContext topologyContext, Timer refreshSubscriptionTimer) {
+    public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> 
kafkaManager, TopologyContext topologyContext, Timer refreshSubscriptionTimer) {
         this.kafkaConsumer = 
kafkaManager.createAndSubscribeKafkaConsumer(topologyContext);
         this.kafkaManager = kafkaManager;
         this.topologyContext = topologyContext;
@@ -87,14 +86,14 @@ public class KafkaTridentSpoutEmitter<K, V> implements 
IOpaquePartitionedTrident
     /**
      * Creates instance of this class with default 500 millisecond refresh 
subscription timer
      */
-    public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K,V> 
kafkaManager, TopologyContext topologyContext) {
+    public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> 
kafkaManager, TopologyContext topologyContext) {
         this(kafkaManager, topologyContext, new Timer(500,
                 
kafkaManager.getKafkaSpoutConfig().getPartitionRefreshPeriodMs(), 
TimeUnit.MILLISECONDS));
     }
 
     @Override
     public KafkaTridentSpoutBatchMetadata<K, V> 
emitPartitionBatch(TransactionAttempt tx, TridentCollector collector,
-            KafkaTridentSpoutTopicPartition currBatchPartition, 
KafkaTridentSpoutBatchMetadata<K, V> lastBatch) {
+                                                                   
KafkaTridentSpoutTopicPartition currBatchPartition, 
KafkaTridentSpoutBatchMetadata<K, V> lastBatch) {
 
         LOG.debug("Processing batch: [transaction = {}], [currBatchPartition = 
{}], [lastBatchMetadata = {}], [collector = {}]",
                 tx, currBatchPartition, lastBatch, collector);
@@ -105,10 +104,10 @@ public class KafkaTridentSpoutEmitter<K, V> implements 
IOpaquePartitionedTrident
         Collection<TopicPartition> pausedTopicPartitions = 
Collections.emptySet();
 
         if (assignments == null || 
!assignments.contains(currBatchPartition.getTopicPartition())) {
-            LOG.warn("SKIPPING processing batch: [transaction = {}], 
[currBatchPartition = {}], [lastBatchMetadata = {}], " +
-                            "[collector = {}] because it is not assigned {} to 
consumer instance [{}] of consumer group [{}]",
-                    tx, currBatchPartition, lastBatch, collector, assignments, 
kafkaConsumer,
-                    kafkaManager.getKafkaSpoutConfig().getConsumerGroupId());
+            LOG.warn("SKIPPING processing batch [transaction = {}], 
[currBatchPartition = {}], [lastBatchMetadata = {}], " +
+                            "[collector = {}] because it is not part of the 
assignments {} of consumer instance [{}] " +
+                            "of consumer group [{}]", tx, currBatchPartition, 
lastBatch, collector, assignments,
+                    kafkaConsumer, 
kafkaManager.getKafkaSpoutConfig().getConsumerGroupId());
         } else {
             try {
                 // pause other topic-partitions to only poll from current 
topic-partition
@@ -205,67 +204,40 @@ public class KafkaTridentSpoutEmitter<K, V> implements 
IOpaquePartitionedTrident
     /**
      * Computes ordered list of topic-partitions for this task taking into 
consideration that topic-partitions
      * for this task must be assigned to the Kafka consumer running on this 
task.
+     *
      * @param allPartitionInfo list of all partitions as returned by {@link 
KafkaTridentSpoutOpaqueCoordinator}
      * @return ordered list of topic partitions for this task
      */
     @Override
     public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(final 
List<TopicPartition> allPartitionInfo) {
-        final int numTopicPartitions = allPartitionInfo == null ? 0 : 
allPartitionInfo.size();
-        final int taskIndex = topologyContext.getThisTaskIndex();
-        final int numTasks = 
topologyContext.getComponentTasks(topologyContext.getThisComponentId()).size();
-
-        LOG.debug("Computing task ordered list of topic-partitions from all 
partitions list {}, " +
-                "for task with index [{}] of total tasks [{}] ", 
allPartitionInfo, taskIndex, numTasks);
-
-        final Set<TopicPartition> assignment = kafkaConsumer.assignment();
-        LOG.debug("Consumer [{}] has assigned topic-partitions {}", 
kafkaConsumer, assignment);
-
-        List<KafkaTridentSpoutTopicPartition> taskOrderedTps = new 
ArrayList<>(numTopicPartitions);
-
-        if (numTopicPartitions > 0) {
-            final KafkaTridentSpoutTopicPartition[] tps = new 
KafkaTridentSpoutTopicPartition[numTopicPartitions];
-            int tpTaskComputedIdx = taskIndex;
-            /*
-             * Put this task's Kafka consumer assigned topic-partitions in the 
right index locations such
-             * that distribution by OpaquePartitionedTridentSpoutExecutor can 
be done correctly. This algorithm
-             * does the distribution in exactly the same way as the one used 
in OpaquePartitionedTridentSpoutExecutor
-             */
-            for (TopicPartition assignedTp : assignment) {
-                if (tpTaskComputedIdx >= numTopicPartitions) {
-                    LOG.warn("Ignoring attempt to add consumer [{}] assigned 
topic-partition [{}] to index [{}], " +
-                            "out of bounds [{}]. ", kafkaConsumer, assignedTp, 
tpTaskComputedIdx, numTopicPartitions);
-                    break;
-                }
-                tps[tpTaskComputedIdx] = new 
KafkaTridentSpoutTopicPartition(assignedTp);
-                LOG.debug("Added consumer assigned topic-partition [{}] to 
position [{}] for task with index [{}]",
-                        assignedTp, tpTaskComputedIdx, taskIndex);
-                tpTaskComputedIdx += numTasks;
-            }
+        final List<KafkaTridentSpoutTopicPartition> allPartitions = 
newKafkaTridentSpoutTopicPartitions(allPartitionInfo);
+        LOG.debug("Returning all topic-partitions {} across all tasks. Current 
task index [{}]. Total tasks [{}] ",
+                allPartitions, topologyContext.getThisTaskIndex(), 
getNumTasks());
+        return allPartitions;
+    }
 
-            // Put topic-partitions assigned to consumer instances running in 
different tasks in the empty slots
-            int i = 0;
-            for (TopicPartition tp : allPartitionInfo) {
-                /*
-                 * Topic-partition not assigned to the Kafka consumer 
associated with this emitter task, hence not yet
-                 * added to the list of task ordered partitions. To be 
processed next.
-                 */
-                if (!assignment.contains(tp)) {
-                    for (; i < numTopicPartitions; i++) {
-                        if (tps[i] == null) {   // find empty slot to put the 
topic-partition
-                            tps[i] = new KafkaTridentSpoutTopicPartition(tp);
-                            LOG.debug("Added to position [{}] topic-partition 
[{}], which is assigned to a consumer " +
-                                    "running on a task other than task with 
index [{}] ", i, tp, taskIndex);
-                            i++;
-                            break;
-                        }
-                    }
-                }
+    @Override
+    public List<KafkaTridentSpoutTopicPartition> getPartitionsForTask(int 
taskId, int numTasks, List<TopicPartition> allPartitionInfo) {
+        final Set<TopicPartition> assignedTps = kafkaConsumer.assignment();
+        LOG.debug("Consumer [{}], running on task with index [{}], has 
assigned topic-partitions {}", kafkaConsumer, taskId, assignedTps);
+        final List<KafkaTridentSpoutTopicPartition> taskTps = 
newKafkaTridentSpoutTopicPartitions(assignedTps);
+        LOG.debug("Returning topic-partitions {} for task with index [{}]", 
taskTps, taskId);
+        return taskTps;
+    }
+
+    private List<KafkaTridentSpoutTopicPartition> 
newKafkaTridentSpoutTopicPartitions(Collection<TopicPartition> tps) {
+        final List<KafkaTridentSpoutTopicPartition> kttp = new ArrayList<>(tps 
== null ? 0 : tps.size());
+        if (tps != null) {
+            for (TopicPartition tp : tps) {
+                LOG.trace("Added topic-partition [{}]", tp);
+                kttp.add(new KafkaTridentSpoutTopicPartition(tp));
             }
-            taskOrderedTps = Arrays.asList(tps);
         }
-        LOG.debug("Returning ordered list of topic-partitions {} for task with 
index [{}], of total tasks [{}] ",
-                taskOrderedTps, taskIndex, numTasks);
-        return taskOrderedTps;
+        return kttp;
+    }
+
+    private int getNumTasks() {
+        return 
topologyContext.getComponentTasks(topologyContext.getThisComponentId()).size();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/b8885411/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
index 18d37d9..0f7f0af 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
@@ -35,10 +35,7 @@ public class KafkaTridentSpoutOpaque<K,V> implements 
IOpaquePartitionedTridentSp
     private static final Logger LOG = 
LoggerFactory.getLogger(KafkaTridentSpoutOpaque.class);
 
     private final KafkaTridentSpoutManager<K, V> kafkaManager;
-    private KafkaTridentSpoutEmitter<K, V> kafkaTridentSpoutEmitter;
-    private KafkaTridentSpoutOpaqueCoordinator<K, V> coordinator;
 
-    
     public KafkaTridentSpoutOpaque(KafkaSpoutConfig<K, V> conf) {
         this(new KafkaTridentSpoutManager<>(conf));
     }
@@ -73,9 +70,6 @@ public class KafkaTridentSpoutOpaque<K,V> implements 
IOpaquePartitionedTridentSp
     @Override
     public String toString() {
         return super.toString() +
-                "{kafkaManager=" + kafkaManager +
-                ", kafkaTridentSpoutEmitter=" + kafkaTridentSpoutEmitter +
-                ", coordinator=" + coordinator +
-                '}';
+                "{kafkaManager=" + kafkaManager + '}';
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b8885411/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
index 136eb0b..2a407ca 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
@@ -161,11 +161,6 @@ public class TridentKafkaEmitter {
 
     /**
      * re-emit the batch described by the meta data provided
-     *
-     * @param attempt
-     * @param collector
-     * @param partition
-     * @param meta
      */
     private void reEmitPartitionBatch(TransactionAttempt attempt, 
TridentCollector collector, Partition partition, Map meta) {
         LOG.info("re-emitting batch, attempt " + attempt);
@@ -177,7 +172,7 @@ public class TridentKafkaEmitter {
             ByteBufferMessageSet msgs = null;
             msgs = fetchMessages(consumer, partition, offset);
 
-            if(msgs != null) {
+            if (msgs != null) {
                 for (MessageAndOffset msg : msgs) {
                     if (offset == nextOffset) {
                         break;
@@ -253,6 +248,18 @@ public class TridentKafkaEmitter {
             }
 
             @Override
+            public List<Partition> getPartitionsForTask(int taskId, int 
numTasks, List<GlobalPartitionInformation> allPartitionInfo) {
+                final List<Partition> orderedPartitions = 
getOrderedPartitions(allPartitionInfo);
+                final List<Partition> taskPartitions = new 
ArrayList<>(orderedPartitions == null ? 0 : orderedPartitions.size());
+                if (orderedPartitions != null) {
+                    for (int i = taskId; i < orderedPartitions.size(); i += 
numTasks) {
+                        taskPartitions.add(orderedPartitions.get(i));
+                    }
+                }
+                return taskPartitions;
+            }
+
+            @Override
             public void close() {
                 clear();
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/b8885411/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
 
b/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
index fc4d5ea..67cb361 100644
--- 
a/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
@@ -52,13 +52,26 @@ public interface IOpaquePartitionedTridentSpout<Partitions, 
Partition extends IS
          * This method is called when this task is responsible for a new set 
of partitions. Should be used
          * to manage things like connections to brokers.
          */        
-        void refreshPartitions(List<Partition> partitionResponsibilities);     
   
+        void refreshPartitions(List<Partition> partitionResponsibilities);
+
+        /**
+         * @return The ordered list of partitions being processed by all the 
tasks
+         */
         List<Partition> getOrderedPartitions(Partitions allPartitionInfo);
+
+        /**
+         * @return The list of partitions that are to be processed by the task 
with id {@code taskId}
+         */
+        List<Partition> getPartitionsForTask(int taskId, int numTasks, 
Partitions allPartitionInfo);
+
         void close();
     }
     
-    Emitter<Partitions, Partition, M> getEmitter(Map conf, TopologyContext 
context);     
-    Coordinator getCoordinator(Map conf, TopologyContext context);     
+    Emitter<Partitions, Partition, M> getEmitter(Map conf, TopologyContext 
context);
+
+    Coordinator getCoordinator(Map conf, TopologyContext context);
+
     Map<String, Object> getComponentConfiguration();
+
     Fields getOutputFields();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b8885411/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
 
b/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
index ea66acd..a6eeff0 100644
--- 
a/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
@@ -27,7 +27,6 @@ import org.apache.storm.tuple.Fields;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -111,16 +110,14 @@ public class OpaquePartitionedTridentSpoutExecutor 
implements ICommitterTridentS
                     tx, coordinatorMeta, collector, this);
 
             if(_savedCoordinatorMeta==null || 
!_savedCoordinatorMeta.equals(coordinatorMeta)) {
-                List<ISpoutPartition> partitions = 
_emitter.getOrderedPartitions(coordinatorMeta);
                 _partitionStates.clear();
-                List<ISpoutPartition> myPartitions = new ArrayList<>();
-                for(int i=_index; i < partitions.size(); i+=_numTasks) {
-                    ISpoutPartition p = partitions.get(i);
-                    String id = p.getId();
-                    myPartitions.add(p);
-                    _partitionStates.put(id, new EmitterPartitionState(new 
RotatingTransactionalState(_state, id), p));
+                final List<ISpoutPartition> taskPartitions = 
_emitter.getPartitionsForTask(_index, _numTasks, coordinatorMeta);
+                for (ISpoutPartition partition : taskPartitions) {
+                    _partitionStates.put(partition.getId(), new 
EmitterPartitionState(new RotatingTransactionalState(_state, 
partition.getId()), partition));
                 }
-                _emitter.refreshPartitions(myPartitions);
+
+                // refresh all partitions for backwards compatibility with old 
spout
+                
_emitter.refreshPartitions(_emitter.getOrderedPartitions(coordinatorMeta));
                 _savedCoordinatorMeta = coordinatorMeta;
                 _changedMeta = true;
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/b8885411/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
 
b/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
index 8b63547..26ac404 100644
--- 
a/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
@@ -37,6 +37,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+/**
+ * Class that contains the logic to extract the transactional state info from 
zookeeper. All transactional state
+ * is kept in zookeeper. This class only contains references to Curator, which 
is used to get all info from zookeeper.
+ */
 public class TransactionalState {
     private static final Logger LOG = 
LoggerFactory.getLogger(TransactionalState.class);
 

Reply via email to