Github user hmcl commented on a diff in the pull request:
https://github.com/apache/storm/pull/2538#discussion_r165852272
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -142,7 +139,7 @@ public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputC
offsetManagers = new HashMap<>();
emitted = new HashSet<>();
waitingToEmit = new HashMap<>();
- setCommitMetadata(context);
+ commitMetadataManager = new CommitMetadataManager(context, kafkaSpoutConfig.getProcessingGuarantee());
--- End diff --
I wonder if this should be made available to the KafkaSpout through
KafkaSpoutConfig, perhaps via a factory, so that it is pluggable in case
we need to support different behavior in the future.
We could also wait to do that until we actually need it. Just wanted to get
your thoughts on it.
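
A minimal sketch of the factory idea, under stated assumptions: none of the types below are the real Storm classes. `Context`, `SpoutConfig`, the `CommitMetadataManager` interface, `CommitMetadataManagerFactory`, and the metadata string format are all hypothetical stand-ins, so the example compiles without Storm on the classpath. It only illustrates how the config could carry a factory with a default, so that `open()` no longer calls `new CommitMetadataManager(...)` directly.

```java
import java.util.Objects;

public class PluggableCommitMetadataSketch {

    /** Stand-in for KafkaSpoutConfig.ProcessingGuarantee. */
    enum ProcessingGuarantee { AT_LEAST_ONCE, AT_MOST_ONCE, NO_GUARANTEE }

    /** Hypothetical stand-in for the parts of TopologyContext a manager might need. */
    static final class Context {
        final String topologyId;
        final int taskId;

        Context(String topologyId, int taskId) {
            this.topologyId = topologyId;
            this.taskId = taskId;
        }
    }

    /** Hypothetical interface extracted from the concrete manager class. */
    interface CommitMetadataManager {
        String getCommitMetadata();
    }

    /** Hypothetical factory that the spout config would carry. */
    @FunctionalInterface
    interface CommitMetadataManagerFactory {
        CommitMetadataManager create(Context context, ProcessingGuarantee guarantee);
    }

    /** Default behavior; the metadata format here is made up for illustration. */
    static final class DefaultCommitMetadataManager implements CommitMetadataManager {
        private final String metadata;

        DefaultCommitMetadataManager(Context context, ProcessingGuarantee guarantee) {
            this.metadata = context.topologyId + "/" + context.taskId + "/" + guarantee;
        }

        @Override
        public String getCommitMetadata() {
            return metadata;
        }
    }

    /** Hypothetical stand-in for KafkaSpoutConfig: holds the factory, with a default. */
    static final class SpoutConfig {
        private final ProcessingGuarantee guarantee;
        private final CommitMetadataManagerFactory factory;

        SpoutConfig(ProcessingGuarantee guarantee) {
            this(guarantee, DefaultCommitMetadataManager::new);
        }

        SpoutConfig(ProcessingGuarantee guarantee, CommitMetadataManagerFactory factory) {
            this.guarantee = Objects.requireNonNull(guarantee);
            this.factory = Objects.requireNonNull(factory);
        }

        ProcessingGuarantee getProcessingGuarantee() {
            return guarantee;
        }

        CommitMetadataManagerFactory getCommitMetadataManagerFactory() {
            return factory;
        }
    }

    /** What the spout's open() would do instead of constructing the manager itself. */
    static CommitMetadataManager open(SpoutConfig config, Context context) {
        return config.getCommitMetadataManagerFactory()
                     .create(context, config.getProcessingGuarantee());
    }

    public static void main(String[] args) {
        Context context = new Context("topo-1", 3);

        // Default factory: same behavior as constructing the manager directly.
        SpoutConfig defaults = new SpoutConfig(ProcessingGuarantee.AT_LEAST_ONCE);
        System.out.println(open(defaults, context).getCommitMetadata());

        // Custom factory: behavior swapped without touching the spout.
        SpoutConfig custom = new SpoutConfig(
            ProcessingGuarantee.AT_LEAST_ONCE,
            (ctx, guarantee) -> () -> "custom:" + ctx.taskId);
        System.out.println(open(custom, context).getCommitMetadata());
    }
}
```

The trade-off is the one already raised above: the factory adds an interface and a config option that nothing needs yet, so deferring it until a second implementation exists is equally reasonable.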
---