Author: hashutosh
Date: Sat Jun 18 02:01:46 2011
New Revision: 1137114

URL: http://svn.apache.org/viewvc?rev=1137114&view=rev
Log:
HCATALOG-47: Topic prefix for the message bus should be configurable

Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1137114&r1=1137113&r2=1137114&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Sat Jun 18 02:01:46 2011
@@ -13,6 +13,8 @@ Trunk (unreleased changes)
     
   IMPROVEMENTS
 
+    HCAT-47. Topic prefix for the message bus should be configurable (hashutosh)
+
     HCAT-39. Lazily create connection for Message bus (hashutosh)
 
     HCAT-44. Add a releaseaudit target to build.xml (gates)

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java?rev=1137114&r1=1137113&r2=1137114&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java Sat Jun 18 02:01:46 2011
@@ -70,9 +70,10 @@ public final class HCatConstants {
   
   public static final String HCAT_MSGBUS_TOPIC_NAME = "hcat.msgbus.topic.name";
   public static final String HCAT_MSGBUS_TOPIC_NAMING_POLICY = 
"hcat.msgbus.topic.naming.policy";
+  public static final String HCAT_MSGBUS_TOPIC_PREFIX = 
"hcat.msgbus.topic.prefix";
 
   // Message Bus related properties.
-  public static final String HCAT_TOPIC = "HCAT";
+  public static final String HCAT_DEFAULT_TOPIC_PREFIX = "HCAT";
   public static final String HCAT_EVENT = "HCAT_EVENT";
   public static final String HCAT_ADD_PARTITION_EVENT = "HCAT_ADD_PARTITION";
   public static final String HCAT_DROP_PARTITION_EVENT = "HCAT_DROP_PARTITION";

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java?rev=1137114&r1=1137113&r2=1137114&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java Sat Jun 18 02:01:46 2011
@@ -38,6 +38,7 @@ import javax.naming.NamingException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
 import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
@@ -133,7 +134,7 @@ public class NotificationListener extend
                // by listening on a topic named "HCAT" and message selector 
string
                // as "HCAT_EVENT = HCAT_ADD_DATABASE" 
                if(dbEvent.getStatus())
-                       
send(dbEvent.getDatabase(),HCatConstants.HCAT_TOPIC,HCatConstants.HCAT_ADD_DATABASE_EVENT);
+                       
send(dbEvent.getDatabase(),getTopicPrefix(dbEvent.getHandler().getHiveConf()),HCatConstants.HCAT_ADD_DATABASE_EVENT);
        }
 
        @Override
@@ -142,7 +143,7 @@ public class NotificationListener extend
                // by listening on a topic named "HCAT" and message selector 
string
                // as "HCAT_EVENT = HCAT_DROP_DATABASE" 
                if(dbEvent.getStatus())
-                       
send(dbEvent.getDatabase(),HCatConstants.HCAT_TOPIC,HCatConstants.HCAT_DROP_DATABASE_EVENT);
+                       
send(dbEvent.getDatabase(),getTopicPrefix(dbEvent.getHandler().getHiveConf()),HCatConstants.HCAT_DROP_DATABASE_EVENT);
        }
 
        @Override
@@ -155,23 +156,23 @@ public class NotificationListener extend
                                Table tbl = tableEvent.getTable();
                                Table newTbl = tbl.deepCopy();
                                HMSHandler handler = tableEvent.getHandler();
-                               String namingPolicy = 
handler.getHiveConf().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAMING_POLICY, 
"tablename");
-                               
newTbl.getParameters().put(HCatConstants.HCAT_MSGBUS_TOPIC_NAME, 
getTopicNameForParts(namingPolicy, tbl.getDbName(), tbl.getTableName()));
+                               HiveConf conf = handler.getHiveConf();
+                               
newTbl.getParameters().put(HCatConstants.HCAT_MSGBUS_TOPIC_NAME, 
+                                               getTopicPrefix(conf) + "." + 
tbl.getDbName() +"." + tbl.getTableName());
                                try {
                                        handler.alter_table(tbl.getDbName(), 
tbl.getTableName(), newTbl);
                                } catch (InvalidOperationException e) {
                                        throw new MetaException(e.toString());
                                }
-                               
send(tableEvent.getTable(),HCatConstants.HCAT_TOPIC+"."+tbl.getDbName(), 
HCatConstants.HCAT_ADD_TABLE_EVENT);
+                               
send(tableEvent.getTable(),getTopicPrefix(conf)+ "."+ tbl.getDbName(), 
HCatConstants.HCAT_ADD_TABLE_EVENT);
                        }
                }       
        }
 
-       private String getTopicNameForParts(String namingPolicy, String dbName, 
String tblName){
-               // we only have one policy now, so ignore policy param for now.
-               return HCatConstants.HCAT_TOPIC+"."+dbName+"."+tblName;
+       private String getTopicPrefix(HiveConf conf){
+               return 
conf.get(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX,HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX);
        }
-
+       
        @Override
        public void onDropTable(DropTableEvent tableEvent) throws MetaException 
{
                // Subscriber can get notification about drop of a  table in 
HCAT
@@ -188,7 +189,7 @@ public class NotificationListener extend
                        sd.setSortCols(new ArrayList<Order>());
                        sd.setParameters(new HashMap<String, String>());
                        sd.getSerdeInfo().setParameters(new HashMap<String, 
String>());
-                       
send(table,HCatConstants.HCAT_TOPIC+"."+table.getDbName(), 
HCatConstants.HCAT_DROP_TABLE_EVENT);        
+                       
send(table,getTopicPrefix(tableEvent.getHandler().getHiveConf())+"."+table.getDbName(),
 HCatConstants.HCAT_DROP_TABLE_EVENT);   
                }
        }
 
@@ -246,7 +247,7 @@ public class NotificationListener extend
                }
        }
 
-       private void createConnection(){
+       protected void createConnection(){
 
                Context jndiCntxt;
                try {

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java?rev=1137114&r1=1137113&r2=1137114&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java Sat Jun 18 02:01:46 2011
@@ -66,6 +66,7 @@ public class TestMsgBusConnection extend
                hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
                hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
                
hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+               hiveConf.set(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX, 
"planetlab.hcat");
                SessionState.start(new CliSessionState(hiveConf));
                driver = new Driver(hiveConf);
        }
@@ -75,7 +76,7 @@ public class TestMsgBusConnection extend
                Connection conn = connFac.createConnection();
                conn.start();
                Session session = conn.createSession(true, 
Session.SESSION_TRANSACTED);
-               Destination hcatTopic = 
session.createTopic(HCatConstants.HCAT_TOPIC);
+               Destination hcatTopic = session.createTopic("planetlab.hcat");
                consumer = session.createConsumer(hcatTopic);
        }
 
@@ -85,7 +86,7 @@ public class TestMsgBusConnection extend
                        driver.run("create database testconndb");
                        Message msg = consumer.receive();
                        assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, 
msg.getStringProperty(HCatConstants.HCAT_EVENT));
-                       
assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString());
+                       
assertEquals("topic://planetlab.hcat",msg.getJMSDestination().toString());
                        assertEquals("testconndb", ((Database) 
((ObjectMessage)msg).getObject()).getName());
                        broker.stop();
                        driver.run("drop database testconndb cascade");
@@ -94,12 +95,12 @@ public class TestMsgBusConnection extend
                        driver.run("create database testconndb");
                        msg = consumer.receive();
                        assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, 
msg.getStringProperty(HCatConstants.HCAT_EVENT));
-                       
assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString());
+                       
assertEquals("topic://planetlab.hcat",msg.getJMSDestination().toString());
                        assertEquals("testconndb", ((Database) 
((ObjectMessage)msg).getObject()).getName());
                        driver.run("drop database testconndb cascade");
                        msg = consumer.receive();
                        assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT, 
msg.getStringProperty(HCatConstants.HCAT_EVENT));
-                       
assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString());
+                       
assertEquals("topic://planetlab.hcat",msg.getJMSDestination().toString());
                        assertEquals("testconndb", ((Database) 
((ObjectMessage)msg).getObject()).getName());
                } catch (NoSuchObjectException nsoe){
                        nsoe.printStackTrace(System.err);

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java?rev=1137114&r1=1137113&r2=1137114&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java Sat Jun 18 02:01:46 2011
@@ -68,13 +68,13 @@ public class TestNotificationListener ex
                // We want message to be sent when session commits, thus we run 
in
                // transacted mode.
                Session session = conn.createSession(true, 
Session.SESSION_TRANSACTED);
-               Destination hcatTopic = 
session.createTopic(HCatConstants.HCAT_TOPIC);
+               Destination hcatTopic = 
session.createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX);
                MessageConsumer consumer1 = session.createConsumer(hcatTopic);
                consumer1.setMessageListener(this);
-               Destination tblTopic = 
session.createTopic(HCatConstants.HCAT_TOPIC+".mydb.mytbl");
+               Destination tblTopic = 
session.createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".mydb.mytbl");
                MessageConsumer consumer2 = session.createConsumer(tblTopic);
                consumer2.setMessageListener(this);
-               Destination dbTopic = 
session.createTopic(HCatConstants.HCAT_TOPIC+".mydb");
+               Destination dbTopic = 
session.createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".mydb");
                MessageConsumer consumer3 = session.createConsumer(dbTopic);
                consumer3.setMessageListener(this);
                hiveConf = new HiveConf(this.getClass());
@@ -112,7 +112,7 @@ public class TestNotificationListener ex
                        event = msg.getStringProperty(HCatConstants.HCAT_EVENT);
                        if(event.equals(HCatConstants.HCAT_ADD_DATABASE_EVENT)){
 
-                               
assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString());
+                               
assertEquals("topic://"+HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX,msg.getJMSDestination().toString());
                                assertEquals("mydb", ((Database) 
((ObjectMessage)msg).getObject()).getName());
                        }
                        else 
if(event.equals(HCatConstants.HCAT_ADD_TABLE_EVENT)){
@@ -153,7 +153,7 @@ public class TestNotificationListener ex
                        }
                        else 
if(event.equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)){
 
-                               
assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString());
+                               
assertEquals("topic://"+HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX,msg.getJMSDestination().toString());
                                assertEquals("mydb", ((Database) 
((ObjectMessage)msg).getObject()).getName());
                        }
                        else


Reply via email to