Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2537#discussion_r165850081
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
---
@@ -519,6 +519,15 @@ private boolean isEmitTuple(List<Object> tuple) {
return tuple != null || kafkaSpoutConfig.isEmitNullTuples();
}
+ private void commitConsumedOffsets(Set<TopicPartition> assignedPartitions) {
+ Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
+ for (TopicPartition tp : assignedPartitions) {
+ offsetsToCommit.put(tp, new OffsetAndMetadata(kafkaConsumer.position(tp)));
+ }
+ kafkaConsumer.commitAsync(offsetsToCommit, null);
--- End diff --
I don't think replacing the null with a named constant improves readability,
and IMO it also looks a little weird to have a constant null field at the top
of the class.
---