Github user hmcl commented on a diff in the pull request:
@@ -142,7 +139,7 @@ public void open(Map<String, Object> conf,
TopologyContext context, SpoutOutputC
offsetManagers = new HashMap<>();
emitted = new HashSet<>();
waitingToEmit = new HashMap<>();
+ commitMetadataManager = new CommitMetadataManager(context,
--- End diff --
I wonder if this should be made available to the KafkaSpout through
KafkaSpoutConfig, perhaps using a factory so that we can make it pluggable
in case there is a need to support different behavior in the future.
We can also wait to do that until we need it. Just wanted to get your
thoughts on it.
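
To make the factory idea concrete, here is a rough sketch of the shape it could take. Every name in it is hypothetical: `CommitMetadataManagerFactory`, `SpoutConfigSketch` and the `setCommitMetadataManagerFactory` builder method do not exist in the codebase, and the `CommitMetadataManager` interface below is only a stand-in, not the class added in this PR. It assumes the factory has to be `Serializable` because it would travel with the spout config.

```java
import java.io.Serializable;
import java.util.Objects;

// Stand-in for the manager created in open(); NOT the real class from this PR.
interface CommitMetadataManager {
    String getCommitMetadata();
}

// Hypothetical factory. Serializable on the assumption that it is carried
// inside the (serializable) spout config.
interface CommitMetadataManagerFactory extends Serializable {
    CommitMetadataManager create(String topologyId, int taskIndex);
}

// Hypothetical default, used when the user configures nothing.
final class DefaultCommitMetadataManagerFactory implements CommitMetadataManagerFactory {
    @Override
    public CommitMetadataManager create(String topologyId, int taskIndex) {
        // Placeholder metadata format, chosen only for this sketch.
        return () -> topologyId + ":" + taskIndex;
    }
}

// Hypothetical slice of a config class with a builder that exposes the factory.
final class SpoutConfigSketch {
    private final CommitMetadataManagerFactory factory;

    private SpoutConfigSketch(Builder builder) {
        this.factory = builder.factory;
    }

    CommitMetadataManagerFactory getCommitMetadataManagerFactory() {
        return factory;
    }

    static final class Builder {
        private CommitMetadataManagerFactory factory = new DefaultCommitMetadataManagerFactory();

        Builder setCommitMetadataManagerFactory(CommitMetadataManagerFactory factory) {
            this.factory = Objects.requireNonNull(factory, "factory");
            return this;
        }

        SpoutConfigSketch build() {
            return new SpoutConfigSketch(this);
        }
    }
}
```

With something like this, open() would ask the config for the factory and call `create(...)` instead of invoking the constructor directly, and the default keeps today's behavior for anyone who does not set a factory.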