Repository: storm
Updated Branches:
  refs/heads/1.x-branch 31954307f -> 6aa64ea38


STORM-2642: Storm-kafka-client spout cannot be serialized when using manual partition assignment


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c10f9296
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c10f9296
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c10f9296

Branch: refs/heads/1.x-branch
Commit: c10f9296e92cb381596342d5e8b316425bab6e1e
Parents: 3195430
Author: Stig Rohde Døssing <[email protected]>
Authored: Wed Jul 19 15:25:18 2017 +0200
Committer: Stig Rohde Døssing <[email protected]>
Committed: Thu Jul 20 23:04:42 2017 +0200

----------------------------------------------------------------------
 .../storm/kafka/spout/ManualPartitionSubscription.java       | 8 ++++----
 .../java/org/apache/storm/kafka/spout/ManualPartitioner.java | 3 ++-
 2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c10f9296/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
index 7ddd8a7..61b98a8 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
@@ -31,10 +31,10 @@ public class ManualPartitionSubscription extends Subscription {
     private static final long serialVersionUID = 5633018073527583826L;
     private final ManualPartitioner partitioner;
     private final TopicFilter partitionFilter;
-    private Set<TopicPartition> currentAssignment = null;
-    private KafkaConsumer<?, ?> consumer = null;
-    private ConsumerRebalanceListener listener = null;
-    private TopologyContext context = null;
+    private transient Set<TopicPartition> currentAssignment = null;
+    private transient KafkaConsumer<?, ?> consumer = null;
+    private transient ConsumerRebalanceListener listener = null;
+    private transient TopologyContext context = null;
 
     public ManualPartitionSubscription(ManualPartitioner parter, TopicFilter partitionFilter) {
         this.partitionFilter = partitionFilter;

http://git-wip-us.apache.org/repos/asf/storm/blob/c10f9296/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
index f4034ad..0abd6c8 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
@@ -19,6 +19,7 @@ package org.apache.storm.kafka.spout;
 
 import java.util.List;
 
+import java.io.Serializable;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.task.TopologyContext;
 
@@ -28,7 +29,7 @@ import org.apache.storm.task.TopologyContext;
  * The complete TopologyContext is passed in, but it is suggested that you use the index of the spout and the total
  * number of spouts to avoid missing partitions or double assigning partitions.
  */
-public interface ManualPartitioner {
+public interface ManualPartitioner extends Serializable {
     /**
      * Get the partitions for this assignment
      * @param allPartitions all of the partitions that the set of spouts want to subscribe to, in a strict ordering

Reply via email to