Author: virag
Date: Sat Jan 19 00:53:33 2013
New Revision: 1435431

URL: http://svn.apache.org/viewvc?rev=1435431&view=rev
Log:
OOZIE-1180 Separate the connection context details from JMS Accessor service 
(virag)

Added:
    
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/ConnectionContext.java
    
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/DefaultConnectionContext.java
Modified:
    
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java
    oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml
    
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
    
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XTestCase.java
    oozie/branches/hcat-intre/release-log.txt

Added: 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/ConnectionContext.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/ConnectionContext.java?rev=1435431&view=auto
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/ConnectionContext.java
 (added)
+++ 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/ConnectionContext.java
 Sat Jan 19 00:53:33 2013
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.jms;
+
+import java.util.Properties;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.naming.NamingException;
+
+/**
+ * Maintains a JMS connection used for creating sessions, consumers and producers.
+ */
+public interface ConnectionContext {
+
+    /**
+     * Creates the connection factory using the given properties.
+     * @param props the properties used for creating the JNDI context
+     * @return the connection factory created from the properties
+     * @throws NamingException if the connection factory cannot be obtained
+     */
+    public ConnectionFactory createConnectionFactory(Properties props) throws 
NamingException;
+
+    /**
+     * Creates the connection using the given connection factory.
+     * @param connFactory the factory used to create the connection
+     * @throws JMSException if the connection cannot be created
+     */
+    public void createConnection(ConnectionFactory connFactory) throws 
JMSException;
+
+    /**
+     * Sets the exception listener.
+     * @param exceptionListener the listener to set
+     */
+    public void setExceptionListener(ExceptionListener exceptionListener) 
throws JMSException;
+
+    /**
+     * Checks whether the connection is initialized or not.
+     * @return true if the connection is initialized, false otherwise
+     */
+    public boolean isConnectionInitialized();
+
+    /**
+     * Creates a session using the specified session options.
+     * @param sessionOpts the session options to use
+     * @return the created session
+     * @throws JMSException if the session cannot be created
+     */
+    public Session createSession(int sessionOpts) throws JMSException;
+
+    /**
+     * Creates a consumer using the given session and topic name.
+     * @param session the session used to create the consumer
+     * @param topicName the name of the topic to consume from
+     * @return the created message consumer
+     * @throws JMSException if the consumer cannot be created
+     */
+    public MessageConsumer createConsumer(Session session, String topicName) 
throws JMSException;
+
+    /**
+     * Creates a producer using the given session and topic name.
+     * @param session the session used to create the producer
+     * @param topicName the name of the topic to publish to
+     * @return the created message producer
+     * @throws JMSException if the producer cannot be created
+     */
+    public MessageProducer createProducer(Session session, String topicName) 
throws JMSException;
+
+    /**
+     * Retrieves the connection for this connection context.
+     * @return the connection of this context
+     */
+    public Connection getConnection();
+
+    /**
+     * Retrieves the connection factory name for this context.
+     * @return the connection factory name
+     */
+    public String getConnectionFactoryName();
+
+    /**
+     * Closes the connection.
+     */
+    public void close();
+
+}

Added: 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/DefaultConnectionContext.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/DefaultConnectionContext.java?rev=1435431&view=auto
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/DefaultConnectionContext.java
 (added)
+++ 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/DefaultConnectionContext.java
 Sat Jan 19 00:53:33 2013
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.jms;
+
+import java.util.Properties;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import org.apache.oozie.util.XLog;
+
+/**
+ * Default {@link ConnectionContext} implementation: looks up the connection factory through JNDI
+ * and holds a single JMS connection from which sessions, consumers and producers are created.
+ */
+public class DefaultConnectionContext implements ConnectionContext {
+
+    private static final XLog LOG = XLog.getLog(DefaultConnectionContext.class);
+
+    protected Connection connection;
+    protected String connectionFactoryName;
+
+    /**
+     * Builds a JNDI context from the properties and looks up the connection factory named by the
+     * "connectionFactoryNames" environment entry ("ConnectionFactory" if it is missing or blank).
+     *
+     * @param props the properties used for creating the JNDI context
+     * @return the connection factory found under the resolved name
+     * @throws NamingException if the context cannot be created or the lookup fails
+     */
+    @Override
+    public ConnectionFactory createConnectionFactory(Properties props) throws NamingException {
+        Context jndiContext = new InitialContext(props);
+        connectionFactoryName = (String) jndiContext.getEnvironment().get("connectionFactoryNames");
+        if (connectionFactoryName == null || connectionFactoryName.trim().length() == 0) {
+            connectionFactoryName = "ConnectionFactory";
+        }
+        ConnectionFactory connectionFactory = (ConnectionFactory) jndiContext.lookup(connectionFactoryName);
+        // NOTE(review): this logs the whole JNDI environment; confirm it never carries credentials.
+        LOG.info("Connecting with the following properties \n" + jndiContext.getEnvironment().toString());
+        return connectionFactory;
+    }
+
+    /**
+     * Creates and starts a connection and installs a listener that logs asynchronous JMS errors.
+     * The connection is stored in this context only once it is fully set up; on failure the
+     * partially created connection is closed and the original exception is rethrown.
+     *
+     * @param connFactory the factory used to create the connection
+     * @throws JMSException if the connection cannot be created, started or configured
+     */
+    @Override
+    public void createConnection(ConnectionFactory connFactory) throws JMSException {
+        Connection newConnection = null;
+        try {
+            newConnection = connFactory.createConnection();
+            newConnection.start();
+            newConnection.setExceptionListener(new ExceptionListener() {
+                @Override
+                public void onException(JMSException je) {
+                    LOG.error(je);
+                }
+            });
+            connection = newConnection;
+        }
+        catch (JMSException e1) {
+            LOG.error(e1.getMessage(), e1);
+            if (newConnection != null) {
+                try {
+                    newConnection.close();
+                }
+                catch (Exception e2) {
+                    LOG.error(e2.getMessage(), e2);
+                }
+            }
+            throw e1;
+        }
+    }
+
+    /**
+     * @return true if this context currently holds a connection
+     */
+    @Override
+    public boolean isConnectionInitialized() {
+        return connection != null;
+    }
+
+    /**
+     * Replaces the exception listener of the held connection.
+     * Must only be called once a connection has been created.
+     */
+    @Override
+    public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException {
+        connection.setExceptionListener(exceptionListener);
+    }
+
+    /**
+     * Creates a non-transacted session; sessionOpts is passed as the acknowledge mode.
+     * Must only be called once a connection has been created.
+     */
+    @Override
+    public Session createSession(int sessionOpts) throws JMSException {
+        return connection.createSession(false, sessionOpts);
+    }
+
+    /**
+     * Creates a consumer on the topic with the given name, using the given session.
+     */
+    @Override
+    public MessageConsumer createConsumer(Session session, String topicName) throws JMSException {
+        Topic topic = session.createTopic(topicName);
+        return session.createConsumer(topic);
+    }
+
+    /**
+     * Creates a producer on the topic with the given name, using the given session.
+     */
+    @Override
+    public MessageProducer createProducer(Session session, String topicName) throws JMSException {
+        Topic topic = session.createTopic(topicName);
+        return session.createProducer(topic);
+    }
+
+    /**
+     * @return the held connection, or null if none has been created or it has been closed
+     */
+    @Override
+    public Connection getConnection() {
+        return connection;
+    }
+
+    /**
+     * @return the connection factory name resolved by the last createConnectionFactory call
+     */
+    @Override
+    public String getConnectionFactoryName() {
+        return connectionFactoryName;
+    }
+
+    /**
+     * Closes the held connection, if any. A context that never created a connection is left
+     * untouched instead of failing with a NullPointerException. The reference is cleared afterwards
+     * so that a closed connection is no longer reported as initialized.
+     */
+    @Override
+    public void close() {
+        if (connection == null) {
+            return;
+        }
+        try {
+            connection.close();
+        }
+        catch (JMSException e) {
+            LOG.warn("Unable to close the connection " + connection, e);
+        }
+        finally {
+            connection = null;
+        }
+    }
+
+}

Modified: 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java?rev=1435431&r1=1435430&r2=1435431&view=diff
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java
 (original)
+++ 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java
 Sat Jan 19 00:53:33 2013
@@ -25,20 +25,16 @@ import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 
-import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
-import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
 import javax.jms.Session;
-import javax.jms.Topic;
-import javax.naming.Context;
-import javax.naming.InitialContext;
 import javax.naming.NamingException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.oozie.ErrorCode;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.oozie.jms.ConnectionContext;
+import org.apache.oozie.jms.DefaultConnectionContext;
 import org.apache.oozie.jms.MessageHandler;
 import org.apache.oozie.jms.MessageReceiver;
 import org.apache.oozie.util.MappingRule;
@@ -57,8 +53,8 @@ import com.google.common.annotations.Vis
  */
 public class JMSAccessorService implements Service {
     public static final String CONF_PREFIX = Service.CONF_PREFIX + 
"JMSAccessorService.";
-    public static final String JMS_CONNECTION_FACTORY = CONF_PREFIX + 
"jms.connectionFactory";
     public static final String JMS_CONNECTIONS_PROPERTIES = CONF_PREFIX + 
"connections";
+    public static final String JMS_CONNECTION_CONTEXT_IMPL = CONF_PREFIX 
+"connectioncontext.impl";
     public static final String SESSION_OPTS = CONF_PREFIX + "jms.sessionOpts";
     public static final String DEFAULT_SERVER_ENDPOINT = "default";
     private static final String DELIMITER = "#";
@@ -66,6 +62,7 @@ public class JMSAccessorService implemen
 
     private String defaultConnection = null;
     private Configuration conf;
+    private int sessionOpts;
     private List<MappingRule> mappingRules = null;
     private ConcurrentHashMap<String, ConnectionContext> connectionMap = new 
ConcurrentHashMap<String, ConnectionContext>();
     private ConcurrentHashMap<String, MessageReceiver> receiversMap = new 
ConcurrentHashMap<String, MessageReceiver>();
@@ -76,6 +73,7 @@ public class JMSAccessorService implemen
         LOG = XLog.getLog(getClass());
         conf = services.getConf();
         initializeMappingRules();
+        sessionOpts = conf.getInt(SESSION_OPTS, Session.AUTO_ACKNOWLEDGE);
     }
 
 
@@ -128,22 +126,39 @@ public class JMSAccessorService implemen
             }
             synchronized (receiversMap) {
                 if (!receiversMap.containsKey(key)) {
-                    ConnectionContext connCtxt = createConnection(server);
-                    Session session = connCtxt.createSession();
-                    MessageConsumer consumer = 
connCtxt.createConsumer(session, topic);
-                    MessageReceiver receiver = new MessageReceiver(msgHandler, 
session, consumer);
-                    consumer.setMessageListener(receiver);
+                    ConnectionContext connCtxt = getConnectionContext(server);
+                    if (!connCtxt.isConnectionInitialized()){
+                        Properties props = getJMSServerProps(server);
+                        if (props == null) {
+                            LOG.warn("Connection not established to JMS server 
for server [{0}] as JMS connection properties are not correctly defined",
+                                    server);
+                            return;
+                        }
+                        ConnectionFactory connFactory = 
connCtxt.createConnectionFactory(props);
+                        connCtxt.createConnection(connFactory);
+                    }
+                    MessageReceiver receiver = setupJMSReceiver(connCtxt, 
topic, msgHandler);
                     LOG.info("Registered a listener for topic {0} from 
publisher {1}", topic, server);
                     receiversMap.put(key, receiver);
                 }
             }
         }
         catch (Exception e) {
-            //TODO: Exponentially backed off retry in case of connection 
failure.
+            // TODO: Exponentially backed off retry in case of connection
+            // failure.
             LOG.warn("Connection to JMS server failed for publisher {0}", 
publisherURI, e);
         }
     }
 
+    private MessageReceiver setupJMSReceiver(ConnectionContext connCtxt, 
String topic, MessageHandler msgHandler)
+            throws NamingException, JMSException {
+        Session session = connCtxt.createSession(sessionOpts);
+        MessageConsumer consumer = connCtxt.createConsumer(session, topic);
+        MessageReceiver receiver = new MessageReceiver(msgHandler, session, 
consumer);
+        consumer.setMessageListener(receiver);
+        return receiver;
+    }
+
     /**
      * Unregister from listening to JMS messages on a topic.
      *
@@ -190,16 +205,11 @@ public class JMSAccessorService implemen
         return receiversMap.get(publisherAuthority + DELIMITER + topic) != 
null;
     }
 
-    protected ConnectionContext createConnection(String publisherAuthority) 
throws ServiceException {
+    protected ConnectionContext getConnectionContext(String 
publisherAuthority) {
         ConnectionContext connCtxt = connectionMap.get(publisherAuthority);
         if (connCtxt == null) {
-            Properties props = getJMSServerProps(publisherAuthority);
-            if (props != null) {
-                Connection conn = getConnection(props);
-                LOG.info("Connection established to JMS Server for publisher " 
+ publisherAuthority);
-                connCtxt = new ConnectionContext(conn);
-                connectionMap.put(publisherAuthority, connCtxt);
-            }
+            connCtxt = getConnectionContextImpl();
+            connectionMap.put(publisherAuthority, connCtxt);
         }
         return connCtxt;
     }
@@ -245,6 +255,7 @@ public class JMSAccessorService implemen
             }
             else {
                 LOG.info("Unformatted properties. Expected key#value : " + 
pair);
+                return null;
             }
         }
         if (props.isEmpty()) {
@@ -274,12 +285,7 @@ public class JMSAccessorService implemen
 
         LOG.info("Closing JMS connections");
         for (Entry<String, ConnectionContext> entry : 
connectionMap.entrySet()) {
-            try {
-                entry.getValue().getConnection().close();
-            }
-            catch (JMSException e) {
-                LOG.warn("Unable to close the connection for " + 
entry.getKey(), e);
-            }
+                entry.getValue().close();
         }
         connectionMap.clear();
     }
@@ -289,129 +295,17 @@ public class JMSAccessorService implemen
         return JMSAccessorService.class;
     }
 
-    /*
-     * Look up connection factory Create connection
-     */
-    protected synchronized Connection getConnection(Properties props) throws 
ServiceException {
-
-        Connection conn = null;
-        try {
-            Context jndiContext = getJndiContext(props);
-            String connFacName = (String) 
jndiContext.getEnvironment().get(JMS_CONNECTION_FACTORY);
-            if (connFacName == null || connFacName.trim().length() == 0) {
-                connFacName = "ConnectionFactory";
-            }
-            ConnectionFactory connFac = (ConnectionFactory) 
jndiContext.lookup(connFacName);
-            LOG.info("Connecting with the following properties \n" + 
jndiContext.getEnvironment().toString());
-            conn = connFac.createConnection();
-            conn.start();
-            conn.setExceptionListener(new ExceptionListener() {
-                @Override
-                public void onException(JMSException je) {
-                    LOG.error(je);
-                }
-            });
-
-        }
-        catch (Exception e1) {
-            LOG.error(e1.getMessage(), e1);
-            if (conn != null) {
-                try {
-                    conn.close();
-                }
-                catch (Exception e2) {
-                    LOG.error(e2.getMessage(), e2);
-                }
-            }
-            throw new ServiceException(ErrorCode.E0100, getClass().getName(), 
e1.getMessage(), e1);
-        }
-        return conn;
-    }
-
-    /*
-     * Create a JNDI API InitialContext object
-     */
-    private Context getJndiContext(Properties props) throws ServiceException {
-        Context ctx;
-        try {
-            ctx = new InitialContext(props);
-        }
-        catch (NamingException e) {
-            LOG.warn("Unable to initialize the context :", e);
-            throw new ServiceException(ErrorCode.E0100, getClass().getName(), 
e.getMessage(), e);
-        }
-        return ctx;
-    }
-
-    /**
-     * This class maintains a JMS connection and map of topic to Session. Only
-     * one session per topic.
-     */
-    public class ConnectionContext {
-        private Connection connection;
 
-        public ConnectionContext(Connection conn) {
-            this.connection = conn;
+    private ConnectionContext getConnectionContextImpl() {
+        Class<?> defaultClazz = conf.getClass(JMS_CONNECTION_CONTEXT_IMPL, 
DefaultConnectionContext.class);
+        ConnectionContext connCtx = null;
+        if (defaultClazz == DefaultConnectionContext.class) {
+            connCtx = new DefaultConnectionContext();
         }
-
-        /**
-         * If there is no existing session for a specific topic name, this
-         * method creates a new session. Otherwise, return the existing session
-         *
-         * @param topic : Name of the topic
-         * @return a new/exiting JMS session
-         * @throws JMSException
-         */
-        public Session createSession() throws JMSException {
-            int sessionOpts = conf.getInt(SESSION_OPTS, 
Session.AUTO_ACKNOWLEDGE);
-            return connection.createSession(false, sessionOpts);
-        }
-
-        /**
-         * Returns a new MessageConsumer object.
-         * It is the caller responsibility to close the MessageConsumer when 
done
-         *
-         * @param topicName : Name of the topic
-         * @return MessageConsumer
-         * @throws JMSException
-         */
-        public MessageConsumer createConsumer(Session session, String 
topicName) throws JMSException {
-            Topic topic = session.createTopic(topicName);
-            MessageConsumer consumer = session.createConsumer(topic);
-            return consumer;
-        }
-
-        /**
-         * Returns a new MessageProducer object.
-         * It is the caller responsibility to close the MessageProducer when 
done
-         *
-         * @param topicName : Name of the topic
-         * @return MessageProducer
-         * @throws JMSException
-         */
-        public MessageProducer createProducer(Session session, String 
topicName) throws JMSException {
-            Topic topic = session.createTopic(topicName);
-            MessageProducer producer = session.createProducer(topic);
-            return producer;
-        }
-
-        /**
-         * @return JMS connection
-         */
-        public Connection getConnection() {
-            return connection;
-        }
-
-        public void close() {
-
-            try {
-                connection.close();
-            }
-            catch (JMSException e) {
-                LOG.warn("Unable to close the connection " + connection, e);
-            }
+        else {
+            connCtx = (ConnectionContext) 
ReflectionUtils.newInstance(defaultClazz, null);
         }
-
+        return connCtx;
     }
 
 }

Modified: oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml?rev=1435431&r1=1435430&r2=1435431&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml 
(original)
+++ oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml Sat Jan 
19 00:53:33 2013
@@ -135,7 +135,7 @@
    <property>
         <name>oozie.service.JMSAccessorService.connections</name>
         <value>
-        
default=java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#tcp://localhost:61616,
+        
default=java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#tcp://localhost:61616;connectionFactoryNames#ConnectionFactory
         </value>
         <description>
         Specify the map  of endpoints to JMS configuration properties. In 
general, endpoint 
@@ -147,6 +147,17 @@
         </description>
     </property>
 
+    <property>
+        <name>oozie.service.JMSAccessorService.connectioncontext.impl</name>
+        <value>
+        org.apache.oozie.jms.DefaultConnectionContext
+        </value>
+        <description>
+        Specifies the Connection Context implementation
+        </description>
+    </property>
+
+
     <!-- ConfigurationService -->
 
     <property>

Modified: 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java?rev=1435431&r1=1435430&r2=1435431&view=diff
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
 (original)
+++ 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
 Sat Jan 19 00:53:33 2013
@@ -18,13 +18,17 @@
 package org.apache.oozie.service;
 
 import java.net.URI;
+import java.util.Properties;
+
+import javax.jms.ConnectionFactory;
 
 import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.jms.DefaultConnectionContext;
 import org.apache.oozie.jms.HCatMessageHandler;
 import org.apache.oozie.jms.MessageReceiver;
-import org.apache.oozie.service.JMSAccessorService.ConnectionContext;
+import org.apache.oozie.jms.ConnectionContext;
 import org.apache.oozie.test.XTestCase;
 import org.junit.Test;
 
@@ -54,11 +58,21 @@ public class TestJMSAccessorService exte
     public void testConnection() throws Exception {
         JMSAccessorService jmsService = services.get(JMSAccessorService.class);
         // both servers should connect to default JMS server
-        ConnectionContext ctxt1 = jmsService.createConnection("blahblah");
-        ConnectionContext ctxt2 = 
jmsService.createConnection(JMSAccessorService.DEFAULT_SERVER_ENDPOINT);
+        Properties props = jmsService.getJMSServerProps("blahblah");
+        ConnectionContext ctxt1 = new DefaultConnectionContext();
+        ConnectionFactory connFactory = ctxt1.createConnectionFactory(props);
+        ctxt1.createConnection(connFactory);
+        assertTrue(ctxt1.isConnectionInitialized());
+        ctxt1.close();
+        props = 
jmsService.getJMSServerProps(JMSAccessorService.DEFAULT_SERVER_ENDPOINT);
+        ConnectionContext ctxt2 = new DefaultConnectionContext();
+        connFactory = ctxt2.createConnectionFactory(props);
+        ctxt2.createConnection(connFactory);
+        assertTrue(ctxt2.isConnectionInitialized());
+        ctxt2.close();
+
         assertNotNull(ctxt1);
         assertNotNull(ctxt2);
-        //assertEquals(ctxt1, ctxt2);
     }
 
     @Test
@@ -146,7 +160,7 @@ public class TestJMSAccessorService exte
 
 
         JMSAccessorService jmsService = services.get(JMSAccessorService.class);
-        String jmsServerMapping = 
jmsService.getJMSServerMapping("hcat://axoniteblue-1.blue.server.com:8020");
+        String jmsServerMapping = 
jmsService.getJMSServerMapping("hcat://hcatserver.blue.server.com:8020");
         // rules will be applied
         
assertEquals("java.naming.factory.initial#Dummy.Factory;java.naming.provider.url#tcp://broker.blue:61616",
 jmsServerMapping);
 
@@ -157,4 +171,31 @@ public class TestJMSAccessorService exte
         jmsServerMapping = 
jmsService.getJMSServerMapping("hcat://xyz.corp.dummy.com");
         
assertEquals("java.naming.factory.initial#Dummy.Factory;java.naming.provider.url#tcp:localhost:61616",
 jmsServerMapping);
     }
+
+    @Test
+    public void testConnectionContext() throws ServiceException {
+        try {
+            services.destroy();
+            services = super.setupServicesForHCatalog();
+            Configuration conf = services.getConf();
+            // set the connection factory name
+            String jmsURL = 
"hcat://${1}.${2}.server.com:8020=java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#vm://localhost?broker.persistent=false;connectionFactoryNames#dynamicFactories/hcat.prod.${1}";
+            conf.set(JMSAccessorService.JMS_CONNECTIONS_PROPERTIES, jmsURL);
+            services.init();
+            JMSAccessorService jmsService = 
services.get(JMSAccessorService.class);
+            String jmsServerMapping = 
jmsService.getJMSServerMapping("hcat://hcatserver.blue.server.com:8020");
+            assertEquals(
+                    
"java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#vm://localhost?broker.persistent=false;connectionFactoryNames#dynamicFactories/hcat.prod.hcatserver",
+                  jmsServerMapping);
+
+            ConnectionContext ctx1 = new DefaultConnectionContext();
+            Properties props = 
jmsService.getJMSServerProps("hcat://hcatserver.blue.server.com:8020");
+            ctx1.createConnectionFactory(props);
+            assertEquals("dynamicFactories/hcat.prod.hcatserver", 
ctx1.getConnectionFactoryName());
+        }
+        catch (Exception e) {
+            fail("Unexpected exception " + e);
+        }
+    }
+
 }

Modified: 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XTestCase.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XTestCase.java?rev=1435431&r1=1435430&r2=1435431&view=diff
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XTestCase.java
 (original)
+++ 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XTestCase.java
 Sat Jan 19 00:53:33 2013
@@ -891,7 +891,8 @@ public abstract class XTestCase extends 
                 "org.apache.oozie.service.JMSAccessorService");
         conf.set(JMSAccessorService.JMS_CONNECTIONS_PROPERTIES,
                 "default=java.naming.factory.initial#" + ActiveMQConnFactory + ";" +
-                "java.naming.provider.url#" + localActiveMQBroker);
+                "java.naming.provider.url#" + localActiveMQBroker + ";" + // ';' delimits key#value pairs
+                "connectionFactoryNames#" + "ConnectionFactory");
         conf.set(URIHandlerService.URI_HANDLERS,
                 FSURIHandler.class.getName() + "," + 
HCatURIHandler.class.getName());
         setSystemProperty("java.naming.factory.initial", 
"org.apache.activemq.jndi.ActiveMQInitialContextFactory");

Modified: oozie/branches/hcat-intre/release-log.txt
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/release-log.txt?rev=1435431&r1=1435430&r2=1435431&view=diff
==============================================================================
--- oozie/branches/hcat-intre/release-log.txt (original)
+++ oozie/branches/hcat-intre/release-log.txt Sat Jan 19 00:53:33 2013
@@ -1,5 +1,6 @@
 -- Oozie 3.4.0 release (trunk - unreleased)
 
+OOZIE-1180 Separate the connection context details from JMS Accessor service 
(virag)
 OOZIE-1157 EL function hcat:exists for decision making (rohini via mona)
 OOZIE-1167 Fix and rework PartitionDependency Management (rohini via virag)
 OOZIE-1053 Oozie Web-console clicking on Bundle's coord jobs does not open 
them up (ryota via mona)


Reply via email to