[
https://issues.apache.org/jira/browse/NIFI-2774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15493907#comment-15493907
]
ASF GitHub Bot commented on NIFI-2774:
--------------------------------------
Github user ckmcd commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1022#discussion_r79008301
--- Diff: nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java ---
@@ -78,22 +77,24 @@
      */
     @Override
     protected void rendezvousWithJms(ProcessContext context, ProcessSession processSession) throws ProcessException {
-        final JMSResponse response = this.targetResource.consume();
-        if (response != null){
-            FlowFile flowFile = processSession.create();
-            flowFile = processSession.write(flowFile, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream out) throws IOException {
-                    out.write(response.getMessageBody());
-                }
-            });
-            Map<String, Object> jmsHeaders = response.getMessageHeaders();
-            flowFile = this.updateFlowFileAttributesWithJmsHeaders(jmsHeaders, flowFile, processSession);
-            processSession.getProvenanceReporter().receive(flowFile, context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue());
-            processSession.transfer(flowFile, REL_SUCCESS);
-        } else {
-            context.yield();
-        }
+        this.targetResource.consume(response -> {
+            if (response != null) {
+                FlowFile flowFile = processSession.create();
+                flowFile = processSession.write(flowFile, new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream out) throws IOException {
+                        out.write(response.getMessageBody());
+                    }
+                });
+                Map<String, Object> jmsHeaders = response.getMessageHeaders();
+                flowFile = this.updateFlowFileAttributesWithJmsHeaders(jmsHeaders, flowFile, processSession);
+                processSession.getProvenanceReporter().receive(flowFile,
+                        context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue());
+                processSession.transfer(flowFile, REL_SUCCESS);
--- End diff --
Question: doesn't the session need to be explicitly committed here, before
the message is acknowledged, to complete the chain of custody? Otherwise, I
believe there is a small window in which, if NiFi crashes before the
scheduler commits the session, the acknowledged message can be lost.
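
To illustrate the ordering being asked about, here is a minimal,
self-contained sketch. It does not use the NiFi or JMS APIs: FakeMessage,
FakeSession, failOnCommit and consume() are hypothetical stand-ins invented
for this example (the real counterparts would be javax.jms.Message.acknowledge()
and ProcessSession.commit()). It is not the code from the pull request. It
only shows that when the commit happens first, a failure before or during
the commit leaves the message unacknowledged, so the broker can redeliver it.

```java
import java.util.ArrayList;
import java.util.List;

public class CommitBeforeAckSketch {

    /** Hypothetical stand-in for a JMS message consumed in client-acknowledge mode. */
    static final class FakeMessage {
        final String body;
        boolean acknowledged;

        FakeMessage(String body) {
            this.body = body;
        }

        void acknowledge() {
            acknowledged = true;
        }
    }

    /** Hypothetical stand-in for a session: content is durable only after commit(). */
    static final class FakeSession {
        final List<String> pending = new ArrayList<>();
        final List<String> committed = new ArrayList<>();
        boolean failOnCommit; // set to true to simulate a crash during commit

        void transfer(String content) {
            pending.add(content);
        }

        void commit() {
            if (failOnCommit) {
                throw new IllegalStateException("simulated crash before commit completed");
            }
            committed.addAll(pending);
            pending.clear();
        }
    }

    /** Hands the message to the session, commits, and only then acknowledges. */
    static void consume(FakeMessage message, FakeSession session) {
        session.transfer(message.body);
        session.commit();      // custody passes to the flow here
        message.acknowledge(); // reached only if the commit succeeded
    }

    public static void main(String[] args) {
        FakeMessage message = new FakeMessage("hello");
        FakeSession session = new FakeSession();
        consume(message, session);
        System.out.println("committed=" + session.committed
                + ", acknowledged=" + message.acknowledged);
    }
}
```

With the order reversed (acknowledge first, commit later), a crash between
the two steps would leave the message acknowledged but not committed, which
is the loss window described above.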
> ConsumeJMS processor loses messages on NiFi restart
> ----------------------------------------------------
>
> Key: NIFI-2774
> URL: https://issues.apache.org/jira/browse/NIFI-2774
> Project: Apache NiFi
> Issue Type: Bug
> Components: Core Framework
> Affects Versions: 1.0.0, 0.7.0
> Reporter: Christopher McDermott
> Assignee: Oleg Zhurakousky
> Priority: Critical
> Fix For: 1.1.0, 0.8.0
>
> Attachments: 2774.patch
>
>
> The ConsumeJMS processor uses auto-acknowledge mode. Unlike the deprecated
> GetJMSQueue processor, it does not provide a way to specify a different ACK
> mode (e.g. client-acknowledge). In auto-acknowledge mode, message receipt is
> acknowledged to JMS *before* the messages are actually added to the flow.
> This leads to data loss on NiFi stop (or crash).
> I believe the fix is to let the user specify the ACK mode in the processor
> configuration, as the GetJMSQueue processor does.