Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/2639#discussion_r183531789 --- Diff: external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java --- @@ -339,26 +339,26 @@ public void nextTuple() { */ @Override public void ack(Object msgId) { - Message msg = this.pendingMessages.remove(msgId); - JmsMessageID oldest = this.toCommit.first(); - if (msgId.equals(oldest)) { - if (msg != null) { - try { - LOG.debug("Committing..."); - msg.acknowledge(); - LOG.debug("JMS Message acked: " + msgId); - this.toCommit.remove(msgId); - } catch (JMSException e) { - LOG.warn("Error acknowldging JMS message: " + msgId, e); + if (!toCommit.isEmpty()) { + JmsMessageID oldest = this.toCommit.first(); + if (msgId.equals(oldest)) { + if (msg != null) { + try { + LOG.debug("Committing..."); + msg.acknowledge(); --- End diff -- This piece of code was already there. I am guessing its based on the JMS acknowledgement mode. See - https://docs.oracle.com/cd/E19798-01/821-1841/bncfw/index.html In `Session.CLIENT_ACKNOWLEDGE` - Acknowledging a consumed message automatically acknowledges the receipt of all messages that have been consumed by its session, so this logic seems fine. The spout is ignoring Auto acknowledgement mode, but I am not sure about the other modes like `DUPS_OK_ACKNOWLEDGE` or `SESSION_TRANSACTED` work. cc @ptgoetz who might have more context around this.
---