[ https://issues.apache.org/jira/browse/NIFI-2774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15493907#comment-15493907 ]

ASF GitHub Bot commented on NIFI-2774:
--------------------------------------

Github user ckmcd commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1022#discussion_r79008301
  
    --- Diff: nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java ---
    @@ -78,22 +77,24 @@
          */
         @Override
         protected void rendezvousWithJms(ProcessContext context, ProcessSession processSession) throws ProcessException {
    -        final JMSResponse response = this.targetResource.consume();
    -        if (response != null){
    -            FlowFile flowFile = processSession.create();
    -            flowFile = processSession.write(flowFile, new OutputStreamCallback() {
    -                @Override
    -                public void process(final OutputStream out) throws IOException {
    -                    out.write(response.getMessageBody());
    -                }
    -            });
    -            Map<String, Object> jmsHeaders = response.getMessageHeaders();
    -            flowFile = this.updateFlowFileAttributesWithJmsHeaders(jmsHeaders, flowFile, processSession);
    -            processSession.getProvenanceReporter().receive(flowFile, context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue());
    -            processSession.transfer(flowFile, REL_SUCCESS);
    -        } else {
    -            context.yield();
    -        }
    +        this.targetResource.consume(response -> {
    +            if (response != null) {
    +                FlowFile flowFile = processSession.create();
    +                flowFile = processSession.write(flowFile, new OutputStreamCallback() {
    +                    @Override
    +                    public void process(final OutputStream out) throws IOException {
    +                        out.write(response.getMessageBody());
    +                    }
    +                });
    +                Map<String, Object> jmsHeaders = response.getMessageHeaders();
    +                flowFile = this.updateFlowFileAttributesWithJmsHeaders(jmsHeaders, flowFile, processSession);
    +                processSession.getProvenanceReporter().receive(flowFile,
    +                        context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue());
    +                processSession.transfer(flowFile, REL_SUCCESS);
    --- End diff --
    
    Question: doesn't the session need to be explicitly committed here, before
    the message is acknowledged, in order to complete the chain of custody?
    Otherwise I believe there is a small window in which, if NiFi crashes before
    the scheduler commits the session, the acknowledged message can be lost.
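
The ordering concern raised in this comment can be illustrated with a small,
self-contained simulation. This is only a sketch under stated assumptions:
ToyBroker and ToySession are hypothetical stand-ins, not NiFi or JMS classes,
and the "crash" is modeled simply as discarding uncommitted session data.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of the ack/commit ordering; no NiFi or JMS types are used. */
class AckOrderingSketch {

    /** Toy broker: a message stays redeliverable until it is acknowledged. */
    static final class ToyBroker {
        private final Deque<String> pending = new ArrayDeque<>();
        ToyBroker(List<String> messages) { pending.addAll(messages); }
        String peek() { return pending.peekFirst(); }
        void acknowledge() { pending.pollFirst(); }
        int redeliverable() { return pending.size(); }
    }

    /** Toy session: written data becomes durable only after commit(). */
    static final class ToySession {
        private final List<String> uncommitted = new ArrayList<>();
        private final List<String> durable = new ArrayList<>();
        void write(String body) { uncommitted.add(body); }
        void commit() { durable.addAll(uncommitted); uncommitted.clear(); }
        void crash() { uncommitted.clear(); } // uncommitted work is lost
        List<String> durable() { return durable; }
    }

    /**
     * Consumes one message and simulates a crash between the two steps.
     * Returns true if the message survives, i.e. it is either durable in
     * the session or still redeliverable from the broker.
     */
    static boolean survivesCrash(boolean commitBeforeAck) {
        ToyBroker broker = new ToyBroker(List.of("m1"));
        ToySession session = new ToySession();
        session.write(broker.peek());
        if (commitBeforeAck) {
            session.commit();     // data is durable first
            session.crash();      // crash before the ack is sent
        } else {
            broker.acknowledge(); // broker forgets the message first
            session.crash();      // crash before the session is committed
        }
        return session.durable().contains("m1") || broker.redeliverable() > 0;
    }

    public static void main(String[] args) {
        System.out.println("ack then commit, crash in between: survives="
                + survivesCrash(false));
        System.out.println("commit then ack, crash in between: survives="
                + survivesCrash(true));
    }
}
```

In this model, acknowledging first loses the message, while committing first
keeps it. The trade-off is that the unacknowledged message may be redelivered
after restart, so committing first gives at-least-once rather than
exactly-once delivery.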


> ConsumeJMS processor loses messages on NiFi restart
> ---------------------------------------------------
>
>                 Key: NIFI-2774
>                 URL: https://issues.apache.org/jira/browse/NIFI-2774
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Core Framework
>    Affects Versions: 1.0.0, 0.7.0
>            Reporter: Christopher McDermott
>            Assignee: Oleg Zhurakousky
>            Priority: Critical
>             Fix For: 1.1.0, 0.8.0
>
>         Attachments: 2774.patch
>
>
> ConsumeJMS processor uses auto-acknowledge mode.  Unlike the deprecated 
> GetJMSQueue processor, it does not provide a way to specify a different ACK 
> mode (e.g. client-acknowledge).  With auto-acknowledge, message receipt is 
> acknowledged to JMS *before* the messages are actually added to the flow.  
> This leads to data loss on NiFi stop (or crash).
> I believe the fix for this is to allow the user to specify the ACK mode in 
> the processor configuration, as the GetJMSQueue processor allows.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
