Repository: cxf
Updated Branches:
  refs/heads/master 0dfaf8d72 -> e1c60863a


[CXF-6742] Run jms destination with InitialContext


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/e1c60863
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/e1c60863
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/e1c60863

Branch: refs/heads/master
Commit: e1c60863ac10b56d423613c7d3d2f45c7ce18e14
Parents: 68f110e
Author: Christian Schneider <ch...@die-schneider.net>
Authored: Fri Jan 15 14:41:02 2016 +0100
Committer: Christian Schneider <ch...@die-schneider.net>
Committed: Fri Jan 15 14:41:20 2016 +0100

----------------------------------------------------------------------
 .../cxf/transport/jms/JMSDestination.java       |  1 +
 .../util/AbstractMessageListenerContainer.java  | 29 ++++++++++++++++++++
 .../cxf/transport/jms/util/JndiHelper.java      | 20 +-------------
 .../util/PollingMessageListenerContainer.java   | 24 ++++++----------
 .../cxf/transport/jms/AbstractJMSTester.java    |  2 +-
 .../cxf/transport/jms/JMSDestinationTest.java   |  1 +
 6 files changed, 41 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/e1c60863/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
----------------------------------------------------------------------
diff --git 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
index 113a7d2..2b5d8cd 100644
--- 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
+++ 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
@@ -144,6 +144,7 @@ public class JMSDestination extends 
AbstractMultiplexDestination implements Mess
             if (executor instanceof Executor) {
                 container.setExecutor((Executor) executor);
             }
+            container.setJndiEnvironment(jmsConfig.getJndiEnvironment());
             container.start();
             suspendedContinuations.setListenerContainer(container);
             connection.start();

http://git-wip-us.apache.org/repos/asf/cxf/blob/e1c60863/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/AbstractMessageListenerContainer.java
----------------------------------------------------------------------
diff --git 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/AbstractMessageListenerContainer.java
 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/AbstractMessageListenerContainer.java
index 65d6c4c..8fd1cdc 100644
--- 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/AbstractMessageListenerContainer.java
+++ 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/AbstractMessageListenerContainer.java
@@ -18,16 +18,20 @@
  */
 package org.apache.cxf.transport.jms.util;
 
+import java.util.Properties;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.MessageListener;
 import javax.jms.Session;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
 import javax.transaction.TransactionManager;
 
 import org.apache.cxf.common.logging.LogUtils;
@@ -45,6 +49,7 @@ public abstract class AbstractMessageListenerContainer 
implements JMSListenerCon
     protected String durableSubscriptionName;
     protected boolean pubSubNoLocal;
     protected TransactionManager transactionManager;
+    protected Properties jndiEnvironment;
 
     private Executor executor;
     private int concurrentConsumers = 1;
@@ -84,7 +89,31 @@ public abstract class AbstractMessageListenerContainer 
implements JMSListenerCon
     public void setExecutor(Executor executor) {
         this.executor = executor;
     }
+    
+    public void setJndiEnvironment(Properties jndiEnvironment) {
+        this.jndiEnvironment = jndiEnvironment;
+    }
 
+    /** 
+     * Creates a InitialContext if a JNDI environment has been provided. 
+     * This is usefull in e.g. weblogic, where interaction with JNDI JMS 
resources is secured.
+     * 
+     * Be careful not to cache the return value in a non thread local scope.
+     * 
+     * @return an initial context, with the endpoint's JNDI properties, 
+     * or null if none is provided or if an errur occurs
+     **/
+    public InitialContext createInitialContext() {
+        if (jndiEnvironment != null) {
+            try {
+                return new InitialContext(this.jndiEnvironment);
+            } catch (NamingException e) {
+                LOG.log(Level.SEVERE, "Could not expose JNDI environment to 
JMS thread context", e);
+            }
+        }
+        return null;
+    }
+    
     @Override
     public void stop() {
         // In case of using external executor, don't shutdown it

http://git-wip-us.apache.org/repos/asf/cxf/blob/e1c60863/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JndiHelper.java
----------------------------------------------------------------------
diff --git 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JndiHelper.java
 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JndiHelper.java
index 7cc0e42..5035e0d 100644
--- 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JndiHelper.java
+++ 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JndiHelper.java
@@ -18,8 +18,6 @@
  */
 package org.apache.cxf.transport.jms.util;
 
-import java.util.Enumeration;
-import java.util.Hashtable;
 import java.util.Properties;
 
 import javax.naming.Context;
@@ -38,25 +36,9 @@ public class JndiHelper {
         this.environment = environment;
     }
 
-    @SuppressWarnings("rawtypes")
-    protected Context createInitialContext() throws NamingException {
-        //CHECKSTYLE:OFF
-        Hashtable<Object, Object> icEnv = new Hashtable<Object, 
Object>(environment.size());
-        //CHECKSTYLE:ON
-        for (Enumeration en = environment.propertyNames(); 
en.hasMoreElements();) {
-            String key = (String)en.nextElement();
-            Object value = environment.getProperty(key);
-            if (value == null) {
-                value = environment.get(key);
-            }
-            icEnv.put(key, value);
-        }
-        return new InitialContext(icEnv);
-    }
-
     @SuppressWarnings("unchecked")
     public <T> T lookup(final String name, Class<T> requiredType) throws 
NamingException {
-        Context ctx = createInitialContext();
+        Context ctx = new InitialContext(this.environment);
         try {
             Object located = ctx.lookup(name);
             if (located == null) {

http://git-wip-us.apache.org/repos/asf/cxf/blob/e1c60863/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
----------------------------------------------------------------------
diff --git 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
index 0acd40f..9f8fcb2 100644
--- 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
+++ 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
@@ -49,12 +49,11 @@ public class PollingMessageListenerContainer extends 
AbstractMessageListenerCont
         @Override
         public void run() {
             while (running) {
-                MessageConsumer consumer = null;
-                Session session = null;
-                try {
+                try (ResourceCloser closer = new ResourceCloser()) {
+                    closer.register(createInitialContext());
                     // Create session early to optimize performance
-                    session = connection.createSession(transacted, 
acknowledgeMode);
-                    consumer = createConsumer(session);
+                    Session session = 
closer.register(connection.createSession(transacted, acknowledgeMode));
+                    MessageConsumer consumer = 
closer.register(createConsumer(session));
                     while (running) {
                         Message message = consumer.receive(1000);
                         try {
@@ -71,9 +70,6 @@ public class PollingMessageListenerContainer extends 
AbstractMessageListenerCont
                     }
                 } catch (Exception e) {
                     LOG.log(Level.WARNING, "Unexpected exception. Restarting 
session and consumer", e);
-                } finally {
-                    ResourceCloser.close(consumer);
-                    ResourceCloser.close(session);
                 }
             }
 
@@ -96,9 +92,8 @@ public class PollingMessageListenerContainer extends 
AbstractMessageListenerCont
         @Override
         public void run() {
             while (running) {
-                MessageConsumer consumer = null;
-                Session session = null;
-                try {
+                try (ResourceCloser closer = new ResourceCloser()) {
+                    closer.register(createInitialContext());
                     final Transaction externalTransaction = 
transactionManager.getTransaction();
                     if ((externalTransaction != null) && 
(externalTransaction.getStatus() == Status.STATUS_ACTIVE)) {
                         LOG.log(Level.SEVERE, "External transactions are not 
supported in XAPoller");
@@ -109,8 +104,8 @@ public class PollingMessageListenerContainer extends 
AbstractMessageListenerCont
                      * Create session inside transaction to give it the 
                      * chance to enlist itself as a resource
                      */
-                    session = connection.createSession(transacted, 
acknowledgeMode);
-                    consumer = createConsumer(session);
+                    Session session = 
closer.register(connection.createSession(transacted, acknowledgeMode));
+                    MessageConsumer consumer = 
closer.register(createConsumer(session));
                     Message message = consumer.receive(1000);
                     try {
                         if (message != null) {
@@ -120,9 +115,6 @@ public class PollingMessageListenerContainer extends 
AbstractMessageListenerCont
                     } catch (Throwable e) {
                         LOG.log(Level.WARNING, "Exception while processing jms 
message in cxf. Rolling back", e);
                         safeRollBack(session);
-                    } finally {
-                        ResourceCloser.close(consumer);
-                        ResourceCloser.close(session);
                     }
                 } catch (Exception e) {
                     LOG.log(Level.WARNING, "Unexpected exception. Restarting 
session and consumer", e);

http://git-wip-us.apache.org/repos/asf/cxf/blob/e1c60863/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
----------------------------------------------------------------------
diff --git 
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
 
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
index 7abbfe2..b34f53d 100644
--- 
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
+++ 
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
@@ -221,7 +221,7 @@ public abstract class AbstractJMSTester extends Assert {
 
     protected void waitForReceiveDestMessage() {
         int waitTime = 0;
-        while (destMessage == null && waitTime < MAX_RECEIVE_TIME) {
+        while (destMessage == null && waitTime < MAX_RECEIVE_TIME * 10) {
             try {
                 Thread.sleep(100);
             } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/cxf/blob/e1c60863/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
----------------------------------------------------------------------
diff --git 
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
 
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
index db7e241..690c721 100644
--- 
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
+++ 
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
@@ -241,6 +241,7 @@ public class JMSDestinationTest extends AbstractJMSTester {
         waitForReceiveInMessage();
         verifyReceivedMessage(inMessage);
 
+        // wait for a while for the jms session recycling
         Thread.sleep(1000);
         conduit.close();
         destination.shutdown();

Reply via email to