Author: hashutosh
Date: Tue Jun 14 21:16:47 2011
New Revision: 1135798

URL: http://svn.apache.org/viewvc?rev=1135798&view=rev
Log:
HCATALOG-39: Lazily create connection for Message bus

Added:
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java
Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/ivy.xml
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1135798&r1=1135797&r2=1135798&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Tue Jun 14 21:16:47 2011
@@ -12,6 +12,9 @@ Trunk (unreleased changes)
     (Krishna Kumar via macyang)
     
   IMPROVEMENTS
+
+    HCAT-39. Lazily create connection for Message bus (hashutosh)
+
     HCAT-44. Add a releaseaudit target to build.xml (gates)
 
     HCAT-40. Remove dependencies from the HCatalog client jar (macyang)

Modified: incubator/hcatalog/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/ivy.xml?rev=1135798&r1=1135797&r2=1135798&view=diff
==============================================================================
--- incubator/hcatalog/trunk/ivy.xml (original)
+++ incubator/hcatalog/trunk/ivy.xml Tue Jun 14 21:16:47 2011
@@ -45,7 +45,7 @@
           conf="common->master"/>
           -->
         <dependency org="javax.jms" name="jms" rev="${jms.version}" 
conf="common->master" />
-        <dependency org="org.apache.activemq" name="activemq-core" 
rev="${activemq.version}" conf="common->master" />
+        <dependency org="org.apache.activemq" name="activemq-all" 
rev="${activemq.version}" conf="common->master" />
         <dependency org="javax.management.j2ee" name="management-api" 
rev="${javax-mgmt.version}" conf="common->master" /> 
         <dependency org="com.google.code.p.arat" name="rat-lib" 
rev="${rats-lib.version}" conf="releaseaudit->default"/>
 </dependencies>

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java?rev=1135798&r1=1135797&r2=1135798&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java Tue Jun 14 21:16:47 2011
@@ -22,11 +22,11 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 
-import javax.jdo.PersistenceManager;
-import javax.jdo.Query;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
 import javax.jms.MessageProducer;
 import javax.jms.ObjectMessage;
@@ -35,16 +35,11 @@ import javax.naming.Context;
 import javax.naming.InitialContext;
 import javax.naming.NamingException;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
 import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
-import org.apache.hadoop.hive.metastore.ObjectStore;
-import org.apache.hadoop.hive.metastore.RawStore;
-import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler.Command;
-import org.apache.hadoop.hive.metastore.api.HiveObjectRef;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
@@ -82,26 +77,7 @@ public class NotificationListener extend
        public NotificationListener(final Configuration conf) {
 
                super(conf);
-               try {
-                       Context jndiCntxt = new InitialContext();
-                       ConnectionFactory connFac = 
(ConnectionFactory)jndiCntxt.lookup("ConnectionFactory");
-                       conn = connFac.createConnection();
-                       conn.start();
-                       // We want message to be sent when session commits, 
thus we run in
-                       // transacted mode.
-                       session = conn.createSession(true, 
Session.SESSION_TRANSACTED);
-
-               } catch (NamingException e) {
-                       LOG.error("JNDI error while setting up Message Bus 
connection. " +
-                                       "Please make sure file named 
'jndi.properties' is in " +
-                                       "classpath and contains appropriate 
key-value pairs.",e);
-               }
-               catch (JMSException e) {
-                       LOG.error("Failed to initialize connection to message 
bus",e);
-               }
-               catch(Throwable t){
-                       LOG.error("HCAT Listener failed to load",t);
-               }
+               createConnection();
        }
 
        @Override
@@ -227,18 +203,36 @@ public class NotificationListener extend
         */
        private void send(Serializable msgBody, String topicName, String event){
 
-               if(null == session){
-                       // If we weren't able to setup the session in the 
constructor
-                       // we cant send message in any case.
-                       LOG.error("Invalid session. Failed to send message on 
topic: "+
-                                       topicName + " event: "+event);
-                       return;
-               }
-
                try{
-                       // Topics are created on demand. If it doesn't exist on 
broker it will
-                       // be created when broker receives this message.
-                       Destination topic = session.createTopic(topicName);
+
+                       Destination topic = null;
+                       if(null == session){
+                               // This will happen if we were never able to 
establish a connection.
+                               createConnection();
+                               if (null == session){
+                                       // Still not successful, return from 
here.
+                                       LOG.error("Invalid session. Failed to 
send message on topic: "+
+                                                       topicName + " event: 
"+event);                          
+                                       return;
+                               }
+                       }
+                       try{
+                               // Topics are created on demand. If it doesn't 
exist on broker it will
+                               // be created when broker receives this message.
+                               topic = session.createTopic(topicName);         
                
+                       } catch (IllegalStateException ise){
+                               // This will happen if we were able to 
establish a connection once, but it is no longer valid;
+                               // an IllegalStateException is thrown, so catch it and retry.
+                               LOG.error("Seems like connection is lost. 
Retrying", ise);
+                               createConnection();
+                               topic = session.createTopic(topicName);         
                
+                       }
+                       if (null == topic){
+                               // Still not successful, return from here.
+                               LOG.error("Invalid session. Failed to send 
message on topic: "+
+                                               topicName + " event: "+event);  
                        
+                               return;
+                       }
                        MessageProducer producer = 
session.createProducer(topic);
                        ObjectMessage msg = 
session.createObjectMessage(msgBody);
                        msg.setStringProperty(HCatConstants.HCAT_EVENT, event);
@@ -252,13 +246,44 @@ public class NotificationListener extend
                }
        }
 
+       private void createConnection(){
+
+               Context jndiCntxt;
+               try {
+                       jndiCntxt = new InitialContext();
+                       ConnectionFactory connFac = 
(ConnectionFactory)jndiCntxt.lookup("ConnectionFactory");
+                       Connection conn = connFac.createConnection();
+                       conn.start();
+                       conn.setExceptionListener(new ExceptionListener() {
+                               @Override
+                               public void onException(JMSException jmse) {
+                                               LOG.error(jmse);
+                               }
+                       });
+                       // We want message to be sent when session commits, 
thus we run in
+                       // transacted mode.
+                       session = conn.createSession(true, 
Session.SESSION_TRANSACTED);
+               } catch (NamingException e) {
+                       LOG.error("JNDI error while setting up Message Bus 
connection. " +
+                                       "Please make sure file named 
'jndi.properties' is in " +
+                                       "classpath and contains appropriate 
key-value pairs.",e);
+               } catch (JMSException e) {
+                       LOG.error("Failed to initialize connection to message 
bus",e);
+               } catch(Throwable t){
+                       LOG.error("Unable to connect to JMS provider",t);
+               }
+       }
+
        @Override
        protected void finalize() throws Throwable {
                // Close the connection before dying.
                try {
+                       if (null != session)
+                               session.close();
                        if(conn != null) {
                                conn.close();
                        }
+                       
                } catch (Exception ignore) {
                        LOG.info("Failed to close message bus connection.", 
ignore);
                }

Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java?rev=1135798&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java Tue Jun 14 21:16:47 2011
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.listener;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hcatalog.common.HCatConstants;
+
+public class TestMsgBusConnection extends TestCase{
+
+       private Driver driver;
+       private BrokerService broker;
+       private MessageConsumer consumer;
+
+       @Override
+       protected void setUp() throws Exception {
+
+               super.setUp();
+               broker = new BrokerService();
+               // configure the broker
+               
broker.addConnector("tcp://localhost:61616?broker.persistent=false");
+
+               broker.start();
+
+               System.setProperty("java.naming.factory.initial", 
"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
+               System.setProperty("java.naming.provider.url", 
"tcp://localhost:61616");
+               connectClient();
+               HiveConf hiveConf = new HiveConf(this.getClass());
+               
hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname,NotificationListener.class.getName());
+               hiveConf.set("hive.metastore.local", "true");
+               hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+               hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+               
hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+               SessionState.start(new CliSessionState(hiveConf));
+               driver = new Driver(hiveConf);
+       }
+
+       private void connectClient() throws JMSException{
+               ConnectionFactory connFac = new 
ActiveMQConnectionFactory("tcp://localhost:61616");
+               Connection conn = connFac.createConnection();
+               conn.start();
+               Session session = conn.createSession(true, 
Session.SESSION_TRANSACTED);
+               Destination hcatTopic = 
session.createTopic(HCatConstants.HCAT_TOPIC);
+               consumer = session.createConsumer(hcatTopic);
+       }
+
+       public void testConnection() throws Exception{
+
+               try{
+                       driver.run("create database testconndb");
+                       Message msg = consumer.receive();
+                       assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, 
msg.getStringProperty(HCatConstants.HCAT_EVENT));
+                       
assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString());
+                       assertEquals("testconndb", ((Database) 
((ObjectMessage)msg).getObject()).getName());
+                       broker.stop();
+                       driver.run("drop database testconndb cascade");
+                       broker.start(true);
+                       connectClient();
+                       driver.run("create database testconndb");
+                       msg = consumer.receive();
+                       assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, 
msg.getStringProperty(HCatConstants.HCAT_EVENT));
+                       
assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString());
+                       assertEquals("testconndb", ((Database) 
((ObjectMessage)msg).getObject()).getName());
+                       driver.run("drop database testconndb cascade");
+                       msg = consumer.receive();
+                       assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT, 
msg.getStringProperty(HCatConstants.HCAT_EVENT));
+                       
assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString());
+                       assertEquals("testconndb", ((Database) 
((ObjectMessage)msg).getObject()).getName());
+               } catch (NoSuchObjectException nsoe){
+                       nsoe.printStackTrace(System.err);
+                       assert false;
+               } catch (AlreadyExistsException aee){
+                       aee.printStackTrace(System.err);
+                       assert false;
+               }
+       }
+}


Reply via email to