Repository: storm Updated Branches: refs/heads/master ce19f81d5 -> 38fb8cce0
STORM-2642: Storm-kafka-client spout cannot be serialized when using manual partition assignment Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/de0d270b Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/de0d270b Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/de0d270b Branch: refs/heads/master Commit: de0d270b977d6d5560ee1e9e2a602c9cf93762e4 Parents: ce19f81 Author: Stig Rohde Døssing <[email protected]> Authored: Wed Jul 19 15:25:18 2017 +0200 Committer: Stig Rohde Døssing <[email protected]> Committed: Thu Jul 20 19:27:45 2017 +0200 ---------------------------------------------------------------------- .../spout/subscription/ManualPartitionSubscription.java | 8 ++++---- .../storm/kafka/spout/subscription/ManualPartitioner.java | 3 ++- 2 files changed, 6 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/de0d270b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java index 17512ea..32376d4 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java @@ -32,10 +32,10 @@ public class ManualPartitionSubscription extends Subscription { private static final long serialVersionUID = 5633018073527583826L; private final ManualPartitioner partitioner; private final TopicFilter partitionFilter; - private Set<TopicPartition> currentAssignment = null; - private KafkaConsumer<?, ?> consumer = null; - private ConsumerRebalanceListener listener = null; - private TopologyContext context = null; + private transient Set<TopicPartition> currentAssignment = null; + private transient KafkaConsumer<?, ?> consumer = null; + private transient ConsumerRebalanceListener listener = null; + private transient TopologyContext context = null; public ManualPartitionSubscription(ManualPartitioner parter, TopicFilter partitionFilter) { this.partitionFilter = partitionFilter; http://git-wip-us.apache.org/repos/asf/storm/blob/de0d270b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitioner.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitioner.java index dce7fc6..b456f8d 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitioner.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitioner.java @@ -18,6 +18,7 @@ package org.apache.storm.kafka.spout.subscription; +import java.io.Serializable; import java.util.List; import org.apache.kafka.common.TopicPartition; import org.apache.storm.task.TopologyContext; @@ -29,7 +30,7 @@ import org.apache.storm.task.TopologyContext; * number of spouts to avoid missing partitions or double assigning partitions. */ @FunctionalInterface -public interface ManualPartitioner { +public interface ManualPartitioner extends Serializable { /** * Get the partitions for this assignment * @param allPartitions all of the partitions that the set of spouts want to subscribe to, in a strict ordering
