Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2438#discussion_r154863967
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
---
@@ -372,19 +370,23 @@ private void ackRetriableOffsetsIfCompactedAway(Map<TopicPartition, Long> earlie
}
// ======== emit =========
- private void emit() {
- while (!emitTupleIfNotEmitted(waitingToEmit.next()) && waitingToEmit.hasNext()) {
+ private void emitIfWaitingNotEmitted() {
+ while (waitingToEmit.hasNext()) {
+ final boolean emitted = emitOrRetryTuple(waitingToEmit.next());
waitingToEmit.remove();
+ if (emitted) {
+ break;
+ }
}
}
/**
- * Creates a tuple from the kafka record and emits it if it was not yet emitted.
+ * Creates a tuple from the kafka record and emits it if it was never emitted or is ready to be retried
--- End diff --
nit: If my memory is right, checkstyle will check for the ending '.' in
javadoc. It is missing here.
---