Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2538#discussion_r165852272 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java --- @@ -142,7 +139,7 @@ public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputC offsetManagers = new HashMap<>(); emitted = new HashSet<>(); waitingToEmit = new HashMap<>(); - setCommitMetadata(context); + commitMetadataManager = new CommitMetadataManager(context, kafkaSpoutConfig.getProcessingGuarantee()); --- End diff -- I wonder if this should become available to the KakfaSpout through KafkaSpoutConfig, perhaps using a factory such that we could make it pluggable, in case there is need to support a different behavior in the future. We can also wait to do that until we need it. Just wanted to get your thoughts on it.
---