[ https://issues.apache.org/jira/browse/NIFI-2774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15493907#comment-15493907 ]
ASF GitHub Bot commented on NIFI-2774:
--------------------------------------

Github user ckmcd commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1022#discussion_r79008301

    --- Diff: nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java ---
    @@ -78,22 +77,24 @@
          */
         @Override
         protected void rendezvousWithJms(ProcessContext context, ProcessSession processSession) throws ProcessException {
    -        final JMSResponse response = this.targetResource.consume();
    -        if (response != null){
    -            FlowFile flowFile = processSession.create();
    -            flowFile = processSession.write(flowFile, new OutputStreamCallback() {
    -                @Override
    -                public void process(final OutputStream out) throws IOException {
    -                    out.write(response.getMessageBody());
    -                }
    -            });
    -            Map<String, Object> jmsHeaders = response.getMessageHeaders();
    -            flowFile = this.updateFlowFileAttributesWithJmsHeaders(jmsHeaders, flowFile, processSession);
    -            processSession.getProvenanceReporter().receive(flowFile, context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue());
    -            processSession.transfer(flowFile, REL_SUCCESS);
    -        } else {
    -            context.yield();
    -        }
    +        this.targetResource.consume(response -> {
    +            if (response != null) {
    +                FlowFile flowFile = processSession.create();
    +                flowFile = processSession.write(flowFile, new OutputStreamCallback() {
    +                    @Override
    +                    public void process(final OutputStream out) throws IOException {
    +                        out.write(response.getMessageBody());
    +                    }
    +                });
    +                Map<String, Object> jmsHeaders = response.getMessageHeaders();
    +                flowFile = this.updateFlowFileAttributesWithJmsHeaders(jmsHeaders, flowFile, processSession);
    +                processSession.getProvenanceReporter().receive(flowFile,
    +                        context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue());
    +                processSession.transfer(flowFile, REL_SUCCESS);
    --- End diff --

    Question: doesn't the session need to be explicitly committed here, before the message gets acknowledged, in order to
    complete the chain of custody? Otherwise I believe there is a small window where, if NiFi crashes before the session is committed by the scheduler, the acknowledged message can be lost.


> ConsumeJMS processor losses messages on NiFi restart
> ----------------------------------------------------
>
>                 Key: NIFI-2774
>                 URL: https://issues.apache.org/jira/browse/NIFI-2774
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Core Framework
>    Affects Versions: 1.0.0, 0.7.0
>            Reporter: Christopher McDermott
>            Assignee: Oleg Zhurakousky
>            Priority: Critical
>             Fix For: 1.1.0, 0.8.0
>
>         Attachments: 2774.patch
>
>
> ConsumeJMS processor uses auto-acknowledge mode. Unlike the deprecated
> GetJMSQueue processor, it does not provide a way to specify a different ACK
> mode (e.g. client-acknowledge). Auto-acknowledge mode acknowledges message
> receipt from JMS *before* the messages are actually added to the flow. This
> leads to data loss on NiFi stop (or crash).
> I believe the fix for this is to allow the user to specify the ACK mode in
> the processor configuration, as the GetJMSQueue processor allows.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
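
The ordering concern raised in the review comment (commit the session first, acknowledge the message second) can be illustrated with a minimal, self-contained sketch. Everything here is an assumption made for illustration: FlowSession and AckableMessage are hypothetical stand-ins, not the NiFi ProcessSession or JMS Message APIs, and the sketch is not the change made in the pull request. It only shows that when commit fails, acknowledge is never reached, so a broker in client-acknowledge mode would still be able to redeliver the message.

```java
import java.util.ArrayList;
import java.util.List;

class CommitBeforeAck {

    /** Hypothetical stand-in for a flow session (NOT the NiFi ProcessSession API). */
    interface FlowSession {
        void write(byte[] body);
        void commit();
    }

    /** Hypothetical stand-in for a message received in client-acknowledge mode. */
    interface AckableMessage {
        byte[] body();
        void acknowledge();
    }

    /**
     * Persist first, acknowledge second. If commit() throws, acknowledge()
     * is never called, so the message stays unacknowledged at the broker.
     */
    static void consume(AckableMessage message, FlowSession session) {
        session.write(message.body());
        session.commit();
        message.acknowledge();
    }

    /** Runs consume() against recording stubs and returns the observed call order. */
    static List<String> demo(boolean commitFails) {
        List<String> events = new ArrayList<>();
        FlowSession session = new FlowSession() {
            @Override
            public void write(byte[] body) {
                events.add("write");
            }

            @Override
            public void commit() {
                if (commitFails) {
                    // Simulates a failure between writing and committing.
                    throw new IllegalStateException("simulated failure before commit");
                }
                events.add("commit");
            }
        };
        AckableMessage message = new AckableMessage() {
            @Override
            public byte[] body() {
                return new byte[] {1, 2, 3};
            }

            @Override
            public void acknowledge() {
                events.add("ack");
            }
        };
        try {
            consume(message, session);
        } catch (IllegalStateException e) {
            // Commit failed: the message was never acknowledged.
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo(false)); // prints [write, commit, ack]
        System.out.println(demo(true));  // prints [write]
    }
}
```

With auto-acknowledge, the "ack" step would effectively happen before "commit", which is the loss window described in the issue.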