Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2639#discussion_r183531789
  
    --- Diff: 
external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
    @@ -339,26 +339,26 @@ public void nextTuple() {
          */
         @Override
         public void ack(Object msgId) {
    -
             Message msg = this.pendingMessages.remove(msgId);
    -        JmsMessageID oldest = this.toCommit.first();
    -        if (msgId.equals(oldest)) {
    -            if (msg != null) {
    -                try {
    -                    LOG.debug("Committing...");
    -                    msg.acknowledge();
    -                    LOG.debug("JMS Message acked: " + msgId);
    -                    this.toCommit.remove(msgId);
    -                } catch (JMSException e) {
    -                    LOG.warn("Error acknowldging JMS message: " + msgId, 
e);
    +        if (!toCommit.isEmpty()) {
    +            JmsMessageID oldest = this.toCommit.first();
    +            if (msgId.equals(oldest)) {
    +                if (msg != null) {
    +                    try {
    +                        LOG.debug("Committing...");
    +                        msg.acknowledge();
    --- End diff --
    
    This piece of code was already there. I am guessing its based on the JMS 
acknowledgement mode.
    
    See - https://docs.oracle.com/cd/E19798-01/821-1841/bncfw/index.html
    
    In `Session.CLIENT_ACKNOWLEDGE` - Acknowledging a consumed message 
automatically acknowledges the receipt of all messages that have been consumed 
by its session, so this logic seems fine. 
    
    The spout is ignoring Auto acknowledgement mode, but I am not sure about 
the other modes like `DUPS_OK_ACKNOWLEDGE` or `SESSION_TRANSACTED` work. cc 
@ptgoetz who might have more context around this.


---

Reply via email to