Github user reiabreu commented on a diff in the pull request:
https://github.com/apache/storm/pull/2593#discussion_r178559146
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
---
@@ -570,20 +572,25 @@ public void ack(Object messageId) {
// Only need to keep track of acked tuples if commits to Kafka are controlled by
// tuple acks, which happens only for at-least-once processing semantics
final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
- if (!emitted.contains(msgId)) {
- if (msgId.isEmitted()) {
+ if (!msgId.isNullTuple()) {
--- End diff --
Sounds good, will update it.
Regarding tests, we need to add at least one:
Test that all messages are committed when every tuple is null and the spout is
not set to emit null tuples.
```
@Test
public void testShouldCommitAllMessagesIfNotSetToEmitNullTuples() throws Exception {
    final int messageCount = 10;
    prepareSpout(messageCount);

    // All null tuples should be committed, meaning they were considered
    // to be emitted and acked
    for (int i = 0; i < messageCount; i++) {
        spout.nextTuple();
    }

    // No tuple should have reached the collector
    verify(collectorMock, never()).emit(
        anyString(),
        anyList(),
        any());

    Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
    // Commit offsets
    spout.nextTuple();

    verifyAllMessagesCommitted(messageCount);
}
```
We would need a new SingleTopicKafkaSpoutConfiguration with something like:
```
// Translator that maps every record to null; String key/value types are assumed here
private static class NullRecordExtractor implements RecordTranslator<String, String> {
    @Override
    public List<Object> apply(ConsumerRecord<String, String> record) {
        return null;
    }

    @Override
    public Fields getFieldsFor(String stream) {
        return new Fields("topic", "key", "value");
    }
}
```
I was planning to extend KafkaSpoutAbstractTest in a new test class similar to
KafkaSpoutSingleTopicTest.
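
To make the expected behaviour concrete, here is a minimal standalone sketch of what the test is meant to pin down. It is a toy model, not KafkaSpout code: the class `NullTupleSketch` and all of its methods are hypothetical, and it only covers the case where emitting null tuples is disabled. A record whose translation is null is never emitted and is treated as already acked, so it cannot block the commit.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

/** Toy model (hypothetical, not Storm code) of a spout that skips null tuples. */
public class NullTupleSketch {
    private final Function<String, List<Object>> translator;
    private final Set<Long> pending = new HashSet<>(); // emitted, waiting for an ack
    private final Set<Long> acked = new HashSet<>();   // safe to commit
    private long nextOffset = 0;
    private int emittedCount = 0;

    public NullTupleSketch(Function<String, List<Object>> translator) {
        this.translator = translator;
    }

    /** Reads one record and either emits it or marks it acked right away. */
    public void nextRecord(String value) {
        long offset = nextOffset++;
        if (translator.apply(value) == null) {
            // Nothing is emitted, so no ack will ever arrive: count it as acked now.
            acked.add(offset);
        } else {
            pending.add(offset);
            emittedCount++;
        }
    }

    /** Acks a previously emitted offset; unknown offsets are ignored. */
    public void ack(long offset) {
        if (pending.remove(offset)) {
            acked.add(offset);
        }
    }

    /** Returns the first offset that is not yet acked, i.e. the next commit position. */
    public long committableOffset() {
        long next = 0;
        while (acked.contains(next)) {
            next++;
        }
        return next;
    }

    public int emittedCount() {
        return emittedCount;
    }

    public static void main(String[] args) {
        NullTupleSketch spout = new NullTupleSketch(value -> null);
        for (int i = 0; i < 10; i++) {
            spout.nextRecord("msg-" + i);
        }
        // prints "0 emitted, commit up to 10"
        System.out.println(spout.emittedCount() + " emitted, commit up to "
            + spout.committableOffset());
    }
}
```

With a translator that returns null only for some records, the commit position in this model stops at the first emitted offset until that offset is acked, which is the at-least-once behaviour we want to keep.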
---