STORM-2937: Overwrite storm-kafka-client 1.x-branch into 1.0.x-branch: backport 
trident interface changes

Cherry-picked from commit SHA b8885411.  Except that we had to ditch all 
changes to storm-kafka-client because b8885411 was done earlier than 74ca795 
where we copied storm-kafka-client from, and so there were conflicting changes 
that would have been regressions and broken the storm-kafka-client code.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4492b3e2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4492b3e2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4492b3e2

Branch: refs/heads/1.0.x-branch
Commit: 4492b3e282574c18074baa374bfbab042835d27d
Parents: 1312d69
Author: Hugo Louro <hmclo...@gmail.com>
Authored: Fri Mar 10 15:13:31 2017 -0600
Committer: Erik Weathers <eri...@gmail.com>
Committed: Wed Feb 7 18:43:39 2018 -0800

----------------------------------------------------------------------
 .../trident/OpaqueTridentEventHubEmitter.java   | 20 ++++++++++++++++----
 .../kafka/trident/TridentKafkaEmitter.java      | 19 +++++++++++++------
 .../spout/IOpaquePartitionedTridentSpout.java   | 19 ++++++++++++++++---
 .../OpaquePartitionedTridentSpoutExecutor.java  | 16 +++++++---------
 .../topology/state/TransactionalState.java      |  4 ++++
 5 files changed, 56 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4492b3e2/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
index ae21ab3..20375a2 100755
--- 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
+++ 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
@@ -17,16 +17,16 @@
  
*******************************************************************************/
 package org.apache.storm.eventhubs.trident;
 
-import java.util.List;
-import java.util.Map;
-
 import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
 import org.apache.storm.eventhubs.spout.IEventHubReceiverFactory;
-
 import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
 import org.apache.storm.trident.topology.TransactionAttempt;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
 /**
  * A thin wrapper of TransactionalTridentEventHubEmitter for 
OpaqueTridentEventHubSpout
  */
@@ -63,6 +63,18 @@ public class OpaqueTridentEventHubEmitter implements 
IOpaquePartitionedTridentSp
   }
 
   @Override
+  public List<Partition> getPartitionsForTask(int taskId, int numTasks, 
Partitions allPartitionInfo) {
+    final List<Partition> orderedPartitions = 
getOrderedPartitions(allPartitionInfo);
+    final List<Partition> taskPartitions = new ArrayList<>(orderedPartitions 
== null ? 0 : orderedPartitions.size());
+    if (orderedPartitions != null) {
+      for (int i = taskId; i < orderedPartitions.size(); i += numTasks) {
+        taskPartitions.add(orderedPartitions.get(i));
+      }
+    }
+    return taskPartitions;
+  }
+
+  @Override
   public void refreshPartitions(List<Partition> partitionList) {
     transactionalEmitter.refreshPartitions(partitionList);
   }

http://git-wip-us.apache.org/repos/asf/storm/blob/4492b3e2/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
index 809ed73..50cae21 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
@@ -148,11 +148,6 @@ public class TridentKafkaEmitter {
 
     /**
      * re-emit the batch described by the meta data provided
-     *
-     * @param attempt
-     * @param collector
-     * @param partition
-     * @param meta
      */
     private void reEmitPartitionBatch(TransactionAttempt attempt, 
TridentCollector collector, Partition partition, Map meta) {
         LOG.info("re-emitting batch, attempt " + attempt);
@@ -164,7 +159,7 @@ public class TridentKafkaEmitter {
             ByteBufferMessageSet msgs = null;
             msgs = fetchMessages(consumer, partition, offset);
 
-            if(msgs != null) {
+            if (msgs != null) {
                 for (MessageAndOffset msg : msgs) {
                     if (offset == nextOffset) {
                         break;
@@ -237,6 +232,18 @@ public class TridentKafkaEmitter {
             }
 
             @Override
+            public List<Partition> getPartitionsForTask(int taskId, int 
numTasks, List<GlobalPartitionInformation> allPartitionInfo) {
+                final List<Partition> orderedPartitions = 
getOrderedPartitions(allPartitionInfo);
+                final List<Partition> taskPartitions = new 
ArrayList<>(orderedPartitions == null ? 0 : orderedPartitions.size());
+                if (orderedPartitions != null) {
+                    for (int i = taskId; i < orderedPartitions.size(); i += 
numTasks) {
+                        taskPartitions.add(orderedPartitions.get(i));
+                    }
+                }
+                return taskPartitions;
+            }
+
+            @Override
             public void close() {
                 clear();
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/4492b3e2/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
 
b/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
index bf4c093..50afa69 100644
--- 
a/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
@@ -51,13 +51,26 @@ public interface IOpaquePartitionedTridentSpout<Partitions, 
Partition extends IS
          * This method is called when this task is responsible for a new set 
of partitions. Should be used
          * to manage things like connections to brokers.
          */        
-        void refreshPartitions(List<Partition> partitionResponsibilities);     
   
+        void refreshPartitions(List<Partition> partitionResponsibilities);
+
+        /**
+         * @return The oredered list of partitions being processed by all the 
tasks
+         */
         List<Partition> getOrderedPartitions(Partitions allPartitionInfo);
+
+        /**
+         * @return The list of partitions that are to be processed by the task 
with id {@code taskId}
+         */
+        List<Partition> getPartitionsForTask(int taskId, int numTasks, 
Partitions allPartitionInfo);
+
         void close();
     }
     
-    Emitter<Partitions, Partition, M> getEmitter(Map conf, TopologyContext 
context);     
-    Coordinator getCoordinator(Map conf, TopologyContext context);     
+    Emitter<Partitions, Partition, M> getEmitter(Map conf, TopologyContext 
context);
+
+    Coordinator getCoordinator(Map conf, TopologyContext context);
+
     Map<String, Object> getComponentConfiguration();
+
     Fields getOutputFields();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/4492b3e2/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
 
b/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
index 7c43961..ea04659 100644
--- 
a/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
@@ -20,7 +20,7 @@ package org.apache.storm.trident.spout;
 
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Fields;
-import java.util.ArrayList;
+
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -95,16 +95,14 @@ public class OpaquePartitionedTridentSpoutExecutor 
implements ICommitterTridentS
         @Override
         public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, 
TridentCollector collector) {
             if(_savedCoordinatorMeta==null || 
!_savedCoordinatorMeta.equals(coordinatorMeta)) {
-                List<ISpoutPartition> partitions = 
_emitter.getOrderedPartitions(coordinatorMeta);
                 _partitionStates.clear();
-                List<ISpoutPartition> myPartitions = new ArrayList<>();
-                for(int i=_index; i < partitions.size(); i+=_numTasks) {
-                    ISpoutPartition p = partitions.get(i);
-                    String id = p.getId();
-                    myPartitions.add(p);
-                    _partitionStates.put(id, new EmitterPartitionState(new 
RotatingTransactionalState(_state, id), p));
+                final List<ISpoutPartition> taskPartitions = 
_emitter.getPartitionsForTask(_index, _numTasks, coordinatorMeta);
+                for (ISpoutPartition partition : taskPartitions) {
+                    _partitionStates.put(partition.getId(), new 
EmitterPartitionState(new RotatingTransactionalState(_state, 
partition.getId()), partition));
                 }
-                _emitter.refreshPartitions(myPartitions);
+
+                // refresh all partitions for backwards compatibility with old 
spout
+                
_emitter.refreshPartitions(_emitter.getOrderedPartitions(coordinatorMeta));
                 _savedCoordinatorMeta = coordinatorMeta;
                 _changedMeta = true;
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/4492b3e2/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
 
b/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
index 5061590..b5546c9 100644
--- 
a/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
@@ -40,6 +40,10 @@ import org.json.simple.JSONValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Class that contains the logic to extract the transactional state info from 
zookeeper. All transactional state
+ * is kept in zookeeper. This class only contains references to Curator, which 
is used to get all info from zookeeper.
+ */
 public class TransactionalState {
     private static final Logger LOG = 
LoggerFactory.getLogger(TransactionalState.class);
     CuratorFramework _curator;

Reply via email to