http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors-nar/src/main/resources/META-INF/LICENSE ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors-nar/src/main/resources/META-INF/LICENSE new file mode 100644 index 0000000..44893cd --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors-nar/src/main/resources/META-INF/LICENSE @@ -0,0 +1,209 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. 
+ + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." 
+ + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. 
+ + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +APACHE NIFI SUBCOMPONENTS: + +The Apache NiFi project contains subcomponents with separate copyright +notices and license terms. Your use of the source code for these +subcomponents is subject to the terms and conditions of the following +licenses.
http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors-nar/src/main/resources/META-INF/NOTICE ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000..6d315e2 --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,18 @@ +nifi-jms-processors-nar +Copyright 2014-2016 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +****************** +Apache Software License v2 +****************** + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Apache Commons Lang + The following NOTICE information applies: + Apache Commons Lang + Copyright 2001-2016 The Apache Software Foundation + + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/pom.xml b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/pom.xml new file mode 100644 index 0000000..de35876 --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/pom.xml @@ -0,0 +1,68 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements.
See the NOTICE file distributed with this + work for additional information regarding copyright ownership. The ASF licenses + this file to You under the Apache License, Version 2.0 (the "License"); you + may not use this file except in compliance with the License. You may obtain + a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless + required by applicable law or agreed to in writing, software distributed + under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES + OR CONDITIONS OF ANY KIND, either express or implied. See the License for + the specific language governing permissions and limitations under the License. --> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-jms-bundle</artifactId> + <version>0.6.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + <artifactId>nifi-jms-processors</artifactId> + <packaging>jar</packaging> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-ssl-context-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-jms-cf-service</artifactId> + <version>0.6.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-jms</artifactId> + <version>4.2.4.RELEASE</version> + </dependency> + <dependency> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + <version>1.2</version> + </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-client</artifactId> + <exclusions> + <!-- --> + <exclusion> + <groupId>org.apache.geronimo.specs</groupId> + <artifactId>geronimo-jms_1.1_spec</artifactId> + </exclusion> + </exclusions> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-broker</artifactId> 
+ <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java new file mode 100644 index 0000000..f8030db --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.jms.processors; + +import java.util.ArrayList; +import java.util.List; + +import javax.jms.ConnectionFactory; + +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider; +import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.springframework.jms.connection.CachingConnectionFactory; +import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter; +import org.springframework.jms.core.JmsTemplate; + +/** + * Base JMS processor to support implementation of JMS producers and consumers. + * + * @param <T> + * the type of {@link JMSWorker} which could be {@link JMSPublisher} + * or {@link JMSConsumer} + * @see PublishJMS + * @see ConsumeJMS + * @see JMSConnectionFactoryProviderDefinition + */ +abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcessor { + + static final String QUEUE = "QUEUE"; + + static final String TOPIC = "TOPIC"; + + static final PropertyDescriptor USER = new PropertyDescriptor.Builder() + .name("User Name") + .description("User Name used for authentication and authorization.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() + .name("Password") + .description("Password used for authentication and authorization.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .sensitive(true) + .build(); + static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder() + 
.name("Destination Name") + .description("The name of the JMS Destination. Usually provided by the administrator (e.g., 'topic://myTopic').") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + static final PropertyDescriptor DESTINATION_TYPE = new PropertyDescriptor.Builder() + .name("Destination Type") + .description("The type of the JMS Destination. Could be one of 'QUEUE' or 'TOPIC'. Usually provided by the administrator. Defaults to 'TOPIC") + .required(true) + .allowableValues(QUEUE, TOPIC) + .defaultValue(QUEUE) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + static final PropertyDescriptor SESSION_CACHE_SIZE = new PropertyDescriptor.Builder() + .name("Session Cache size") + .description("The maximum limit for the number of cached Sessions.") + .required(true) + .defaultValue("1") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + // ConnectionFactoryProvider ControllerService + static final PropertyDescriptor CF_SERVICE = new PropertyDescriptor.Builder() + .name("Connection Factory Service") + .description("The Controller Service that is used to obtain ConnectionFactory") + .required(true) + .identifiesControllerService(JMSConnectionFactoryProviderDefinition.class) + .build(); + + static final List<PropertyDescriptor> propertyDescriptors = new ArrayList<>(); + + /* + * Will ensure that list of PropertyDescriptors is build only once, since + * all other lifecycle methods are invoked multiple times. 
+ */ + static { + propertyDescriptors.add(USER); + propertyDescriptors.add(PASSWORD); + propertyDescriptors.add(DESTINATION); + propertyDescriptors.add(DESTINATION_TYPE); + propertyDescriptors.add(SESSION_CACHE_SIZE); + propertyDescriptors.add(CF_SERVICE); + } + + protected volatile T targetResource; + + private volatile CachingConnectionFactory cachingConnectionFactory; + + /** + * + */ + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return propertyDescriptors; + } + + /** + * Builds target resource ({@link JMSPublisher} or {@link JMSConsumer}) upon + * first invocation while delegating to the sub-classes ( {@link PublishJMS} + * or {@link ConsumeJMS}) via + * {@link #rendezvousWithJms(ProcessContext, ProcessSession)} method. + */ + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + synchronized (this) { + this.buildTargetResource(context); + } + this.rendezvousWithJms(context, session); + } + + /** + * Will destroy the instance of {@link CachingConnectionFactory} and sets + * 'targetResource' to null; + */ + @OnStopped + public void close() { + if (this.cachingConnectionFactory != null) { + this.cachingConnectionFactory.destroy(); + } + this.targetResource = null; + } + + @Override + public String toString() { + return this.getClass().getSimpleName() + " - " + this.targetResource; + } + + /** + * Delegate method to supplement + * {@link #onTrigger(ProcessContext, ProcessSession)} operation. It is + * implemented by sub-classes to perform {@link Processor} specific + * functionality. + * + * @param context + * instance of {@link ProcessContext} + * @param session + * instance of {@link ProcessSession} + */ + protected abstract void rendezvousWithJms(ProcessContext context, ProcessSession session) throws ProcessException; + + /** + * Finishes building one of the {@link JMSWorker} subclasses T. 
+ * + * @param jmsTemplate instance of {@link JmsTemplate} + * + * @see JMSPublisher + * @see JMSConsumer + */ + protected abstract T finishBuildingTargetResource(JmsTemplate jmsTemplate); + + /** + * This method essentially performs initialization of this Processor by + * obtaining an instance of the {@link ConnectionFactory} from the + * {@link JMSConnectionFactoryProvider} (ControllerService) and performing a + * series of {@link ConnectionFactory} adaptations which eventually results + * in an instance of the {@link CachingConnectionFactory} used to construct + * {@link JmsTemplate} used by this Processor. + */ + private void buildTargetResource(ProcessContext context) { + if (this.targetResource == null) { + JMSConnectionFactoryProviderDefinition cfProvider = context.getProperty(CF_SERVICE).asControllerService(JMSConnectionFactoryProviderDefinition.class); + ConnectionFactory connectionFactory = cfProvider.getConnectionFactory(); + + UserCredentialsConnectionFactoryAdapter cfCredentialsAdapter = new UserCredentialsConnectionFactoryAdapter(); + cfCredentialsAdapter.setTargetConnectionFactory(connectionFactory); + cfCredentialsAdapter.setUsername(context.getProperty(USER).getValue()); + cfCredentialsAdapter.setPassword(context.getProperty(PASSWORD).getValue()); + + this.cachingConnectionFactory = new CachingConnectionFactory(cfCredentialsAdapter); + this.cachingConnectionFactory.setSessionCacheSize(Integer.parseInt(context.getProperty(SESSION_CACHE_SIZE).getValue())); + + JmsTemplate jmsTemplate = new JmsTemplate(); + jmsTemplate.setConnectionFactory(this.cachingConnectionFactory); + jmsTemplate.setDefaultDestinationName(context.getProperty(DESTINATION).getValue()); + jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue())); + + // set of properties that may be good candidates for exposure via configuration + jmsTemplate.setReceiveTimeout(10000); + + this.targetResource = this.finishBuildingTargetResource(jmsTemplate); + } + } 
+} http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java new file mode 100644 index 0000000..a4cad0d --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.jms.processors; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider; +import org.apache.nifi.jms.processors.JMSConsumer.JMSResponse; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.springframework.jms.core.JmsTemplate; + +/** + * Consuming JMS processor which upon each invocation of + * {@link #onTrigger(ProcessContext, ProcessSession)} method will construct a + * {@link FlowFile} containing the body of the consumed JMS message and JMS + * properties that came with message which are added to a {@link FlowFile} as + * attributes. 
+ */ +@Tags({ "jms", "get", "message", "receive", "consume" }) +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@CapabilityDescription("Consumes JMS Message of type BytesMessage or TextMessage transforming its content to " + + "a FlowFile and transitioning it to 'success' relationship.") +@SeeAlso(value = { PublishJMS.class, JMSConnectionFactoryProvider.class }) +public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> { + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All FlowFiles that are received from the JMS Destination are routed to this relationship") + .build(); + + private final static Set<Relationship> relationships; + + static { + Set<Relationship> _relationships = new HashSet<>(); + _relationships.add(REL_SUCCESS); + relationships = Collections.unmodifiableSet(_relationships); + } + + /** + * Will construct a {@link FlowFile} containing the body of the consumed JMS + * message (if {@link GetResponse} returned by {@link JMSConsumer} is not + * null) and JMS properties that came with message which are added to a + * {@link FlowFile} as attributes, transferring {@link FlowFile} to + * 'success' {@link Relationship}. 
+ */ + @Override + protected void rendezvousWithJms(ProcessContext context, ProcessSession processSession) throws ProcessException { + final JMSResponse response = this.targetResource.consume(); + if (response != null){ + FlowFile flowFile = processSession.create(); + flowFile = processSession.write(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + out.write(response.getMessageBody()); + } + }); + Map<String, Object> jmsHeaders = response.getMessageHeaders(); + flowFile = this.updateFlowFileAttributesWithJmsHeaders(jmsHeaders, flowFile, processSession); + processSession.getProvenanceReporter().receive(flowFile, context.getProperty(DESTINATION).getValue()); + processSession.transfer(flowFile, REL_SUCCESS); + } else { + context.yield(); + } + } + + /** + * Will create an instance of {@link JMSConsumer} + */ + @Override + protected JMSConsumer finishBuildingTargetResource(JmsTemplate jmsTemplate) { + return new JMSConsumer(jmsTemplate, this.getLogger()); + } + + /** + * + */ + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + /** + * + */ + private FlowFile updateFlowFileAttributesWithJmsHeaders(Map<String, Object> jmsHeaders, FlowFile flowFile, ProcessSession processSession) { + Map<String, String> attributes = new HashMap<String, String>(); + for (Entry<String, Object> headersEntry : jmsHeaders.entrySet()) { + attributes.put(headersEntry.getKey(), String.valueOf(headersEntry.getValue())); + } + flowFile = processSession.putAllAttributes(flowFile, attributes); + return flowFile; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java new file mode 100644 index 0000000..e69705a --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.jms.processors; + +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; + +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Queue; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.nifi.logging.ProcessorLog; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.jms.core.JmsTemplate; +import org.springframework.jms.support.JmsHeaders; + +/** + * Generic consumer of messages from JMS compliant messaging system. 
+ */ +final class JMSConsumer extends JMSWorker { + + private final static Logger logger = LoggerFactory.getLogger(JMSConsumer.class); + + /** + * Creates an instance of this consumer + * + * @param jmsTemplate + * instance of {@link JmsTemplate} + * @param processLog + * instance of {@link ProcessorLog} + */ + JMSConsumer(JmsTemplate jmsTemplate, ProcessorLog processLog) { + super(jmsTemplate, processLog); + if (logger.isInfoEnabled()) { + logger.info("Created Message Consumer for '" + jmsTemplate.toString() + "'."); + } + } + + + /** + * + */ + public JMSResponse consume() { + Message message = this.jmsTemplate.receive(); + if (message != null) { + byte[] messageBody = null; + try { + if (message instanceof TextMessage) { + messageBody = MessageBodyToBytesConverter.toBytes((TextMessage) message); + } else if (message instanceof BytesMessage) { + messageBody = MessageBodyToBytesConverter.toBytes((BytesMessage) message); + } else { + throw new UnsupportedOperationException("Message type other then TextMessage and BytesMessage are " + + "not supported at the moment"); + } + Map<String, Object> messageHeaders = this.extractMessageHeaders(message); + Map<String, String> messageProperties = this.extractMessageProperties(message); + return new JMSResponse(messageBody, messageHeaders, messageProperties); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + return null; + } + + /** + * + */ + @SuppressWarnings("unchecked") + private Map<String, String> extractMessageProperties(Message message) { + Map<String, String> properties = new HashMap<>(); + try { + Enumeration<String> propertyNames = message.getPropertyNames(); + while (propertyNames.hasMoreElements()) { + String propertyName = propertyNames.nextElement(); + properties.put(propertyName, String.valueOf(message.getObjectProperty(propertyName))); + } + } catch (JMSException e) { + logger.warn("Failed to extract message properties", e); + this.processLog.warn("Failed to extract message 
properties", e); + } + return properties; + } + + /** + * + * + */ + private Map<String, Object> extractMessageHeaders(Message message) { + // even though all values are Strings in current impl, it may change in + // the future, so keeping it <String, Object> + Map<String, Object> messageHeaders = new HashMap<>(); + try { + messageHeaders.put(JmsHeaders.DELIVERY_MODE, String.valueOf(message.getJMSDeliveryMode())); + messageHeaders.put(JmsHeaders.EXPIRATION, String.valueOf(message.getJMSExpiration())); + messageHeaders.put(JmsHeaders.PRIORITY, String.valueOf(message.getJMSPriority())); + messageHeaders.put(JmsHeaders.REDELIVERED, String.valueOf(message.getJMSRedelivered())); + messageHeaders.put(JmsHeaders.TIMESTAMP, String.valueOf(message.getJMSTimestamp())); + messageHeaders.put(JmsHeaders.CORRELATION_ID, message.getJMSCorrelationID()); + messageHeaders.put(JmsHeaders.MESSAGE_ID, message.getJMSMessageID()); + messageHeaders.put(JmsHeaders.TYPE, message.getJMSType()); + + String replyToDestinationName = this.retrieveDestinationName(message.getJMSReplyTo(), JmsHeaders.REPLY_TO); + if (replyToDestinationName != null) { + messageHeaders.put(JmsHeaders.REPLY_TO, replyToDestinationName); + } + String destinationName = this.retrieveDestinationName(message.getJMSDestination(), JmsHeaders.DESTINATION); + if (destinationName != null) { + messageHeaders.put(JmsHeaders.DESTINATION, destinationName); + } + } catch (Exception e) { + throw new IllegalStateException("Failed to extract JMS Headers", e); + } + return messageHeaders; + } + + /** + * + */ + private String retrieveDestinationName(Destination destination, String headerName) { + String destinationName = null; + if (destination != null) { + try { + destinationName = (destination instanceof Queue) ? 
((Queue) destination).getQueueName() + : ((Topic) destination).getTopicName(); + } catch (JMSException e) { + logger.warn("Failed to retrieve Destination name for '" + headerName + "' header", e); + this.processLog.warn("Failed to retrieve Destination name for '" + headerName + "' header", e); + } + } + return destinationName; + } + + /** + * + */ + static class JMSResponse { + private final byte[] messageBody; + + private final Map<String, Object> messageHeaders; + + private final Map<String, String> messageProperties; + + JMSResponse(byte[] messageBody, Map<String, Object> messageHeaders, Map<String, String> messageProperties) { + this.messageBody = messageBody; + this.messageHeaders = Collections.unmodifiableMap(messageHeaders); + this.messageProperties = Collections.unmodifiableMap(messageProperties); + } + + public byte[] getMessageBody() { + return this.messageBody; + } + + public Map<String, Object> getMessageHeaders() { + return this.messageHeaders; + } + + public Map<String, String> getMessageProperties() { + return messageProperties; + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java new file mode 100644 index 0000000..3643c51 --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.jms.processors; + +import java.util.Map; +import java.util.Map.Entry; + +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.Topic; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.jms.core.JmsTemplate; +import org.springframework.jms.core.MessageCreator; +import org.springframework.jms.core.SessionCallback; +import org.springframework.jms.support.JmsHeaders; + +/** + * Generic publisher of messages to JMS compliant messaging system. 
+ */ +final class JMSPublisher extends JMSWorker { + + private final static Logger logger = LoggerFactory.getLogger(JMSPublisher.class); + + + /** + * Creates an instance of this publisher + * + * @param jmsTemplate + * instance of {@link JmsTemplate} + * @param processLog + * instance of {@link ProcessorLog} + */ + JMSPublisher(JmsTemplate jmsTemplate, ProcessorLog processLog) { + super(jmsTemplate, processLog); + if (logger.isInfoEnabled()) { + logger.info("Created Message Publisher for '" + jmsTemplate.toString() + "'."); + } + } + + /** + * + * @param messageBytes byte array representing contents of the message + */ + void publish(byte[] messageBytes) { + this.publish(messageBytes, null); + } + + /** + * + * @param messageBytes + * byte array representing contents of the message + * @param flowFileAttributes + * Map representing {@link FlowFile} attributes. + */ + void publish(final byte[] messageBytes, final Map<String, String> flowFileAttributes) { + this.jmsTemplate.send(new MessageCreator() { + @Override + public Message createMessage(Session session) throws JMSException { + BytesMessage message = session.createBytesMessage(); + message.writeBytes(messageBytes); + if (flowFileAttributes != null && !flowFileAttributes.isEmpty()) { + // set message headers and properties + for (Entry<String, String> entry : flowFileAttributes.entrySet()) { + if (!entry.getKey().startsWith(JmsHeaders.PREFIX) && !entry.getKey().contains("-")) {// '-' is illegal char in JMS prop names + message.setStringProperty(entry.getKey(), entry.getValue()); + } else if (entry.getKey().equals(JmsHeaders.DELIVERY_MODE)) { + message.setJMSDeliveryMode(Integer.parseInt(entry.getValue())); + } else if (entry.getKey().equals(JmsHeaders.EXPIRATION)) { + message.setJMSExpiration(Integer.parseInt(entry.getValue())); + } else if (entry.getKey().equals(JmsHeaders.PRIORITY)) { + message.setJMSPriority(Integer.parseInt(entry.getValue())); + } else if (entry.getKey().equals(JmsHeaders.REDELIVERED)) { + 
message.setJMSRedelivered(Boolean.parseBoolean(entry.getValue())); + } else if (entry.getKey().equals(JmsHeaders.TIMESTAMP)) { + message.setJMSTimestamp(Long.parseLong(entry.getValue())); + } else if (entry.getKey().equals(JmsHeaders.CORRELATION_ID)) { + message.setJMSCorrelationID(entry.getValue()); + } else if (entry.getKey().equals(JmsHeaders.TYPE)) { + message.setJMSType(entry.getValue()); + } else if (entry.getKey().equals(JmsHeaders.REPLY_TO)) { + Destination destination = buildDestination(entry.getValue()); + if (destination != null) { + message.setJMSReplyTo(destination); + } else { + logUnbuildableDestination(entry.getKey(), JmsHeaders.REPLY_TO); + } + } else if (entry.getKey().equals(JmsHeaders.DESTINATION)) { + Destination destination = buildDestination(entry.getValue()); + if (destination != null) { + message.setJMSDestination(destination); + } else { + logUnbuildableDestination(entry.getKey(), JmsHeaders.DESTINATION); + } + } + } + } + return message; + } + }); + } + + /** + * + */ + private void logUnbuildableDestination(String destinationName, String headerName) { + logger.warn("Failed to determine destination type from destination name '" + destinationName + "'. The '" + + headerName + "' will not be set."); + processLog.warn("Failed to determine destination type from destination name '" + destinationName + "'. 
The '" + + headerName + "' will not be set."); + } + + /** + * + */ + private Destination buildDestination(final String destinationName) { + Destination destination; + if (destinationName.toLowerCase().contains("topic")) { + destination = this.jmsTemplate.execute(new SessionCallback<Topic>() { + @Override + public Topic doInJms(Session session) throws JMSException { + return session.createTopic(destinationName); + } + }); + } else if (destinationName.toLowerCase().contains("queue")) { + destination = this.jmsTemplate.execute(new SessionCallback<Queue>() { + @Override + public Queue doInJms(Session session) throws JMSException { + return session.createQueue(destinationName); + } + }); + } else { + destination = null; + } + return destination; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSWorker.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSWorker.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSWorker.java new file mode 100644 index 0000000..bfb3f86 --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSWorker.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.jms.processors; + +import java.nio.channels.Channel; + +import javax.jms.Connection; + +import org.apache.nifi.logging.ProcessorLog; +import org.springframework.jms.core.JmsTemplate; + + +/** + * Base class for implementing publishing and consuming JMS workers. + * + * @see JMSPublisher + * @see JMSConsumer + */ +abstract class JMSWorker { + + protected final JmsTemplate jmsTemplate; + + protected final ProcessorLog processLog; + + /** + * Creates an instance of this worker initializing it with JMS + * {@link Connection} and creating a target {@link Channel} used by + * sub-classes to interact with JMS systems + * + * @param jmsTemplate the instance of {@link JmsTemplate} + * @param processLog the instance of {@link ProcessorLog} + */ + public JMSWorker(JmsTemplate jmsTemplate, ProcessorLog processLog) { + this.jmsTemplate = jmsTemplate; + this.processLog = processLog; + } + + /** + * + */ + @Override + public String toString() { + return this.getClass().getSimpleName() + "[destination:" + this.jmsTemplate.getDefaultDestinationName() + + "; pub-sub:" + this.jmsTemplate.isPubSubDomain() + ";]"; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/MessageBodyToBytesConverter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/MessageBodyToBytesConverter.java 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/MessageBodyToBytesConverter.java new file mode 100644 index 0000000..ed212db --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/MessageBodyToBytesConverter.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.jms.processors; + +import java.io.IOException; +import java.io.InputStream; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.TextMessage; + +import org.apache.commons.io.IOUtils; + +/** + * + */ +abstract class MessageBodyToBytesConverter { + + /** + * + * @param message instance of {@link TextMessage} + * @return byte array representing the {@link TextMessage} + */ + public static byte[] toBytes(TextMessage message) { + try { + return message.getText().getBytes(); + } catch (JMSException e) { + throw new MessageConversionException("Failed to convert BytesMessage to byte[]", e); + } + } + + /** + * + * @param message instance of {@link BytesMessage} + * @return byte array representing the {@link BytesMessage} + */ + public static byte[] toBytes(BytesMessage message){ + try { + InputStream is = new BytesMessageInputStream(message); + return IOUtils.toByteArray(is); + } catch (Exception e) { + throw new MessageConversionException("Failed to convert BytesMessage to byte[]", e); + } + } + + /** + * + */ + private static class BytesMessageInputStream extends InputStream { + private BytesMessage message; + + /** + * + */ + public BytesMessageInputStream(BytesMessage message) { + this.message = message; + } + + /** + * + */ + @Override + public int read() throws IOException { + try { + return this.message.readByte(); + } catch (JMSException e) { + throw new IOException(e.toString()); + } + } + + /** + * + */ + @Override + public int read(byte[] buffer, int offset, int length) throws IOException { + try { + if (offset == 0) + return this.message.readBytes(buffer, length); + else + return super.read(buffer, offset, length); + } catch (JMSException e) { + throw new IOException(e.toString()); + } + } + + /** + * + */ + @Override + public int read(byte[] buffer) throws IOException { + try { + return this.message.readBytes(buffer); + } catch (JMSException e) { + throw new IOException(e.toString()); + } + } + } + + 
/** + * + */ + static class MessageConversionException extends RuntimeException { + private static final long serialVersionUID = -1464448549601643887L; + + /** + * + */ + public MessageConversionException(String msg) { + super(msg); + } + + /** + * + */ + public MessageConversionException(String msg, Throwable cause) { + super(msg, cause); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java new file mode 100644 index 0000000..8f6c18b --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.jms.processors; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import javax.jms.Destination; +import javax.jms.Message; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.stream.io.StreamUtils; +import org.springframework.jms.core.JmsTemplate; +import org.springframework.jms.support.JmsHeaders; + +/** + * An implementation of JMS Message publishing {@link Processor} which upon each + * invocation of {@link #onTrigger(ProcessContext, ProcessSession)} method will + * construct a {@link Message} from the contents of the {@link FlowFile} sending + * it to the {@link Destination} identified by the + * {@link AbstractJMSProcessor#DESTINATION} property while transferring the + * incoming {@link FlowFile} to 'success' {@link Relationship}. 
If message can + * not be constructed and/or sent the incoming {@link FlowFile} will be + * transitioned to 'failure' {@link Relationship} + */ +@Tags({ "jms", "put", "message", "send", "publish" }) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Creates a JMS Message from the contents of a FlowFile and sends it to a " + + "JMS Destination (queue or topic) as JMS BytesMessage.") +@SeeAlso(value = { ConsumeJMS.class, JMSConnectionFactoryProvider.class }) +public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> { + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All FlowFiles that are sent to the JMS destination are routed to this relationship") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("All FlowFiles that cannot be sent to JMS destination are routed to this relationship") + .build(); + + private final static Set<Relationship> relationships; + + /* + * Will ensure that the list of property descriptors is build only once. + * Will also create a Set of relationships + */ + static { + Set<Relationship> _relationships = new HashSet<>(); + _relationships.add(REL_SUCCESS); + _relationships.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(_relationships); + } + + /** + * Will construct JMS {@link Message} by extracting its body from the + * incoming {@link FlowFile}. {@link FlowFile} attributes that represent + * standard JMS headers will be extracted from the {@link FlowFile} and set + * as JMS headers on the newly constructed message. For the list of + * available message headers please see {@link JmsHeaders}. 
<br> + * <br> + * Upon success the incoming {@link FlowFile} is transferred to the'success' + * {@link Relationship} and upon failure FlowFile is penalized and + * transferred to the 'failure' {@link Relationship} + * + */ + @Override + protected void rendezvousWithJms(ProcessContext context, ProcessSession processSession) throws ProcessException { + FlowFile flowFile = processSession.get(); + if (flowFile != null) { + try { + this.targetResource.publish(this.extractMessageBody(flowFile, processSession), + flowFile.getAttributes()); + processSession.transfer(flowFile, REL_SUCCESS); + processSession.getProvenanceReporter().send(flowFile, context.getProperty(DESTINATION).getValue()); + } catch (Exception e) { + processSession.transfer(flowFile, REL_FAILURE); + this.getLogger().error("Failed while sending message to JMS via " + this.targetResource, e); + context.yield(); + } + } + } + + /** + * + */ + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + /** + * Will create an instance of {@link JMSPublisher} + */ + @Override + protected JMSPublisher finishBuildingTargetResource(JmsTemplate jmsTemplate) { + return new JMSPublisher(jmsTemplate, this.getLogger()); + } + + /** + * Extracts contents of the {@link FlowFile} as byte array. 
+ */ + private byte[] extractMessageBody(FlowFile flowFile, ProcessSession session) { + final byte[] messageContent = new byte[(int) flowFile.getSize()]; + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + StreamUtils.fillBuffer(in, messageContent, true); + } + }); + return messageContent; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000..8e21d4f --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+org.apache.nifi.jms.processors.PublishJMS +org.apache.nifi.jms.processors.ConsumeJMS \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/docs/org.apache.nifi.jms.processors.ConsumeJMS/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/docs/org.apache.nifi.jms.processors.ConsumeJMS/additionalDetails.html b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/docs/org.apache.nifi.jms.processors.ConsumeJMS/additionalDetails.html new file mode 100644 index 0000000..1f8fb7e --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/docs/org.apache.nifi.jms.processors.ConsumeJMS/additionalDetails.html @@ -0,0 +1,61 @@ +<!DOCTYPE html> +<html lang="en"> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<head> + <meta charset="utf-8" /> + <title>ConsumeJMS</title> + <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /> +</head> + +<body> +<h2>Summary</h2> +<p> + This processor consumes messages from JMS compliant messaging system and converts them to a FlowFile to be routed to the next component in the flow. +</p> +<p> + This processor does two things. It constructs FlowFile by extracting information from the consumed JMS message including body, standard + <a href="http://docs.spring.io/spring-integration/api/org/springframework/integration/jms/JmsHeaders.html">JMS Headers</a> and Properties. + The message body is written to a FlowFile while standard JMS Headers and Properties are set as FlowFile attributes. +</p> + +<h2>Configuration Details</h2> +<p> + At the time of writing this document it only defines the essential configuration properties which are suitable for most cases. + Other properties will be defined later as this component progresses. + Configuring ConsumeJMS: +</p> +<ol> + <li><b>User Name</b> - [OPTIONAL] User Name used for authentication and authorization when this processor obtains <i>javax.jms.Connection</i> + from the pre-configured <i>javax.jms.ConnectionFactory</i> (see below). + </li> + <li><b>Password</b> - [OPTIONAL] Password used in conjunction with <b>User Name</b>. + </li> + <li><b>Destination Name</b> - [REQUIRED] the name of the <i>javax.jms.Destination</i>. + Usually provided by administrator (e.g., 'topic://myTopic'). + </li> + <li><b>Destination Type</b> - [OPTIONAL] the type of the <i>javax.jms.Destination</i>. Could be one of 'QUEUE' or 'TOPIC' + Usually provided by the administrator. Defaults to 'TOPIC'. + </li> + <li><b>Session Cache size</b> - [OPTIONAL] Specify the desired size for the JMS Session cache (per JMS Session type). This cache + size is the maximum limit for the number of cached Sessions. + Usually provided by the administrator (e.g., '2453'). Defaults to '1'. 
+ </li> + <li><b>Connection Factory Service</b> - [REQUIRED] link to a pre-configured instance of org.apache.nifi.jms.cf.JMSConnectionFactoryProvider. + </li> +</ol> + +</body> +</html> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/docs/org.apache.nifi.jms.processors.PublishJMS/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/docs/org.apache.nifi.jms.processors.PublishJMS/additionalDetails.html b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/docs/org.apache.nifi.jms.processors.PublishJMS/additionalDetails.html new file mode 100644 index 0000000..b812587 --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/docs/org.apache.nifi.jms.processors.PublishJMS/additionalDetails.html @@ -0,0 +1,64 @@ +<!DOCTYPE html> +<html lang="en"> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<head> + <meta charset="utf-8" /> + <title>PublishJMS</title> + <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /> +</head> + +<body> +<h2>Summary</h2> +<p> + This processor publishes the contents of the incoming FlowFile to a JMS compliant messaging system. +</p> +<p> + This processor does two things. It constructs JMS Message by extracting FlowFile contents (both body and attributes). + Once message is constructed it is sent to a pre-configured JMS Destination. + Standard <a href="http://docs.spring.io/spring-integration/api/org/springframework/integration/jms/JmsHeaders.html">JMS Headers</a> + will be extracted from the FlowFile and set on <i>javax.jms.Message</i> as JMS headers while other + FlowFile attributes will be set as properties of <i>javax.jms.Message</i>. Upon success the incoming FlowFile is transferred + to the <i>success</i> Relationship and upon failure FlowFile is + penalized and transferred to the <i>failure</i> Relationship. +</p> +<h2>Configuration Details</h2> +<p> + At the time of writing this document it only defines the essential configuration properties which are suitable for most cases. + Other properties will be defined later as this component progresses. + Configuring PublishJMS: +</p> +<ol> + <li><b>User Name</b> - [OPTIONAL] User Name used for authentication and authorization when this processor obtains <i>javax.jms.Connection</i> + from the pre-configured <i>javax.jms.ConnectionFactory</i> (see below). + </li> + <li><b>Password</b> - [OPTIONAL] Password used in conjunction with <b>User Name</b>. + </li> + <li><b>Destination Name</b> - [REQUIRED] the name of the <i>javax.jms.Destination</i>. + Usually provided by administrator (e.g., 'topic://myTopic'). + </li> + <li><b>Destination Type</b> - [OPTIONAL] the type of the <i>javax.jms.Destination</i>. Could be one of 'QUEUE' or 'TOPIC'. + Usually provided by the administrator. Defaults to 'TOPIC'. 
+ </li> + <li><b>Session Cache size</b> - [OPTIONAL] Specify the desired size for the JMS Session cache (per JMS Session type). This cache + size is the maximum limit for the number of cached Sessions. + Usually provided by the administrator (e.g., '2453'). Defaults to '1'. + </li> + <li><b>Connection Factory Service</b> - [REQUIRED] link to a pre-configured instance of org.apache.nifi.jms.cf.JMSConnectionFactoryProvider. + </li> +</ol> + +</body> +</html> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java new file mode 100644 index 0000000..6f0cf39 --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.jms.processors; + +import static org.junit.Assert.assertTrue; + +import java.util.Iterator; +import java.util.ServiceLoader; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.nifi.processor.Processor; +import org.junit.Test; +import org.springframework.jms.connection.CachingConnectionFactory; +import org.springframework.jms.core.JmsTemplate; + +public class CommonTest { + + @Test + public void validateServiceIsLocatableViaServiceLoader() { + ServiceLoader<Processor> loader = ServiceLoader.<Processor> load(Processor.class); + Iterator<Processor> iter = loader.iterator(); + boolean pubJmsPresent = false; + boolean consumeJmsPresent = false; + while (iter.hasNext()) { + Processor p = iter.next(); + if (p.getClass().getSimpleName().equals(PublishJMS.class.getSimpleName())) { + pubJmsPresent = true; + } else if (p.getClass().getSimpleName().equals(ConsumeJMS.class.getSimpleName())) { + consumeJmsPresent = true; + } + + } + assertTrue(pubJmsPresent); + assertTrue(consumeJmsPresent); + } + + static JmsTemplate buildJmsTemplateForDestination(String destinationName, boolean pubSub) { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( + "vm://localhost?broker.persistent=false"); + CachingConnectionFactory cf = new CachingConnectionFactory(connectionFactory); + + JmsTemplate jmsTemplate = new JmsTemplate(cf); + jmsTemplate.setDefaultDestinationName(destinationName); + jmsTemplate.setPubSubDomain(pubSub); + return jmsTemplate; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java new file mode 100644 index 0000000..48195a3 --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.jms.processors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; +import org.springframework.jms.connection.CachingConnectionFactory; +import org.springframework.jms.core.JmsTemplate; +import org.springframework.jms.support.JmsHeaders; + +public class ConsumeJMSTest { + + @Test + public void validateSuccessfulConsumeAndTransferToSuccess() throws Exception { + JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination("cooQueue", false); + JMSPublisher sender = new JMSPublisher(jmsTemplate, mock(ProcessorLog.class)); + sender.publish("Hey dude!".getBytes()); + TestRunner runner = TestRunners.newTestRunner(new ConsumeJMS()); + JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class); + when(cs.getIdentifier()).thenReturn("cfProvider"); + when(cs.getConnectionFactory()).thenReturn(jmsTemplate.getConnectionFactory()); + runner.addControllerService("cfProvider", cs); + runner.enableControllerService(cs); + + runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider"); + runner.setProperty(ConsumeJMS.DESTINATION, "cooQueue"); + runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.QUEUE); + runner.run(1, false); + // + final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0); + assertNotNull(successFF); + assertEquals("cooQueue", successFF.getAttributes().get(JmsHeaders.DESTINATION)); + successFF.assertContentEquals("Hey dude!".getBytes()); + + ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy(); + } +} 
http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java new file mode 100644 index 0000000..e18263e --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.jms.processors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +import java.util.HashMap; +import java.util.Map; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.nifi.jms.processors.JMSConsumer.JMSResponse; +import org.apache.nifi.logging.ProcessorLog; +import org.junit.Test; +import org.springframework.jms.connection.CachingConnectionFactory; +import org.springframework.jms.core.JmsTemplate; +import org.springframework.jms.core.MessageCreator; +import org.springframework.jms.support.JmsHeaders; + +public class JMSPublisherConsumerTest { + + @Test + public void validateByesConvertedToBytesMessageOnSend() throws Exception { + JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination("testQueue", false); + + JMSPublisher publisher = new JMSPublisher(jmsTemplate, mock(ProcessorLog.class)); + publisher.publish("hellomq".getBytes()); + + Message receivedMessage = jmsTemplate.receive(); + assertTrue(receivedMessage instanceof BytesMessage); + byte[] bytes = new byte[7]; + ((BytesMessage) receivedMessage).readBytes(bytes); + assertEquals("hellomq", new String(bytes)); + + ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy(); + } + + @Test + public void validateJmsHeadersAndPropertiesAreTransferredFromFFAttributes() throws Exception { + JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination("testQueue", false); + + JMSPublisher publisher = new JMSPublisher(jmsTemplate, mock(ProcessorLog.class)); + Map<String, String> flowFileAttributes = new HashMap<>(); + flowFileAttributes.put("foo", "foo"); + flowFileAttributes.put(JmsHeaders.REPLY_TO, "myTopic"); + publisher.publish("hellomq".getBytes(), flowFileAttributes); + + Message receivedMessage = jmsTemplate.receive(); + 
assertTrue(receivedMessage instanceof BytesMessage); + assertEquals("foo", receivedMessage.getStringProperty("foo")); + assertTrue(receivedMessage.getJMSReplyTo() instanceof Topic); + assertEquals("myTopic", ((Topic) receivedMessage.getJMSReplyTo()).getTopicName()); + + ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy(); + } + + /** + * At the moment the only two supported message types are TextMessage and + * BytesMessage which is sufficient for the type of JMS use cases NiFi is + * used for. This may change to the point where all message types are supported + * at which point this test will no longer be required. + */ + @Test(expected = IllegalStateException.class) + public void validateFailOnUnsupportedMessageType() throws Exception { + JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination("testQueue", false); + + jmsTemplate.send(new MessageCreator() { + @Override + public Message createMessage(Session session) throws JMSException { + return session.createObjectMessage(); + } + }); + + JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ProcessorLog.class)); + try { + consumer.consume(); + } finally { + ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy(); + } + } + + @Test + public void validateConsumeWithCustomHeadersAndProperties() throws Exception { + JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination("testQueue", false); + + jmsTemplate.send(new MessageCreator() { + @Override + public Message createMessage(Session session) throws JMSException { + TextMessage message = session.createTextMessage("hello from the other side"); + message.setStringProperty("foo", "foo"); + message.setBooleanProperty("bar", false); + message.setJMSReplyTo(session.createQueue("fooQueue")); + return message; + } + }); + + JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ProcessorLog.class)); + assertEquals("JMSConsumer[destination:testQueue; pub-sub:false;]", consumer.toString()); + + JMSResponse 
response = consumer.consume(); + assertEquals("hello from the other side", new String(response.getMessageBody())); + assertEquals("fooQueue", response.getMessageHeaders().get(JmsHeaders.REPLY_TO)); + assertEquals("foo", response.getMessageProperties().get("foo")); + assertEquals("false", response.getMessageProperties().get("bar")); + + ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy(); + } +}
