Github user hmcl commented on a diff in the pull request:
https://github.com/apache/storm/pull/2538#discussion_r165852737
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
---
@@ -469,11 +437,14 @@ private boolean emitOrRetryTuple(ConsumerRecord<K, V>
record) {
LOG.trace("Tuple for record [{}] has already been emitted.
Skipping", record);
} else {
final OffsetAndMetadata committedOffset =
kafkaConsumer.committed(tp);
- if (committedOffset != null &&
isOffsetCommittedByThisTopology(tp, committedOffset)
+ if (isAtLeastOnceProcessing()
+ && committedOffset != null
+ &&
commitMetadataManager.isOffsetCommittedByThisTopology(tp, committedOffset,
offsetManagers)
&& committedOffset.offset() > record.offset()) {
// Ensures that after a topology with this id is started,
the consumer fetch
// position never falls behind the committed offset
(STORM-2844)
- throw new IllegalStateException("Attempting to emit a
message that has already been committed.");
+ throw new IllegalStateException("Attempting to emit a
message that has already been committed."
+ + " This should never occur in at-least-once mode.");
--- End diff --
for at-least-once semantics.
---