Repository: storm
Updated Branches:
  refs/heads/1.x-branch 31954307f -> 6aa64ea38
STORM-2642: Storm-kafka-client spout cannot be serialized when using manual partition assignment

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c10f9296
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c10f9296
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c10f9296

Branch: refs/heads/1.x-branch
Commit: c10f9296e92cb381596342d5e8b316425bab6e1e
Parents: 3195430
Author: Stig Rohde Døssing <[email protected]>
Authored: Wed Jul 19 15:25:18 2017 +0200
Committer: Stig Rohde Døssing <[email protected]>
Committed: Thu Jul 20 23:04:42 2017 +0200

----------------------------------------------------------------------
 .../storm/kafka/spout/ManualPartitionSubscription.java       | 8 ++++----
 .../java/org/apache/storm/kafka/spout/ManualPartitioner.java | 3 ++-
 2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/c10f9296/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
index 7ddd8a7..61b98a8 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
@@ -31,10 +31,10 @@ public class ManualPartitionSubscription extends Subscription {
     private static final long serialVersionUID = 5633018073527583826L;
     private final ManualPartitioner partitioner;
     private final TopicFilter partitionFilter;
-    private Set<TopicPartition> currentAssignment = null;
-    private KafkaConsumer<?, ?> consumer = null;
-    private ConsumerRebalanceListener listener = null;
-    private TopologyContext context = null;
+    private transient Set<TopicPartition> currentAssignment = null;
+    private transient KafkaConsumer<?, ?> consumer = null;
+    private transient ConsumerRebalanceListener listener = null;
+    private transient TopologyContext context = null;
 
     public ManualPartitionSubscription(ManualPartitioner parter, TopicFilter partitionFilter) {
         this.partitionFilter = partitionFilter;

http://git-wip-us.apache.org/repos/asf/storm/blob/c10f9296/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
index f4034ad..0abd6c8 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
@@ -19,6 +19,7 @@ package org.apache.storm.kafka.spout;
 
 import java.util.List;
+import java.io.Serializable;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.task.TopologyContext;
 
@@ -28,7 +29,7 @@ import org.apache.storm.task.TopologyContext;
  * The complete TopologyContext is passed in, but it is suggested that you use the index of the spout and the total
  * number of spouts to avoid missing partitions or double assigning partitions.
  */
-public interface ManualPartitioner {
+public interface ManualPartitioner extends Serializable {
     /**
      * Get the partitions for this assignment
      * @param allPartitions all of the partitions that the set of spouts want to subscribe to, in a strict ordering
