Github user rick-kilgore commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/254#discussion_r17396897
  
    --- Diff: external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java ---
    @@ -191,6 +208,7 @@ public void ack(Long offset) {
                 _pending.headSet(offset - _spoutConfig.maxOffsetBehind).clear();
             }
             _pending.remove(offset);
    +        retryRecords.remove(offset);
    --- End diff ---
    
    I did notice this, but chose to leave it.  My understanding is that the 
topology is expected to eventually ack() all messages.  And if I understand 
right, if a worker JVM or host crashes, the message will eventually time out 
per the topology.message.timeout.secs setting in backtype.storm.Config, be 
re-submitted to the topology, and be ack()ed by the new worker at that point 
- since the new worker keys my retryRecords map by the same Kafka offset that 
the crashed worker was using.
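
    To illustrate what I mean, here is a minimal sketch of the bookkeeping as 
I picture it (the class and the emitted()/failed() helpers are made up for 
this example; only _pending, retryRecords, and ack() correspond to names from 
the PR):

    ```java
    // Illustrative sketch only - not the actual PartitionManager code.
    // It assumes retryRecords is keyed by the same Kafka offset as _pending,
    // so an ack() from any worker for that offset clears both collections.
    import java.util.Map;
    import java.util.SortedSet;
    import java.util.TreeMap;
    import java.util.TreeSet;

    class RetryBookkeepingSketch {
        private final SortedSet<Long> _pending = new TreeSet<Long>();
        // hypothetical retry state, keyed by Kafka offset
        private final Map<Long, Long> retryRecords = new TreeMap<Long, Long>();

        void emitted(long offset) {
            _pending.add(offset);
        }

        void failed(long offset, long retryAtMs) {
            // tuple failed or timed out (topology.message.timeout.secs):
            // remember the offset so the message can be re-emitted later
            retryRecords.put(offset, retryAtMs);
        }

        void ack(long offset) {
            // mirrors the diff above: acking an offset clears it from both
            // collections, no matter which worker originally emitted it
            _pending.remove(offset);
            retryRecords.remove(offset);
        }
    }
    ```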
    
    I'm not 100% sure that covers everything, but as I have it, cleanup also 
works the same way cleanup of the _pending collection has always worked and 
still does.  Also, I wonder if it is actually a good thing that a topology 
that fails to ack its messages produces a visible problem - in this case, 
running out of memory.

