Repository: nifi Updated Branches: refs/heads/master 10479a5a2 -> ebead820f
NIFI-5741: When returning a ConnectionFactory from the JndiJmsConnectionFactoryProvider, ensure that we wrap the ConnectionFactory so that any calls to the ConnectionFactory happen within the context of the Controller Service's Class Loader This closes #3106. Signed-off-by: Bryan Bende <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ebead820 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ebead820 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ebead820 Branch: refs/heads/master Commit: ebead820f9d9ac7d629bd099791acf4dcd95e13f Parents: 10479a5 Author: Mark Payne <[email protected]> Authored: Tue Oct 23 14:42:32 2018 -0400 Committer: Bryan Bende <[email protected]> Committed: Tue Oct 23 16:17:25 2018 -0400 ---------------------------------------------------------------------- .../cf/JndiJmsConnectionFactoryProvider.java | 80 ++++++++++++++------ .../jms/processors/AbstractJMSProcessor.java | 17 ++--- .../apache/nifi/jms/processors/ConsumeJMS.java | 21 +++-- .../nifi/jms/processors/JMSPublisher.java | 27 +++---- .../apache/nifi/jms/processors/PublishJMS.java | 19 +++-- 5 files changed, 94 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/ebead820/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryProvider.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryProvider.java index 876d933..a293d84 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryProvider.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryProvider.java @@ -34,6 +34,9 @@ import javax.jms.ConnectionFactory; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; import java.util.Arrays; import java.util.Hashtable; import java.util.List; @@ -49,7 +52,7 @@ import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDA value = "The value of the JNDI Initial Context Environment variable.", expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY) @SeeAlso(classNames = {"org.apache.nifi.jms.processors.ConsumeJMS", "org.apache.nifi.jms.processors.PublishJMS", "org.apache.nifi.jms.cf.JMSConnectionFactoryProvider"}) -public class JndiJmsConnectionFactoryProvider extends AbstractControllerService implements JMSConnectionFactoryProviderDefinition{ +public class JndiJmsConnectionFactoryProvider extends AbstractControllerService implements JMSConnectionFactoryProviderDefinition { static final PropertyDescriptor INITIAL_NAMING_FACTORY_CLASS = new Builder() .name("java.naming.factory.initial") @@ -146,34 +149,15 @@ public class JndiJmsConnectionFactoryProvider extends AbstractControllerService return connectionFactory; } + private ConnectionFactory lookupConnectionFactory() { try { final ConfigurationContext context = getConfigurationContext(); - final Hashtable<String, String> env = new Hashtable<>(); - env.put(Context.INITIAL_CONTEXT_FACTORY, context.getProperty(INITIAL_NAMING_FACTORY_CLASS).evaluateAttributeExpressions().getValue().trim()); - env.put(Context.PROVIDER_URL, context.getProperty(NAMING_PROVIDER_URL).evaluateAttributeExpressions().getValue().trim()); - - final String principal = context.getProperty(PRINCIPAL).evaluateAttributeExpressions().getValue(); - if (principal != null) { - env.put(Context.SECURITY_PRINCIPAL, principal); - } - - final String credentials = context.getProperty(CREDENTIALS).getValue(); - if (credentials != null) { - env.put(Context.SECURITY_CREDENTIALS, credentials); - } - - context.getProperties().keySet().forEach(descriptor -> { - if (descriptor.isDynamic()) { - env.put(descriptor.getName(), context.getProperty(descriptor).evaluateAttributeExpressions().getValue()); - } - }); - final String factoryName = context.getProperty(CONNECTION_FACTORY_NAME).evaluateAttributeExpressions().getValue().trim(); - getLogger().debug("Looking up Connection Factory with name [{}] using JNDI Environment {}", new Object[] {factoryName, env}); + getLogger().debug("Looking up Connection Factory with name [{}]", new Object[] {factoryName}); - final Context initialContext = new InitialContext(env); + final Context initialContext = createInitialContext(); final Object factoryObject = initialContext.lookup(factoryName); getLogger().debug("Obtained {} from JNDI", new Object[] {factoryObject}); @@ -186,9 +170,57 @@ public class JndiJmsConnectionFactoryProvider extends AbstractControllerService "Instead, is of type " + factoryObject.getClass() + " : " + factoryObject); } - return (ConnectionFactory) factoryObject; + return (ConnectionFactory) instrumentWithClassLoader(factoryObject, Thread.currentThread().getContextClassLoader(), ConnectionFactory.class); } catch (final NamingException ne) { throw new ProcessException("Could not obtain JMS Connection Factory from JNDI", ne); } } + + + private Context createInitialContext() throws NamingException { + final ConfigurationContext context = getConfigurationContext(); + + final Hashtable<String, String> env = new Hashtable<>(); + env.put(Context.INITIAL_CONTEXT_FACTORY, context.getProperty(INITIAL_NAMING_FACTORY_CLASS).evaluateAttributeExpressions().getValue().trim()); + env.put(Context.PROVIDER_URL, context.getProperty(NAMING_PROVIDER_URL).evaluateAttributeExpressions().getValue().trim()); + + final String principal = context.getProperty(PRINCIPAL).evaluateAttributeExpressions().getValue(); + if (principal != null) { + env.put(Context.SECURITY_PRINCIPAL, principal); + } + + final String credentials = context.getProperty(CREDENTIALS).getValue(); + if (credentials != null) { + env.put(Context.SECURITY_CREDENTIALS, credentials); + } + + context.getProperties().keySet().forEach(descriptor -> { + if (descriptor.isDynamic()) { + env.put(descriptor.getName(), context.getProperty(descriptor).evaluateAttributeExpressions().getValue()); + } + }); + + getLogger().debug("Creating Initial Context using JNDI Environment {}", new Object[] {env}); + + final Context initialContext = new InitialContext(env); + return initialContext; + } + + public static Object instrumentWithClassLoader(final Object obj, final ClassLoader classLoader, final Class<?>... interfaces) { + final InvocationHandler invocationHandler = new InvocationHandler() { + @Override + public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { + final Thread thread = Thread.currentThread(); + final ClassLoader currentClassLoader = thread.getContextClassLoader(); + try { + thread.setContextClassLoader(classLoader); + return method.invoke(obj, args); + } finally { + thread.setContextClassLoader(currentClassLoader); + } + } + }; + + return Proxy.newProxyInstance(classLoader, interfaces, invocationHandler); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/ebead820/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java index a4cf6d4..193e72a 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java @@ -16,15 +16,6 @@ */ package org.apache.nifi.jms.processors; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.ConnectionFactory; - import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; @@ -41,6 +32,14 @@ import org.springframework.jms.connection.CachingConnectionFactory; import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter; import org.springframework.jms.core.JmsTemplate; +import javax.jms.ConnectionFactory; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; + /** * Base JMS processor to support implementation of JMS producers and consumers. * http://git-wip-us.apache.org/repos/asf/nifi/blob/ebead820/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java index b7103ff..ff05f6a 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java @@ -16,17 +16,6 @@ */ package org.apache.nifi.jms.processors; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; - -import javax.jms.Session; - import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.WritesAttribute; @@ -50,6 +39,16 @@ import org.springframework.jms.connection.CachingConnectionFactory; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.support.JmsHeaders; +import javax.jms.Session; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + /** * Consuming JMS processor which upon each invocation of * {@link #onTrigger(ProcessContext, ProcessSession)} method will construct a http://git-wip-us.apache.org/repos/asf/nifi/blob/ebead820/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java index 392157f..c13f4b7 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java @@ -16,11 +16,12 @@ */ package org.apache.nifi.jms.processors; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.stream.Collectors; +import org.apache.nifi.logging.ComponentLog; +import org.springframework.jms.connection.CachingConnectionFactory; +import org.springframework.jms.core.JmsTemplate; +import org.springframework.jms.core.MessageCreator; +import org.springframework.jms.core.SessionCallback; +import org.springframework.jms.support.JmsHeaders; import javax.jms.BytesMessage; import javax.jms.Destination; @@ -30,13 +31,11 @@ import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; - -import org.apache.nifi.logging.ComponentLog; -import org.springframework.jms.connection.CachingConnectionFactory; -import org.springframework.jms.core.JmsTemplate; -import org.springframework.jms.core.MessageCreator; -import org.springframework.jms.core.SessionCallback; -import org.springframework.jms.support.JmsHeaders; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.stream.Collectors; /** * Generic publisher of messages to JMS compliant messaging system. @@ -64,10 +63,6 @@ final class JMSPublisher extends JMSWorker { }); } - void publish(String destinationName, String messageText) { - this.publish(destinationName, messageText, null); - } - void publish(String destinationName, String messageText, final Map<String, String> flowFileAttributes) { this.jmsTemplate.send(destinationName, new MessageCreator() { @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/ebead820/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java index 3afa0f0..a32a895 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java @@ -16,22 +16,13 @@ */ package org.apache.nifi.jms.processors; -import java.io.StringWriter; -import java.nio.charset.Charset; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; - -import javax.jms.Destination; -import javax.jms.Message; - import org.apache.commons.io.IOUtils; -import org.apache.nifi.annotation.behavior.SystemResourceConsideration; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.ReadsAttribute; import org.apache.nifi.annotation.behavior.ReadsAttributes; import org.apache.nifi.annotation.behavior.SystemResource; +import org.apache.nifi.annotation.behavior.SystemResourceConsideration; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; @@ -47,6 +38,14 @@ import org.springframework.jms.connection.CachingConnectionFactory; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.support.JmsHeaders; +import javax.jms.Destination; +import javax.jms.Message; +import java.io.StringWriter; +import java.nio.charset.Charset; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + /** * An implementation of JMS Message publishing {@link Processor} which upon each * invocation of {@link #onTrigger(ProcessContext, ProcessSession)} method will
