nandorsoma commented on code in PR #6930:
URL: https://github.com/apache/nifi/pull/6930#discussion_r1106833048


##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java:
##########
@@ -409,29 +418,75 @@ public void 
whenExceptionIsRaisedTheProcessorShouldBeYielded() throws Exception
         runner.setProperty(ConsumeJMS.DESTINATION, "foo");
         runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.TOPIC);
 
-         assertThrows(AssertionError.class, () -> runner.run());
-         assertTrue(((MockProcessContext) 
runner.getProcessContext()).isYieldCalled(), "In case of an exception, the 
processor should be yielded.");
+        assertCausedBy(UnknownHostException.class, runner::run);
+
+        assertTrue(((MockProcessContext) 
runner.getProcessContext()).isYieldCalled(), "In case of an exception, the 
processor should be yielded.");
     }
 
     @Test
     public void 
whenExceptionIsRaisedDuringConnectionFactoryInitializationTheProcessorShouldBeYielded()
 throws Exception {
+        final String nonExistentClassName = "DummyJMSConnectionFactoryClass";
+
         TestRunner runner = TestRunners.newTestRunner(ConsumeJMS.class);
 
         // using (non-JNDI) JMS Connection Factory via controller service
         JMSConnectionFactoryProvider cfProvider = new 
JMSConnectionFactoryProvider();
         runner.addControllerService("cfProvider", cfProvider);
-        runner.setProperty(cfProvider, 
JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, 
"DummyJMSConnectionFactoryClass");
+        runner.setProperty(cfProvider, 
JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, 
nonExistentClassName);
         runner.setProperty(cfProvider, 
JMSConnectionFactoryProperties.JMS_BROKER_URI, "DummyBrokerUri");
         runner.enableControllerService(cfProvider);
 
         runner.setProperty(ConsumeJMS.CF_SERVICE, "cfProvider");
         runner.setProperty(ConsumeJMS.DESTINATION, "myTopic");
         runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.TOPIC);
 
-        assertThrows(AssertionError.class, () -> runner.run());
+        assertCausedBy(ClassNotFoundException.class, nonExistentClassName, 
runner::run);
+
         assertTrue(((MockProcessContext) 
runner.getProcessContext()).isYieldCalled(), "In case of an exception, the 
processor should be yielded.");
     }
 
+    @Test
+    @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
+    public void 
whenExceptionIsRaisedInAcceptTheProcessorShouldYieldAndRollback() throws 
Exception {
+        final String destination = "testQueue";
+        final RuntimeException expectedException = new RuntimeException();
+
+        final ConsumeJMS processor = new ConsumeJMS() {
+            @Override
+            protected void rendezvousWithJms(ProcessContext context, 
ProcessSession processSession, JMSConsumer consumer) throws ProcessException {
+                ProcessSession spiedSession = spy(processSession);
+                
doThrow(expectedException).when(spiedSession).write(any(FlowFile.class), 
any(OutputStreamCallback.class));
+                super.rendezvousWithJms(context, spiedSession, consumer);
+            }
+        };
+
+        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
+        try {
+            JMSPublisher sender = new JMSPublisher((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
+
+            sender.jmsTemplate.send(destination, session -> 
session.createTextMessage("msg"));
+
+            TestRunner runner = TestRunners.newTestRunner(processor);
+            JMSConnectionFactoryProviderDefinition cs = 
mock(JMSConnectionFactoryProviderDefinition.class);
+            when(cs.getIdentifier()).thenReturn("cfProvider");
+            
when(cs.getConnectionFactory()).thenReturn(jmsTemplate.getConnectionFactory());
+            runner.addControllerService("cfProvider", cs);
+            runner.enableControllerService(cs);
+
+            runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
+            runner.setProperty(ConsumeJMS.DESTINATION, destination);
+            runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.QUEUE);
+
+            ((MockSessionFactory) 
runner.getProcessSessionFactory()).getCreatedSessions();

Review Comment:
   No, it is a leftover from debug. 😕



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to