Repository: nifi
Updated Branches:
  refs/heads/master 10479a5a2 -> ebead820f


NIFI-5741: When returning a ConnectionFactory from the 
JndiJmsConnectionFactoryProvider, ensure that we wrap the ConnectionFactory so 
that any calls to the ConnectionFactory happen within the context of the 
Controller Service's Class Loader

This closes #3106.

Signed-off-by: Bryan Bende <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ebead820
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ebead820
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ebead820

Branch: refs/heads/master
Commit: ebead820f9d9ac7d629bd099791acf4dcd95e13f
Parents: 10479a5
Author: Mark Payne <[email protected]>
Authored: Tue Oct 23 14:42:32 2018 -0400
Committer: Bryan Bende <[email protected]>
Committed: Tue Oct 23 16:17:25 2018 -0400

----------------------------------------------------------------------
 .../cf/JndiJmsConnectionFactoryProvider.java    | 80 ++++++++++++++------
 .../jms/processors/AbstractJMSProcessor.java    | 17 ++---
 .../apache/nifi/jms/processors/ConsumeJMS.java  | 21 +++--
 .../nifi/jms/processors/JMSPublisher.java       | 27 +++----
 .../apache/nifi/jms/processors/PublishJMS.java  | 19 +++--
 5 files changed, 94 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/ebead820/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryProvider.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryProvider.java
index 876d933..a293d84 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryProvider.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryProvider.java
@@ -34,6 +34,9 @@ import javax.jms.ConnectionFactory;
 import javax.naming.Context;
 import javax.naming.InitialContext;
 import javax.naming.NamingException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
 import java.util.Arrays;
 import java.util.Hashtable;
 import java.util.List;
@@ -49,7 +52,7 @@ import static 
org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDA
     value = "The value of the JNDI Initial Context Environment variable.",
     expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
 @SeeAlso(classNames = {"org.apache.nifi.jms.processors.ConsumeJMS", 
"org.apache.nifi.jms.processors.PublishJMS", 
"org.apache.nifi.jms.cf.JMSConnectionFactoryProvider"})
-public class JndiJmsConnectionFactoryProvider extends 
AbstractControllerService implements JMSConnectionFactoryProviderDefinition{
+public class JndiJmsConnectionFactoryProvider extends 
AbstractControllerService implements JMSConnectionFactoryProviderDefinition {
 
     static final PropertyDescriptor INITIAL_NAMING_FACTORY_CLASS = new 
Builder()
         .name("java.naming.factory.initial")
@@ -146,34 +149,15 @@ public class JndiJmsConnectionFactoryProvider extends 
AbstractControllerService
         return connectionFactory;
     }
 
+
     private ConnectionFactory lookupConnectionFactory() {
         try {
             final ConfigurationContext context = getConfigurationContext();
 
-            final Hashtable<String, String> env = new Hashtable<>();
-            env.put(Context.INITIAL_CONTEXT_FACTORY, 
context.getProperty(INITIAL_NAMING_FACTORY_CLASS).evaluateAttributeExpressions().getValue().trim());
-            env.put(Context.PROVIDER_URL, 
context.getProperty(NAMING_PROVIDER_URL).evaluateAttributeExpressions().getValue().trim());
-
-            final String principal = 
context.getProperty(PRINCIPAL).evaluateAttributeExpressions().getValue();
-            if (principal != null) {
-                env.put(Context.SECURITY_PRINCIPAL, principal);
-            }
-
-            final String credentials = 
context.getProperty(CREDENTIALS).getValue();
-            if (credentials != null) {
-                env.put(Context.SECURITY_CREDENTIALS, credentials);
-            }
-
-            context.getProperties().keySet().forEach(descriptor -> {
-                if (descriptor.isDynamic()) {
-                    env.put(descriptor.getName(), 
context.getProperty(descriptor).evaluateAttributeExpressions().getValue());
-                }
-            });
-
             final String factoryName = 
context.getProperty(CONNECTION_FACTORY_NAME).evaluateAttributeExpressions().getValue().trim();
-            getLogger().debug("Looking up Connection Factory with name [{}] 
using JNDI Environment {}", new Object[] {factoryName, env});
+            getLogger().debug("Looking up Connection Factory with name [{}]", 
new Object[] {factoryName});
 
-            final Context initialContext = new InitialContext(env);
+            final Context initialContext = createInitialContext();
             final Object factoryObject = initialContext.lookup(factoryName);
 
             getLogger().debug("Obtained {} from JNDI", new Object[] 
{factoryObject});
@@ -186,9 +170,57 @@ public class JndiJmsConnectionFactoryProvider extends 
AbstractControllerService
                     "Instead, is of type " + factoryObject.getClass() + " : " 
+ factoryObject);
             }
 
-            return (ConnectionFactory) factoryObject;
+            return (ConnectionFactory) 
instrumentWithClassLoader(factoryObject, 
Thread.currentThread().getContextClassLoader(), ConnectionFactory.class);
         } catch (final NamingException ne) {
             throw new ProcessException("Could not obtain JMS Connection 
Factory from JNDI", ne);
         }
     }
+
+
+    private Context createInitialContext() throws NamingException {
+        final ConfigurationContext context = getConfigurationContext();
+
+        final Hashtable<String, String> env = new Hashtable<>();
+        env.put(Context.INITIAL_CONTEXT_FACTORY, 
context.getProperty(INITIAL_NAMING_FACTORY_CLASS).evaluateAttributeExpressions().getValue().trim());
+        env.put(Context.PROVIDER_URL, 
context.getProperty(NAMING_PROVIDER_URL).evaluateAttributeExpressions().getValue().trim());
+
+        final String principal = 
context.getProperty(PRINCIPAL).evaluateAttributeExpressions().getValue();
+        if (principal != null) {
+            env.put(Context.SECURITY_PRINCIPAL, principal);
+        }
+
+        final String credentials = context.getProperty(CREDENTIALS).getValue();
+        if (credentials != null) {
+            env.put(Context.SECURITY_CREDENTIALS, credentials);
+        }
+
+        context.getProperties().keySet().forEach(descriptor -> {
+            if (descriptor.isDynamic()) {
+                env.put(descriptor.getName(), 
context.getProperty(descriptor).evaluateAttributeExpressions().getValue());
+            }
+        });
+
+        getLogger().debug("Creating Initial Context using JNDI Environment 
{}", new Object[] {env});
+
+        final Context initialContext = new InitialContext(env);
+        return initialContext;
+    }
+
+    public static Object instrumentWithClassLoader(final Object obj, final 
ClassLoader classLoader, final Class<?>... interfaces) {
+        final InvocationHandler invocationHandler = new InvocationHandler() {
+            @Override
+            public Object invoke(final Object proxy, final Method method, 
final Object[] args) throws Throwable {
+                final Thread thread = Thread.currentThread();
+                final ClassLoader currentClassLoader = 
thread.getContextClassLoader();
+                try {
+                    thread.setContextClassLoader(classLoader);
+                    return method.invoke(obj, args);
+                } finally {
+                    thread.setContextClassLoader(currentClassLoader);
+                }
+            }
+        };
+
+        return Proxy.newProxyInstance(classLoader, interfaces, 
invocationHandler);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ebead820/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
index a4cf6d4..193e72a 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
@@ -16,15 +16,6 @@
  */
 package org.apache.nifi.jms.processors;
 
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.ConnectionFactory;
-
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -41,6 +32,14 @@ import 
org.springframework.jms.connection.CachingConnectionFactory;
 import 
org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
 import org.springframework.jms.core.JmsTemplate;
 
+import javax.jms.ConnectionFactory;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
 /**
  * Base JMS processor to support implementation of JMS producers and consumers.
  *

http://git-wip-us.apache.org/repos/asf/nifi/blob/ebead820/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
index b7103ff..ff05f6a 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
@@ -16,17 +16,6 @@
  */
 package org.apache.nifi.jms.processors;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import javax.jms.Session;
-
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -50,6 +39,16 @@ import 
org.springframework.jms.connection.CachingConnectionFactory;
 import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.support.JmsHeaders;
 
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
 /**
  * Consuming JMS processor which upon each invocation of
  * {@link #onTrigger(ProcessContext, ProcessSession)} method will construct a

http://git-wip-us.apache.org/repos/asf/nifi/blob/ebead820/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
index 392157f..c13f4b7 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
@@ -16,11 +16,12 @@
  */
 package org.apache.nifi.jms.processors;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.stream.Collectors;
+import org.apache.nifi.logging.ComponentLog;
+import org.springframework.jms.connection.CachingConnectionFactory;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.core.SessionCallback;
+import org.springframework.jms.support.JmsHeaders;
 
 import javax.jms.BytesMessage;
 import javax.jms.Destination;
@@ -30,13 +31,11 @@ import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
-
-import org.apache.nifi.logging.ComponentLog;
-import org.springframework.jms.connection.CachingConnectionFactory;
-import org.springframework.jms.core.JmsTemplate;
-import org.springframework.jms.core.MessageCreator;
-import org.springframework.jms.core.SessionCallback;
-import org.springframework.jms.support.JmsHeaders;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
 
 /**
  * Generic publisher of messages to JMS compliant messaging system.
@@ -64,10 +63,6 @@ final class JMSPublisher extends JMSWorker {
         });
     }
 
-    void publish(String destinationName, String messageText) {
-        this.publish(destinationName, messageText, null);
-    }
-
     void publish(String destinationName, String messageText, final Map<String, 
String> flowFileAttributes) {
         this.jmsTemplate.send(destinationName, new MessageCreator() {
             @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/ebead820/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
index 3afa0f0..a32a895 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
@@ -16,22 +16,13 @@
  */
 package org.apache.nifi.jms.processors;
 
-import java.io.StringWriter;
-import java.nio.charset.Charset;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
-import javax.jms.Destination;
-import javax.jms.Message;
-
 import org.apache.commons.io.IOUtils;
-import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.ReadsAttribute;
 import org.apache.nifi.annotation.behavior.ReadsAttributes;
 import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -47,6 +38,14 @@ import 
org.springframework.jms.connection.CachingConnectionFactory;
 import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.support.JmsHeaders;
 
+import javax.jms.Destination;
+import javax.jms.Message;
+import java.io.StringWriter;
+import java.nio.charset.Charset;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
 /**
  * An implementation of JMS Message publishing {@link Processor} which upon 
each
  * invocation of {@link #onTrigger(ProcessContext, ProcessSession)} method will

Reply via email to