This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 89d8b87 NIFI-7050 ConsumeJMS is not yielded in case of exception
89d8b87 is described below
commit 89d8b877f9adc3f474983c4dec45bf4586a6baef
Author: Gardella Juan Pablo <[email protected]>
AuthorDate: Tue Jan 21 17:07:53 2020 -0300
NIFI-7050 ConsumeJMS is not yielded in case of exception
This closes #4004.
Signed-off-by: Peter Turcsanyi <[email protected]>
---
.../org/apache/nifi/jms/processors/ConsumeJMS.java | 1 +
.../apache/nifi/jms/processors/ConsumeJMSIT.java | 25 ++++++++++++++++++++++
2 files changed, 26 insertions(+)
diff --git
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
index 80f9457..357e2f9 100644
---
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
+++
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
@@ -280,6 +280,7 @@ public class ConsumeJMS extends
AbstractJMSProcessor<JMSConsumer> {
});
} catch(Exception e) {
consumer.setValid(false);
+ context.yield();
throw e; // for backward compatibility with exception handling in
flows
}
}
diff --git
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
index 085db0e..714b950 100644
---
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
+++
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
@@ -38,6 +38,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
@@ -393,6 +394,30 @@ public class ConsumeJMSIT {
}
}
+ @Test(timeout = 10000)
+ public void whenExceptionIsRaisedTheProcessorShouldBeYielded() throws
Exception {
+ TestRunner runner = TestRunners.newTestRunner(new ConsumeJMS());
+ JMSConnectionFactoryProviderDefinition cs =
mock(JMSConnectionFactoryProviderDefinition.class);
+ ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("tcp://invalidhost:9999?soTimeout=3");
+
+ when(cs.getIdentifier()).thenReturn("cfProvider");
+ when(cs.getConnectionFactory()).thenReturn(cf);
+ runner.addControllerService("cfProvider", cs);
+ runner.enableControllerService(cs);
+
+ runner.setProperty(ConsumeJMS.CF_SERVICE, "cfProvider");
+ runner.setProperty(ConsumeJMS.DESTINATION, "foo");
+ runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.TOPIC);
+
+ try {
+ runner.run();
+ fail("The test was implemented in a way this line should not be
reached.");
+ } catch (AssertionError e) {
+ } finally {
+ assertTrue("In case of an exception, the processor should be
yielded.", ((MockProcessContext) runner.getProcessContext()).isYieldCalled());
+ }
+ }
+
private static void publishAMessage(ActiveMQConnectionFactory cf, final
String destinationName, String messageContent) throws JMSException {
// Publish a message.
try (Connection conn = cf.createConnection();