This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 3de3ad4 NIFI-7245: JMS processors yield when connection factory
initialisation failed
3de3ad4 is described below
commit 3de3ad40290ccd4c9e09d5c4fd03e83a6cbf0d86
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Wed Mar 11 15:46:05 2020 +0100
NIFI-7245: JMS processors yield when connection factory initialisation
failed
Signed-off-by: Pierre Villard <[email protected]>
This closes #4133.
---
.../nifi/jms/processors/AbstractJMSProcessor.java | 8 ++++++-
.../apache/nifi/jms/processors/ConsumeJMSIT.java | 26 ++++++++++++++++++++++
.../apache/nifi/jms/processors/PublishJMSIT.java | 25 +++++++++++++++++++++
3 files changed, 58 insertions(+), 1 deletion(-)
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
index 39b8dac..78b2d90 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
@@ -179,7 +179,13 @@ public abstract class AbstractJMSProcessor<T extends JMSWorker> extends Abstract
public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
T worker = workerPool.poll();
if (worker == null) {
- worker = buildTargetResource(context);
+ try {
+ worker = buildTargetResource(context);
+ } catch (Exception e) {
+ getLogger().error("Failed to initialize JMS Connection
Factory", e);
+ context.yield();
+ return;
+ }
}
try {
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
index 714b950..64728fe 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
@@ -31,6 +31,8 @@ import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.wireformat.WireFormat;
+import org.apache.nifi.jms.cf.JMSConnectionFactoryProperties;
+import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
@@ -418,6 +420,30 @@ public class ConsumeJMSIT {
}
}
+ @Test
+ public void
whenExceptionIsRaisedDuringConnectionFactoryInitializationTheProcessorShouldBeYielded()
throws Exception {
+ TestRunner runner = TestRunners.newTestRunner(ConsumeJMS.class);
+
+ // using (non-JNDI) JMS Connection Factory via controller service
+ JMSConnectionFactoryProvider cfProvider = new
JMSConnectionFactoryProvider();
+ runner.addControllerService("cfProvider", cfProvider);
+ runner.setProperty(cfProvider,
JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL,
"DummyJMSConnectionFactoryClass");
+ runner.setProperty(cfProvider,
JMSConnectionFactoryProperties.JMS_BROKER_URI, "DummyBrokerUri");
+ runner.enableControllerService(cfProvider);
+
+ runner.setProperty(ConsumeJMS.CF_SERVICE, "cfProvider");
+ runner.setProperty(ConsumeJMS.DESTINATION, "myTopic");
+ runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.TOPIC);
+
+ try {
+ runner.run();
+ fail("The test was implemented in a way this line should not be
reached.");
+ } catch (AssertionError e) {
+ } finally {
+ assertTrue("In case of an exception, the processor should be
yielded.", ((MockProcessContext) runner.getProcessContext()).isYieldCalled());
+ }
+ }
+
private static void publishAMessage(ActiveMQConnectionFactory cf, final
String destinationName, String messageContent) throws JMSException {
// Publish a message.
try (Connection conn = cf.createConnection();
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
index 247a3ac..b901b73 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -31,10 +32,12 @@ import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
+import org.apache.nifi.jms.cf.JndiJmsConnectionFactoryProperties;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
@@ -381,4 +384,26 @@ public class PublishJMSIT {
}
}
}
+
+ @Test
+ public void
whenExceptionIsRaisedDuringConnectionFactoryInitializationTheProcessorShouldBeYielded()
throws Exception {
+ TestRunner runner = TestRunners.newTestRunner(PublishJMS.class);
+
+ // using JNDI JMS Connection Factory configured locally on the
processor
+
runner.setProperty(JndiJmsConnectionFactoryProperties.JNDI_INITIAL_CONTEXT_FACTORY,
"DummyInitialContextFactoryClass");
+
runner.setProperty(JndiJmsConnectionFactoryProperties.JNDI_PROVIDER_URL,
"DummyProviderUrl");
+
runner.setProperty(JndiJmsConnectionFactoryProperties.JNDI_CONNECTION_FACTORY_NAME,
"DummyConnectionFactoryName");
+
+ runner.setProperty(ConsumeJMS.DESTINATION, "myTopic");
+ runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.TOPIC);
+
+ try {
+ runner.enqueue("message");
+ runner.run();
+ fail("The test was implemented in a way this line should not be
reached.");
+ } catch (AssertionError e) {
+ } finally {
+ assertTrue("In case of an exception, the processor should be
yielded.", ((MockProcessContext) runner.getProcessContext()).isYieldCalled());
+ }
+ }
}