NIFI-614 Added initial support for new style JMS NIFI-614 finalized JMSConnectionFactoryProvider ControllerService
NIFI-614 finalized implementation of both Processors and ControllerService NIFI-614 added initial documentation NIFI-614 addressed PR comment with unused import and squashed NIFI-614 added @OnDisabled method NIFI-614 changed POMs to 0.6 NIFI-614 removed local .gitignore NIFI-614 added support for parsing Tibco URL NIFI-614 removed setting of jms message id NIFI-614 addressed PR comments, fixed tests NIFI-614 addressed latest PR comments NIFI-614 second round of PR comments addressed NIFI-614 3rd round of PR comments addressed NIFI-614 finalizing on PR comments NIFI-614 more PR comments This closes #222 Signed-off-by: jpercivall <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/812da19c Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/812da19c Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/812da19c Branch: refs/heads/master Commit: 812da19cad5f33437c3ae7a35fa758af3de54a11 Parents: cfeebfe Author: Oleg Zhurakousky <[email protected]> Authored: Tue Jan 26 09:21:46 2016 -0500 Committer: jpercivall <[email protected]> Committed: Tue Mar 15 15:48:03 2016 -0400 ---------------------------------------------------------------------- nifi-assembly/pom.xml | 10 + .../nifi-jms-cf-service-nar/pom.xml | 37 +++ .../src/main/resources/META-INF/LICENSE | 211 +++++++++++++++ .../src/main/resources/META-INF/NOTICE | 18 ++ .../nifi-jms-bundle/nifi-jms-cf-service/pom.xml | 46 ++++ .../jms/cf/JMSConnectionFactoryProvider.java | 265 +++++++++++++++++++ .../JMSConnectionFactoryProviderDefinition.java | 116 ++++++++ .../main/java/org/apache/nifi/jms/cf/Utils.java | 107 ++++++++ ...org.apache.nifi.controller.ControllerService | 15 ++ .../additionalDetails.html | 56 ++++ .../cf/JMSConnectionFactoryProviderTest.java | 196 ++++++++++++++ .../java/org/apache/nifi/jms/cf/TestUtils.java | 65 +++++ .../src/test/resources/log4j.properties | 20 ++ .../nifi-jms-cf-service/test-lib/README | 4 + 
.../TestConnectionFactory.java | 111 ++++++++ .../test-lib/test_user_lib.jar | Bin 0 -> 3452 bytes .../nifi-jms-processors-nar/pom.xml | 37 +++ .../src/main/resources/META-INF/LICENSE | 209 +++++++++++++++ .../src/main/resources/META-INF/NOTICE | 18 ++ .../nifi-jms-bundle/nifi-jms-processors/pom.xml | 68 +++++ .../jms/processors/AbstractJMSProcessor.java | 210 +++++++++++++++ .../apache/nifi/jms/processors/ConsumeJMS.java | 124 +++++++++ .../apache/nifi/jms/processors/JMSConsumer.java | 183 +++++++++++++ .../nifi/jms/processors/JMSPublisher.java | 158 +++++++++++ .../apache/nifi/jms/processors/JMSWorker.java | 60 +++++ .../processors/MessageBodyToBytesConverter.java | 133 ++++++++++ .../apache/nifi/jms/processors/PublishJMS.java | 142 ++++++++++ .../org.apache.nifi.processor.Processor | 16 ++ .../additionalDetails.html | 61 +++++ .../additionalDetails.html | 64 +++++ .../apache/nifi/jms/processors/CommonTest.java | 61 +++++ .../nifi/jms/processors/ConsumeJMSTest.java | 60 +++++ .../processors/JMSPublisherConsumerTest.java | 129 +++++++++ .../nifi/jms/processors/PublishJMSTest.java | 102 +++++++ .../src/test/resources/log4j.properties | 23 ++ nifi-nar-bundles/nifi-jms-bundle/pom.xml | 32 +++ nifi-nar-bundles/pom.xml | 3 +- pom.xml | 12 + 38 files changed, 3181 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 6088e63..38cda63 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -282,6 +282,16 @@ language governing permissions and limitations under the License. 
--> <artifactId>nifi-splunk-nar</artifactId> <type>nar</type> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-jms-cf-service-nar</artifactId> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-jms-processors-nar</artifactId> + <type>nar</type> + </dependency> </dependencies> <properties> http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service-nar/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service-nar/pom.xml b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service-nar/pom.xml new file mode 100644 index 0000000..e462272 --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service-nar/pom.xml @@ -0,0 +1,37 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-jms-bundle</artifactId> + <version>0.6.0-SNAPSHOT</version> + </parent> + <artifactId>nifi-jms-cf-service-nar</artifactId> + <packaging>nar</packaging> + <description>NiFi NAR for interacting with JMS-based messaging systems</description> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-standard-services-api-nar</artifactId> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-jms-cf-service</artifactId> + <version>0.6.0-SNAPSHOT</version> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service-nar/src/main/resources/META-INF/LICENSE ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service-nar/src/main/resources/META-INF/LICENSE new file mode 100644 index 0000000..b3c6e30 --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service-nar/src/main/resources/META-INF/LICENSE @@ -0,0 +1,211 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. 
For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. 
If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. 
You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. 
Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +APACHE NIFI SUBCOMPONENTS: + +The Apache NiFi project contains subcomponents with separate copyright +notices and license terms. Your use of the source code for the these +subcomponents is subject to the terms and conditions of the following +licenses. + + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service-nar/src/main/resources/META-INF/NOTICE ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000..89ac02e --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,18 @@ +nifi-jms-cf-service-nar +Copyright 2014-2016 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). 
+ +****************** +Apache Software License v2 +****************** + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Apache Commons Lang + The following NOTICE information applies: + Apache Commons Lang + Copyright 2001-2016 The Apache Software Foundation + + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/pom.xml b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/pom.xml new file mode 100644 index 0000000..f8fad7c --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/pom.xml @@ -0,0 +1,46 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with this + work for additional information regarding copyright ownership. The ASF licenses + this file to You under the Apache License, Version 2.0 (the "License"); you + may not use this file except in compliance with the License. You may obtain + a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless + required by applicable law or agreed to in writing, software distributed + under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES + OR CONDITIONS OF ANY KIND, either express or implied. See the License for + the specific language governing permissions and limitations under the License. 
--> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-jms-bundle</artifactId> + <version>0.6.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + <artifactId>nifi-jms-cf-service</artifactId> + <packaging>jar</packaging> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-ssl-context-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-processor-utils</artifactId> + </dependency> + <dependency> + <groupId>javax.jms</groupId> + <artifactId>javax.jms-api</artifactId> + </dependency> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <version>1.2.17</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java new file mode 100644 index 0000000..e279bae --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.jms.cf; + +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map.Entry; + +import javax.jms.ConnectionFactory; +import javax.net.ssl.SSLContext; + +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.ssl.SSLContextService.ClientAuth; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Provides a factory service that creates and initializes + * {@link ConnectionFactory} specific to the third party JMS system. 
+ * <p> + * It accomplishes it by adjusting current classpath by adding to it the + * additional resources (i.e., JMS client libraries) provided by the user via + * {@link JMSConnectionFactoryProviderDefinition#CLIENT_LIB_DIR_PATH}, allowing + * it then to create an instance of the target {@link ConnectionFactory} based + * on the provided {@link JMSConnectionFactoryProviderDefinition#CONNECTION_FACTORY_IMPL} + * which can be than access via {@link #getConnectionFactory()} method. + * </p> + */ +@Tags({ "jms", "messaging", "integration", "queue", "topic", "publish", "subscribe" }) +@CapabilityDescription("Provides a generic service to create vendor specific javax.jms.ConnectionFactory implementations. " + + "ConnectionFactory can be served once this service is configured successfully") +@DynamicProperty(name = "The name of a Connection Factory configuration property.", value = "The value of a given Connection Factory configuration property.", + description = "The properties that are set following Java Beans convention where a property name is derived from the 'set*' method of the vendor " + + "specific ConnectionFactory's implementation. 
For example, 'com.ibm.mq.jms.MQConnectionFactory.setChannel(String)' would imply 'channel' " + + "property and 'com.ibm.mq.jms.MQConnectionFactory.setTransportType(int)' would imply 'transportType' property.") +@SeeAlso(classNames = { "org.apache.nifi.jms.processors.ConsumeJMS", "org.apache.nifi.jms.processors.PublishJMS" }) +public class JMSConnectionFactoryProvider extends AbstractControllerService implements JMSConnectionFactoryProviderDefinition { + + private final Logger logger = LoggerFactory.getLogger(JMSConnectionFactoryProvider.class); + + private static final List<PropertyDescriptor> propertyDescriptors; + + static { + propertyDescriptors = Collections.unmodifiableList(Arrays.asList(CONNECTION_FACTORY_IMPL, CLIENT_LIB_DIR_PATH, BROKER_URI, SSL_CONTEXT_SERVICE)); + } + + private volatile boolean configured; + + private volatile ConnectionFactory connectionFactory; + + /** + * + */ + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return propertyDescriptors; + } + + /** + * + */ + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .description("Specifies the value for '" + propertyDescriptorName + + "' property to be set on the provided ConnectionFactory implementation.") + .name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true) + .build(); + } + + /** + * + * @return new instance of {@link ConnectionFactory} + */ + @Override + public ConnectionFactory getConnectionFactory() { + if (this.configured) { + return this.connectionFactory; + } + throw new IllegalStateException("ConnectionFactory can not be obtained unless " + + "this ControllerService is configured. 
See onConfigure(ConfigurationContext) method."); + } + + /** + * + */ + @OnEnabled + public void enable(ConfigurationContext context) throws InitializationException { + try { + if (!this.configured) { + if (logger.isInfoEnabled()) { + logger.info("Configuring " + this.getClass().getSimpleName() + " for '" + + context.getProperty(CONNECTION_FACTORY_IMPL).getValue() + "' to be conected to '" + + BROKER_URI + "'"); + } + // will load user provided libraries/resources on the classpath + Utils.addResourcesToClasspath(context.getProperty(CLIENT_LIB_DIR_PATH).getValue()); + + this.createConnectionFactoryInstance(context); + + this.setConnectionFactoryProperties(context); + } + this.configured = true; + } catch (Exception e) { + logger.error("Failed to configure " + this.getClass().getSimpleName(), e); + this.configured = false; + throw new IllegalStateException(e); + } + } + + /** + * + */ + @OnDisabled + public void disable() { + this.connectionFactory = null; + this.configured = false; + } + + /** + * This operation follows standard bean convention by matching property name + * to its corresponding 'setter' method. Once the method was located it is + * invoked to set the corresponding property to a value provided by during + * service configuration. For example, 'channel' property will correspond to + * 'setChannel(..) method and 'queueManager' property will correspond to + * setQueueManager(..) method with a single argument. + * + * There are also few adjustments to accommodate well known brokers. For + * example ActiveMQ ConnectionFactory accepts address of the Message Broker + * in a form of URL while IBMs in the form of host/port pair (more common). + * So this method will use value retrieved from the 'BROKER_URI' static + * property 'as is' if ConnectionFactory implementation is coming from + * ActiveMQ and for all others (for now) the 'BROKER_URI' value will be + * split on ':' and the resulting pair will be used to execute + * setHostName(..) and setPort(..) 
methods on the provided + * ConnectionFactory. This may need to be maintained and adjusted to + * accommodate other implementation of ConnectionFactory, but only for + * URL/Host/Port issue. All other properties are set as dynamic properties + * where user essentially provides both property name and value, The bean + * convention is also explained in user manual for this component with links + * pointing to documentation of various ConnectionFactories. + * + * @see #setProperty(String, String) method + */ + private void setConnectionFactoryProperties(ConfigurationContext context) { + for (final Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) { + PropertyDescriptor descriptor = entry.getKey(); + String propertyName = descriptor.getName(); + if (descriptor.isDynamic()) { + this.setProperty(propertyName, entry.getValue()); + } else { + if (propertyName.equals(BROKER)) { + if (context.getProperty(CONNECTION_FACTORY_IMPL).getValue().startsWith("org.apache.activemq")) { + this.setProperty("brokerURL", entry.getValue()); + } else { + String[] hostPort = entry.getValue().split(":"); + if (hostPort.length == 2) { + this.setProperty("hostName", hostPort[0]); + this.setProperty("port", hostPort[1]); + } else if (hostPort.length != 2) { + this.setProperty("serverUrl", entry.getValue()); // for tibco + } else { + throw new IllegalArgumentException("Failed to parse broker url: " + entry.getValue()); + } + } + SSLContextService sc = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + if (sc != null) { + SSLContext ssl = sc.createSSLContext(ClientAuth.NONE); + this.setProperty("sSLSocketFactory", ssl.getSocketFactory()); + } + } // ignore 'else', since it's the only non-dynamic property that is relevant to CF configuration + } + } + } + + /** + * Sets corresponding {@link ConnectionFactory}'s property to a + * 'propertyValue' by invoking a 'setter' method that corresponds to + * 'propertyName'. 
For example, 'channel' property will correspond to + * 'setChannel(..) method and 'queueManager' property will correspond to + * setQueueManager(..) method with a single argument. + * + * NOTE: There is a limited type conversion to accommodate property value + * types since all NiFi configuration properties comes as String. It is + * accomplished by checking the argument type of the method and executing + * its corresponding conversion to target primitive (e.g., value 'true' will + * go thru Boolean.parseBoolean(propertyValue) if method argument is of type + * boolean). None-primitive values are not supported at the moment and will + * result in {@link IllegalArgumentException}. It is OK though since based + * on analysis of several ConnectionFactory implementation the all seem to + * follow bean convention and all their properties using Java primitives as + * arguments. + */ + private void setProperty(String propertyName, Object propertyValue) { + String methodName = this.toMethodName(propertyName); + Method method = Utils.findMethod(methodName, this.connectionFactory.getClass()); + if (method != null) { + try { + Class<?> returnType = method.getParameterTypes()[0]; + if (String.class.isAssignableFrom(returnType)) { + method.invoke(this.connectionFactory, propertyValue); + } else if (int.class.isAssignableFrom(returnType)) { + method.invoke(this.connectionFactory, Integer.parseInt((String) propertyValue)); + } else if (long.class.isAssignableFrom(returnType)) { + method.invoke(this.connectionFactory, Long.parseLong((String) propertyValue)); + } else if (boolean.class.isAssignableFrom(returnType)) { + method.invoke(this.connectionFactory, Boolean.parseBoolean((String) propertyValue)); + } else { + method.invoke(this.connectionFactory, propertyValue); + } + } catch (Exception e) { + throw new IllegalStateException("Failed to set property " + propertyName, e); + } + } else if (propertyName.equals("hostName")) { + this.setProperty("host", propertyValue); // try 'host' 
as another common convention. + } + } + + /** + * Creates an instance of the {@link ConnectionFactory} from the provided + * 'CONNECTION_FACTORY_IMPL'. + */ + private void createConnectionFactoryInstance(ConfigurationContext context) { + String connectionFactoryImplName = context.getProperty(CONNECTION_FACTORY_IMPL).getValue(); + this.connectionFactory = Utils.newDefaultInstance(connectionFactoryImplName); + } + + /** + * Will convert propertyName to a method name following bean convention. For + * example, 'channel' property will correspond to 'setChannel method and + * 'queueManager' property will correspond to setQueueManager method name + */ + private String toMethodName(String propertyName) { + char c[] = propertyName.toCharArray(); + c[0] = Character.toUpperCase(c[0]); + return "set" + new String(c); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderDefinition.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderDefinition.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderDefinition.java new file mode 100644 index 0000000..4375e22 --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderDefinition.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.jms.cf; + +import java.io.File; + +import javax.jms.ConnectionFactory; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.ssl.SSLContextService; + +/** + * Defines a strategy to create implementations to load and initialize third + * party implementations of the {@link ConnectionFactory} + */ +public interface JMSConnectionFactoryProviderDefinition extends ControllerService { + static final String BROKER = "broker"; + static final String CF_IMPL = "cf"; + static final String CF_LIB = "cflib"; + + public static final PropertyDescriptor CONNECTION_FACTORY_IMPL = new PropertyDescriptor.Builder() + .name(CF_IMPL) + .displayName("MQ ConnectionFactory Implementation") + .description("A fully qualified name of the JMS ConnectionFactory implementation " + + "class (i.e., org.apache.activemq.ActiveMQConnectionFactory)") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .build(); + public static final PropertyDescriptor CLIENT_LIB_DIR_PATH = new PropertyDescriptor.Builder() + .name(CF_LIB) + .displayName("MQ Client Libraries path (i.e., /usr/jms/lib)") + .description("Path to the directory with additional resources (i.e., JARs, configuration files etc.) to be added " + + "to the classpath. 
Such resources typically represent target MQ client libraries for the " + + "ConnectionFactory implementation.") + .addValidator(new ClientLibValidator()) + .required(true) + .build(); + + // ConnectionFactory specific properties + public static final PropertyDescriptor BROKER_URI = new PropertyDescriptor.Builder() + .name(BROKER) + .displayName("Broker URI") + .description("URI pointing to the network location of the JMS Message broker. For example, " + + "'tcp://myhost:61616' for ActiveMQ or 'myhost:1414' for IBM MQ") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .build(); + + public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("SSL Context Service") + .description("The SSL Context Service used to provide client certificate information for TLS/SSL connections.") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); + + /** + * Returns an instance of the {@link ConnectionFactory} specific to the + * target messaging system (i.e., + * org.apache.activemq.ActiveMQConnectionFactory). It is created based on + * the value of the supplied 'CONNECTION_FACTORY_IMPL' property and JMS + * client libraries supplied via 'CLIENT_LIB_DIR_PATH' property. + * + * @return instance of {@link ConnectionFactory} + */ + ConnectionFactory getConnectionFactory(); + + /** + * + */ + static class ClientLibValidator implements Validator { + @Override + public ValidationResult validate(String subject, String input, ValidationContext context) { + String libDirPath = context.getProperty(CLIENT_LIB_DIR_PATH).getValue(); + StringBuilder invalidationMessageBuilder = new StringBuilder(); + if (libDirPath != null) { + File file = new File(libDirPath); + if (!file.isDirectory()) { + invalidationMessageBuilder.append("'MQ Client Libraries path' must point to a directory. 
Was '" + + file.getAbsolutePath() + "'."); + } + } else { + invalidationMessageBuilder.append("'MQ Client Libraries path' must be provided. \n"); + } + String invalidationMessage = invalidationMessageBuilder.toString(); + ValidationResult vResult; + if (invalidationMessage.length() == 0) { + vResult = new ValidationResult.Builder().subject(subject).input(input) + .explanation("Client lib path is valid and points to a directory").valid(true).build(); + } else { + vResult = new ValidationResult.Builder().subject(subject).input(input) + .explanation("Client lib path is invalid. " + invalidationMessage) + .valid(false).build(); + } + return vResult; + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/Utils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/Utils.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/Utils.java new file mode 100644 index 0000000..cd191c3 --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/Utils.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.jms.cf; + +import java.io.File; +import java.lang.reflect.Method; +import java.net.URL; +import java.net.URLClassLoader; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public final class Utils { + + private static final Logger logger = LoggerFactory.getLogger(Utils.class); + + /** + * Creates a new instance of the class specified by 'className' by first + * loading it using the thread context class loader and then executing its default + * constructor. + */ + @SuppressWarnings("unchecked") + static <T> T newDefaultInstance(String className) { + try { + Class<T> clazz = (Class<T>) Class.forName(className, false, Thread.currentThread().getContextClassLoader()); + return clazz.newInstance(); + } catch (Exception e) { + throw new IllegalStateException("Failed to load and/or instantiate class '" + className + "'", e); + } + } + + /** + * Finds a method by name on the target class. If more than one method + * is present it will return the first one encountered. + * + * @param name method name + * @param targetClass instance of target class + * @return instance of {@link Method}, or null if no method with that name is found + */ + public static Method findMethod(String name, Class<?> targetClass) { + Class<?> searchType = targetClass; + while (searchType != null) { + Method[] methods = (searchType.isInterface() ? searchType.getMethods() : searchType.getDeclaredMethods()); + for (Method method : methods) { + if (name.equals(method.getName())) { + return method; + } + } + searchType = searchType.getSuperclass(); + } + return null; + } + + /** + * Adds the content of the directory specified with 'path' to the classpath. It + * does so by creating a new instance of the {@link URLClassLoader} using + * {@link URL}s created from listing the contents of the directory denoted + * by 'path' and setting it as the thread context class loader. 
+ */ + static void addResourcesToClasspath(String path) { + if (logger.isDebugEnabled()) { + logger.debug("Adding additional resources from '" + path + "' to the classpath."); + } + if (path == null) { + throw new IllegalArgumentException("'path' must not be null"); + } + File libraryDir = new File(path); + if (libraryDir.exists() && libraryDir.isDirectory()) { + String[] cpResourceNames = libraryDir.list(); + URL[] urls = new URL[cpResourceNames.length]; + try { + for (int i = 0; i < urls.length; i++) { + urls[i] = new File(libraryDir, cpResourceNames[i]).toURI().toURL(); + if (logger.isDebugEnabled()) { + logger.debug("Identifying additional resource to the classpath: " + urls[i]); + } + } + } catch (Exception e) { + throw new IllegalStateException( + "Failed to parse user libraries from '" + libraryDir.getAbsolutePath() + "'", e); + } + + URLClassLoader cl = new URLClassLoader(urls, Utils.class.getClassLoader()); + Thread.currentThread().setContextClassLoader(cl); + } else { + throw new IllegalArgumentException("Path '" + libraryDir.getAbsolutePath() + + "' is not valid because it doesn't exist or does not point to a directory."); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService new file mode 100644 index 0000000..f191675 --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -0,0 +1,15 @@ +# Licensed to the Apache Software 
Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.jms.cf.JMSConnectionFactoryProvider http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/resources/docs/org.apache.nifi.jms.cf.JMSConnectionFactoryProvider/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/resources/docs/org.apache.nifi.jms.cf.JMSConnectionFactoryProvider/additionalDetails.html b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/resources/docs/org.apache.nifi.jms.cf.JMSConnectionFactoryProvider/additionalDetails.html new file mode 100644 index 0000000..0cd3681 --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/resources/docs/org.apache.nifi.jms.cf.JMSConnectionFactoryProvider/additionalDetails.html @@ -0,0 +1,56 @@ +<!DOCTYPE html> +<html lang="en"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. 
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <head> + <meta charset="utf-8" /> + <title>JMSConnectionFactoryProvider</title> + <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /> + </head> + + <body> + <h2>Description:</h2> + <p> + This ControllerService serves as a general factory service for serving vendor specific + instances of the <i>javax.jms.ConnectionFactory</i>. It does so by allowing the user to + configure vendor specific properties as well as to point to the location of the vendor + provided JMS client libraries so the correct implementation of the <i>javax.jms.ConnectionFactory</i> + can be found, loaded, instantiated and served to the dependent Processors (see PublishJMS, ConsumeJMS). + </p> + <p> + To accommodate a variety of JMS vendors and their implementations of the <i>ConnectionFactory</i> + this ControllerService exposes only 3 static configuration properties that are common across many implementations + of the <i>ConnectionFactory</i>. The rest of the configuration properties are set following + <a href="http://docstore.mik.ua/orelly/java-ent/jnut/ch06_02.htm">Java Beans</a> convention (see below). 
+ </p> + <p> + The 3 static configuration properties are: + <ul> + <li><b>MQ ConnectionFactory Implementation</b> - A fully qualified name of the JMS ConnectionFactory implementation + class (i.e., org.apache.activemq.ActiveMQConnectionFactory)</li> + <li><b>MQ Client Libraries path</b> - Path to the directory with additional resources (i.e., JARs, configuration files etc.) to be added + to the classpath. Such resources typically represent target MQ client libraries for the ConnectionFactory + implementation. It is optional if you are using Apache ActiveMQ since its libraries are distributed with this component.</li> + <li><b>Broker URI</b> - URI pointing to the network location of the JMS Message broker. For example, 'tcp://myhost:61616' for ActiveMQ or simply 'myhost:1414'.</li> + </ul> + The rest of the properties are set as Dynamic Properties following <a href="http://docstore.mik.ua/orelly/java-ent/jnut/ch06_02.htm">Java Beans</a> + convention where a property name is derived from the <i>set*</i> method of the vendor specific ConnectionFactory's implementation. + For example, <i>com.ibm.mq.jms.MQConnectionFactory.setChannel(String)</i> would imply 'channel' property and + <i>com.ibm.mq.jms.MQConnectionFactory.setTransportType(int)</i> would imply 'transportType' property. + For the list of available properties please consult vendor provided documentation. 
Here is an example for + <a href="https://www-01.ibm.com/support/knowledgecenter/SSFKSJ_8.0.0/com.ibm.mq.javadoc.doc/WMQJMSClasses/com/ibm/mq/jms/MQQueueConnectionFactory.html">IBM's com.ibm.mq.jms.MQConnectionFactory</a> + </p> + </body> +</html> http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderTest.java new file mode 100644 index 0000000..2280273 --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderTest.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.jms.cf; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.lang.reflect.Method; +import java.util.Iterator; +import java.util.ServiceLoader; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class JMSConnectionFactoryProviderTest { + + private static Logger logger = LoggerFactory.getLogger(JMSConnectionFactoryProviderTest.class); + + @Test + public void validateFullConfigWithUserLib() throws Exception { + TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); + JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider(); + runner.addControllerService("cfProvider", cfProvider); + runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "myhost:1234"); + + runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, + new File("test-lib").getAbsolutePath()); // see README in 'test-lib' dir for more info + runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, + "org.apache.nifi.jms.testcflib.TestConnectionFactory"); + runner.setProperty(cfProvider, "Foo", "foo"); + runner.setProperty(cfProvider, "Bar", "3"); + + runner.enableControllerService(cfProvider); + runner.assertValid(cfProvider); + ConnectionFactory cf = cfProvider.getConnectionFactory(); + assertNotNull(cf); + 
assertEquals("org.apache.nifi.jms.testcflib.TestConnectionFactory", cf.getClass().getName()); + assertEquals("myhost", this.get("getHost", cf)); + assertEquals(1234, this.get("getPort", cf)); + assertEquals("foo", this.get("getFoo", cf)); + assertEquals(3, this.get("getBar", cf)); + } + + @Test(expected = AssertionError.class) + public void validateOnConfigureFailsIfCNFonConnectionFactory() throws Exception { + TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); + JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider(); + runner.addControllerService("cfProvider", cfProvider); + runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "myhost:1234"); + + runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, "test-lib"); + runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, + "foo.bar.NonExistingConnectionFactory"); + runner.enableControllerService(cfProvider); + } + + @Test + public void validateNotValidForNonExistingLibPath() throws Exception { + TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); + JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider(); + runner.addControllerService("cfProvider", cfProvider); + runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "myhost:1234"); + + runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, "foo"); + runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, + "org.apache.nifi.jms.testcflib.TestConnectionFactory"); + runner.assertNotValid(cfProvider); + } + + @Test(expected = AssertionError.class) + public void validateFailsIfURINotHostPortAndNotActiveMQ() throws Exception { + TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); + JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider(); + runner.addControllerService("cfProvider", 
cfProvider); + runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "myhost"); + + runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, "test-lib"); + runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, + "org.apache.nifi.jms.testcflib.TestConnectionFactory"); + runner.enableControllerService(cfProvider); + runner.assertNotValid(cfProvider); + } + + @Test + public void validateNotValidForNonDirectoryPath() throws Exception { + TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); + JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider(); + runner.addControllerService("cfProvider", cfProvider); + runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "myhost:1234"); + + runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, "pom.xml"); + runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, + "org.apache.nifi.jms.testcflib.TestConnectionFactory"); + runner.assertNotValid(cfProvider); + } + + @Test(expected = IllegalStateException.class) + public void validateGetConnectionFactoryFailureIfServiceNotConfigured() throws Exception { + new JMSConnectionFactoryProvider().getConnectionFactory(); + } + + /** + * This test simply validates that {@link ConnectionFactory} can be setup by + * pointing to the location of the client libraries at runtime. It uses + * ActiveMQ which is not present at the POM but instead pulled from Maven + * repo using {@link TestUtils#setupActiveMqLibForTesting(boolean)}, which + * implies that for this test to run the computer must be connected to the + * Internet. If computer is not connected to the Internet, this test will + * quietly fail logging a message. 
+ */ + @Test + public void validateFactoryCreationWithActiveMQLibraries() throws Exception { + try { + String libPath = TestUtils.setupActiveMqLibForTesting(true); + + TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); + JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider(); + runner.addControllerService("cfProvider", cfProvider); + runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, + "vm://localhost?broker.persistent=false"); + runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, libPath); + runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, + "org.apache.activemq.ActiveMQConnectionFactory"); + runner.enableControllerService(cfProvider); + runner.assertValid(cfProvider); + + Connection connection = cfProvider.getConnectionFactory().createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination queue = session.createQueue("myqueue"); + MessageProducer producer = session.createProducer(queue); + MessageConsumer consumer = session.createConsumer(queue); + + TextMessage message = session.createTextMessage("Hello"); + producer.send(message); + assertEquals("Hello", ((TextMessage) consumer.receive()).getText()); + connection.stop(); + connection.close(); + } catch (Exception e) { + logger.error("'validateFactoryCreationWithActiveMQLibraries' failed due to ", e); + } + } + + @Test + public void validateServiceIsLocatableViaServiceLoader() { + ServiceLoader<ControllerService> loader = ServiceLoader.<ControllerService> load(ControllerService.class); + Iterator<ControllerService> iter = loader.iterator(); + boolean present = false; + while (iter.hasNext()) { + ControllerService cs = iter.next(); + assertTrue(cs instanceof JMSConnectionFactoryProviderDefinition); + present = true; + } + assertTrue(present); + } + + @SuppressWarnings("unchecked") + private <T> T get(String 
methodName, ConnectionFactory cf) throws Exception { + Method m = Utils.findMethod(methodName, cf.getClass()); + return (T) m.invoke(cf); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/test/java/org/apache/nifi/jms/cf/TestUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/test/java/org/apache/nifi/jms/cf/TestUtils.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/test/java/org/apache/nifi/jms/cf/TestUtils.java new file mode 100644 index 0000000..7d5e636 --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/test/java/org/apache/nifi/jms/cf/TestUtils.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.jms.cf; + +import java.io.File; +import java.io.FileOutputStream; +import java.net.URL; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; + +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +class TestUtils { + + static Logger logger = LoggerFactory.getLogger(TestUtils.class); + + static String setupActiveMqLibForTesting(boolean clean) { + String[] urlsStrings = new String[]{ + "http://central.maven.org/maven2/org/apache/activemq/activemq-client/5.13.0/activemq-client-5.13.0.jar", + "http://central.maven.org/maven2/org/apache/activemq/activemq-broker/5.13.0/activemq-broker-5.13.0.jar", + "http://central.maven.org/maven2/org/apache/geronimo/specs/geronimo-j2ee-management_1.0_spec/1.0.1/geronimo-j2ee-management_1.0_spec-1.0.1.jar", + "http://central.maven.org/maven2/org/fusesource/hawtbuf/hawtbuf/1.11/hawtbuf-1.11.jar" }; + + try { + File activeMqLib = new File("target/active-mq-lib"); + if (activeMqLib.exists() && clean) { + FileUtils.deleteDirectory(activeMqLib); + } + activeMqLib.mkdirs(); + for (String urlString : urlsStrings) { + URL url = new URL(urlString); + String path = url.getPath(); + path = path.substring(path.lastIndexOf("/") + 1); + logger.info("Downloading: " + path); + ReadableByteChannel rbc = Channels.newChannel(url.openStream()); + try (FileOutputStream fos = new FileOutputStream(new File(activeMqLib, path))) { + fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE); + fos.close(); + } + } + return activeMqLib.getAbsolutePath(); + } catch (Exception e) { + throw new IllegalStateException("Failed to download ActiveMQ libraries.", e); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/test/resources/log4j.properties b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/test/resources/log4j.properties new file mode 100644 index 0000000..ad19977 --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/test/resources/log4j.properties @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+log4j.rootCategory=DEBUG, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %t %c{2}:%L - %m%n + http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/test-lib/README ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/test-lib/README b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/test-lib/README new file mode 100644 index 0000000..48b9e8a --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/test-lib/README @@ -0,0 +1,4 @@ +The binary contained in this folder is used for testing and contains only one +class that you can find in 'org.apache.nifi.jms.testcflib' folder. This class +represents test implementation of javax.jms.ConnectionFactory used to validate +that classes are loaded from user defined class path directory. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/test-lib/org.apache.nifi.jms.testcflib/TestConnectionFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/test-lib/org.apache.nifi.jms.testcflib/TestConnectionFactory.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/test-lib/org.apache.nifi.jms.testcflib/TestConnectionFactory.java new file mode 100644 index 0000000..96839c6 --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/test-lib/org.apache.nifi.jms.testcflib/TestConnectionFactory.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.jms.testcflib; + +import static org.mockito.Mockito.mock; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSContext; +import javax.jms.JMSException; + +public class TestConnectionFactory implements ConnectionFactory { + + private String user; + private String password; + private String foo; + private int bar; + private String host; + + private int port; + + @Override + public Connection createConnection() throws JMSException { + return mock(Connection.class); + } + + @Override + public Connection createConnection(String userName, String password) throws JMSException { + this.user = user; + this.password = password; + return mock(Connection.class); + } + + @Override + public JMSContext createContext() { + return mock(JMSContext.class); + } + + @Override + public JMSContext createContext(String userName, String password) { + this.user = user; + this.password = password; + return mock(JMSContext.class); + } + + @Override + public JMSContext createContext(String userName, String password, int sessionMode) { + this.user = user; + this.password = password; + return mock(JMSContext.class); + } + + @Override + public JMSContext createContext(int sessionMode) { + return mock(JMSContext.class); + } + + public void setFoo(String foo) { + this.foo = foo; + } + + public void setBar(int bar) { + this.bar = bar; + } + + public String 
getUser() { + return user; + } + + public String getPassword() { + return password; + } + + public String getFoo() { + return foo; + } + + public int getBar() { + return bar; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/test-lib/test_user_lib.jar ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/test-lib/test_user_lib.jar b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/test-lib/test_user_lib.jar new file mode 100644 index 0000000..95f1121 Binary files /dev/null and b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/test-lib/test_user_lib.jar differ http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors-nar/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors-nar/pom.xml b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors-nar/pom.xml new file mode 100644 index 0000000..72fec4c --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors-nar/pom.xml @@ -0,0 +1,37 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. 
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-jms-bundle</artifactId> + <version>0.6.0-SNAPSHOT</version> + </parent> + <artifactId>nifi-jms-processors-nar</artifactId> + <packaging>nar</packaging> + <description>NiFi NAR for interacting with JMS-based messaging systems</description> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-jms-cf-service-nar</artifactId> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-jms-processors</artifactId> + <version>0.6.0-SNAPSHOT</version> + </dependency> + </dependencies> +</project>
