Github user rick-kilgore commented on a diff in the pull request:
https://github.com/apache/incubator-storm/pull/254#discussion_r17396897
--- Diff: external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java ---
@@ -191,6 +208,7 @@ public void ack(Long offset) {
_pending.headSet(offset - _spoutConfig.maxOffsetBehind).clear();
}
_pending.remove(offset);
+ retryRecords.remove(offset);
--- End diff --
I did notice this, but chose to leave it. My understanding is that the
topology is expected to eventually ack() all messages. And if I understand
right, when a worker JVM or host crashes, the message will eventually time out
per the topology.message.timeout.secs setting in backtype.storm.Config, be
re-submitted to the topology, and be ack()ed by the new worker at that point,
since the new worker will use the same kafka offset as the key in my
retryRecords map that the crashed worker was using.
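For reference, here's a rough sketch (purely illustrative, not from this patch)
of how that timeout is set on a topology; the class name and the value 60 are
just examples:

```java
import backtype.storm.Config;

public class TimeoutConfigSketch {
    // topology.message.timeout.secs: if a tuple is not ack()ed within this
    // many seconds (e.g. because its worker crashed), Storm fails it and the
    // spout replays it from the same kafka offset.
    public static Config withMessageTimeout(int seconds) {
        Config conf = new Config();
        conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, seconds);
        return conf;
    }
}
```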
I'm not 100% sure that covers everything, but as I have it, retryRecords is
cleaned up the same way the _pending collection's cleanup has always worked
(and still works). Also, I wonder whether it is actually a good thing that a
topology that fails to ack its messages produces a visible problem, in this
case running out of memory.
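To make the parallel with _pending concrete, here's a simplified sketch of the
ack() path. The field types, the maxOffsetBehind guard, and the class name are
my approximations for illustration, not the actual PartitionManager code:

```java
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

public class AckCleanupSketch {
    // stand-in for _spoutConfig.maxOffsetBehind
    private final long maxOffsetBehind = 100000L;

    private final SortedSet<Long> _pending = new TreeSet<Long>();
    // assumed shape of retryRecords: kafka offset -> retry bookkeeping
    private final SortedMap<Long, Object> retryRecords = new TreeMap<Long, Object>();

    public void ack(Long offset) {
        if (!_pending.isEmpty() && _pending.first() < offset - maxOffsetBehind) {
            // fell too far behind: drop everything older than maxOffsetBehind
            _pending.headSet(offset - maxOffsetBehind).clear();
        }
        _pending.remove(offset);
        // keyed by the same kafka offset, so when a replayed tuple is finally
        // ack()ed, the entry recorded under that offset is removed as well
        retryRecords.remove(offset);
    }
}
```

If a topology never acks at all, neither collection shrinks through this path,
which is the visible memory-growth symptom I mentioned above.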