tpalfy commented on a change in pull request #3987: NIFI-7015 - ConsumeJMS
supports additional message types
URL: https://github.com/apache/nifi/pull/3987#discussion_r366485252
##########
File path:
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
##########
@@ -65,11 +79,183 @@ public void
validateSuccessfulConsumeAndTransferToSuccess() throws Exception {
successFF.assertAttributeEquals("filename", "message.txt");
successFF.assertAttributeExists("attribute_from_sender");
successFF.assertAttributeEquals("attribute_from_sender", "some
value");
+ successFF.assertAttributeExists("jms.messagetype");
+ successFF.assertAttributeEquals("jms.messagetype", "BytesMessage");
successFF.assertContentEquals("Hey dude!".getBytes());
String sourceDestination =
successFF.getAttribute(ConsumeJMS.JMS_SOURCE_DESTINATION_NAME);
assertNotNull(sourceDestination);
} finally {
((CachingConnectionFactory)
jmsTemplate.getConnectionFactory()).destroy();
}
}
+
+ @Test
+ public void
testValidateErrorQueueWhenDestinationIsTopicAndErrorQueueIsSet() throws
Exception {
+ testValidateErrorQueue(ConsumeJMS.TOPIC, "errorQueue", false);
+ }
+
+ @Test
+ public void
testValidateErrorQueueWhenDestinationIsTopicAndErrorQueueIsNotSet() throws
Exception {
+ testValidateErrorQueue(ConsumeJMS.TOPIC, null, true);
+ }
+
+ @Test
+ public void
testValidateErrorQueueWhenDestinationIsQueueAndErrorQueueIsSet() throws
Exception {
+ testValidateErrorQueue(ConsumeJMS.QUEUE, "errorQueue", true);
+ }
+
+ @Test
+ public void
testValidateErrorQueueWhenDestinationIsQueueAndErrorQueueIsNotSet() throws
Exception {
+ testValidateErrorQueue(ConsumeJMS.QUEUE, null, true);
+ }
+
+ private void testValidateErrorQueue(String destinationType, String
errorQueue, boolean expectedValid) throws Exception {
+ JmsTemplate jmsTemplate =
CommonTest.buildJmsTemplateForDestination(false);
+
+ try {
+ TestRunner runner = TestRunners.newTestRunner(new ConsumeJMS());
+
+ JMSConnectionFactoryProviderDefinition cfService =
mock(JMSConnectionFactoryProviderDefinition.class);
+ when(cfService.getIdentifier()).thenReturn("cfService");
+
when(cfService.getConnectionFactory()).thenReturn(jmsTemplate.getConnectionFactory());
+
+ runner.addControllerService("cfService", cfService);
+ runner.enableControllerService(cfService);
+
+ runner.setProperty(PublishJMS.CF_SERVICE, "cfService");
+ runner.setProperty(ConsumeJMS.DESTINATION, "destination");
+ runner.setProperty(ConsumeJMS.DESTINATION_TYPE, destinationType);
+ if (errorQueue != null) {
+ runner.setProperty(ConsumeJMS.ERROR_QUEUE, errorQueue);
+ }
+
+ if (expectedValid) {
+ runner.assertValid();
+ } else {
+ runner.assertNotValid();
+ }
+ } finally {
+ ((CachingConnectionFactory)
jmsTemplate.getConnectionFactory()).destroy();
+ }
+ }
+
+ @Test
+ public void testTextMessageTypeAttribute() throws Exception {
+ testMessageTypeAttribute(
+ "testTextMessage",
+ Session::createTextMessage,
+ TextMessage.class.getSimpleName()
+ );
+ }
+
+ @Test
+ public void testByteMessageTypeAttribute() throws Exception {
+ testMessageTypeAttribute(
+ "testByteMessage",
+ Session::createBytesMessage,
+ BytesMessage.class.getSimpleName()
+ );
+ }
+
+ @Test
+ public void testObjectMessageTypeAttribute() throws Exception {
+ String destinationName = "testObjectMessage";
Review comment:
Minor: `destinationName` could be inlined, as in the other test methods.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services