Gardella Juan Pablo created NIFI-7050:
-----------------------------------------
Summary: ConsumeJMS is not yielded in case of exception
Key: NIFI-7050
URL: https://issues.apache.org/jira/browse/NIFI-7050
Project: Apache NiFi
Issue Type: Improvement
Components: Extensions
Affects Versions: 1.10.0
Reporter: Gardella Juan Pablo
If any exception occurs while ConsumeJMS is reading messages, the processor
retries immediately instead of yielding.
{code:java}
    try {
        consumer.consume(destinationName, errorQueueName, durable, shared, subscriptionName, charset, new ConsumerCallback() {
            @Override
            public void accept(final JMSResponse response) {
                if (response == null) {
                    return;
                }
                FlowFile flowFile = processSession.create();
                flowFile = processSession.write(flowFile, out -> out.write(response.getMessageBody()));
                final Map<String, String> jmsHeaders = response.getMessageHeaders();
                final Map<String, String> jmsProperties = response.getMessageProperties();
                flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, processSession);
                flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, flowFile, processSession);
                flowFile = processSession.putAttribute(flowFile, JMS_SOURCE_DESTINATION_NAME, destinationName);
                processSession.getProvenanceReporter().receive(flowFile, destinationName);
                processSession.putAttribute(flowFile, JMS_MESSAGETYPE, response.getMessageType());
                processSession.transfer(flowFile, REL_SUCCESS);
                processSession.commit();
            }
        });
    } catch(Exception e) {
        consumer.setValid(false);
        throw e; // for backward compatibility with exception handling in flows
    }
}
{code}
It should call {{context.yield()}} in the exception block. Note that
[PublishJMS|https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java#L166]
yields in the same scenario. The change is required in the ConsumeJMS
processor only.
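A minimal sketch of the proposed change, not the actual patch. To keep it runnable without NiFi on the classpath, {{Context}} and {{Consumer}} are hypothetical stand-ins for {{ProcessContext}} and the JMS consumer worker, and {{CountingContext}} and {{FailingConsumer}} are test doubles invented for illustration. Only the shape of the catch block is meant to carry over.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ConsumeYieldSketch {

    /** Hypothetical stand-in for the single ProcessContext method used here. */
    interface Context {
        void yield();
    }

    /** Hypothetical stand-in for the JMS consumer worker from the snippet above. */
    interface Consumer {
        void consume() throws Exception;
        void setValid(boolean valid);
    }

    /** Mirrors the try/catch from the issue, with the proposed yield added. */
    static void onTrigger(Context context, Consumer consumer) throws Exception {
        try {
            consumer.consume();
        } catch (Exception e) {
            consumer.setValid(false);
            context.yield(); // proposed addition: back off before the next attempt
            throw e;         // still rethrown, as in the existing code
        }
    }

    /** Test double that counts how many times yield() was called. */
    static final class CountingContext implements Context {
        final AtomicInteger yields = new AtomicInteger();

        @Override
        public void yield() {
            yields.incrementAndGet();
        }
    }

    /** Test double whose consume() always fails, simulating a broker outage. */
    static final class FailingConsumer implements Consumer {
        boolean valid = true;

        @Override
        public void consume() throws Exception {
            throw new IllegalStateException("broker unavailable");
        }

        @Override
        public void setValid(boolean valid) {
            this.valid = valid;
        }
    }

    public static void main(String[] args) {
        CountingContext context = new CountingContext();
        FailingConsumer consumer = new FailingConsumer();
        try {
            onTrigger(context, consumer);
        } catch (Exception e) {
            System.out.println("rethrown: " + e.getMessage());
        }
        System.out.println("yield calls: " + context.yields.get());
        System.out.println("consumer valid: " + consumer.valid);
    }
}
```

The exception is still rethrown after the yield, so flows that rely on the current exception handling should see no difference other than the back-off.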