Author: hashutosh
Date: Sat Jun 18 02:01:46 2011
New Revision: 1137114
URL: http://svn.apache.org/viewvc?rev=1137114&view=rev
Log:
HCATALOG-47: Topic prefix for the message bus should be configurable
Modified:
incubator/hcatalog/trunk/CHANGES.txt
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java
Modified: incubator/hcatalog/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1137114&r1=1137113&r2=1137114&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Sat Jun 18 02:01:46 2011
@@ -13,6 +13,8 @@ Trunk (unreleased changes)
IMPROVEMENTS
+ HCAT-47. Topic prefix for the message bus should be configurable
(hashutosh)
+
HCAT-39. Lazily create connection for Message bus (hashutosh)
HCAT-44. Add a releaseaudit target to build.xml (gates)
Modified:
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java
URL:
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java?rev=1137114&r1=1137113&r2=1137114&view=diff
==============================================================================
---
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java
(original)
+++
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java
Sat Jun 18 02:01:46 2011
@@ -70,9 +70,10 @@ public final class HCatConstants {
public static final String HCAT_MSGBUS_TOPIC_NAME = "hcat.msgbus.topic.name";
public static final String HCAT_MSGBUS_TOPIC_NAMING_POLICY =
"hcat.msgbus.topic.naming.policy";
+ public static final String HCAT_MSGBUS_TOPIC_PREFIX =
"hcat.msgbus.topic.prefix";
// Message Bus related properties.
- public static final String HCAT_TOPIC = "HCAT";
+ public static final String HCAT_DEFAULT_TOPIC_PREFIX = "HCAT";
public static final String HCAT_EVENT = "HCAT_EVENT";
public static final String HCAT_ADD_PARTITION_EVENT = "HCAT_ADD_PARTITION";
public static final String HCAT_DROP_PARTITION_EVENT = "HCAT_DROP_PARTITION";
Modified:
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java
URL:
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java?rev=1137114&r1=1137113&r2=1137114&view=diff
==============================================================================
---
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java
(original)
+++
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java
Sat Jun 18 02:01:46 2011
@@ -38,6 +38,7 @@ import javax.naming.NamingException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
@@ -133,7 +134,7 @@ public class NotificationListener extend
// by listening on a topic named "HCAT" and message selector
string
// as "HCAT_EVENT = HCAT_ADD_DATABASE"
if(dbEvent.getStatus())
-
send(dbEvent.getDatabase(),HCatConstants.HCAT_TOPIC,HCatConstants.HCAT_ADD_DATABASE_EVENT);
+
send(dbEvent.getDatabase(),getTopicPrefix(dbEvent.getHandler().getHiveConf()),HCatConstants.HCAT_ADD_DATABASE_EVENT);
}
@Override
@@ -142,7 +143,7 @@ public class NotificationListener extend
// by listening on a topic named "HCAT" and message selector
string
// as "HCAT_EVENT = HCAT_DROP_DATABASE"
if(dbEvent.getStatus())
-
send(dbEvent.getDatabase(),HCatConstants.HCAT_TOPIC,HCatConstants.HCAT_DROP_DATABASE_EVENT);
+
send(dbEvent.getDatabase(),getTopicPrefix(dbEvent.getHandler().getHiveConf()),HCatConstants.HCAT_DROP_DATABASE_EVENT);
}
@Override
@@ -155,23 +156,23 @@ public class NotificationListener extend
Table tbl = tableEvent.getTable();
Table newTbl = tbl.deepCopy();
HMSHandler handler = tableEvent.getHandler();
- String namingPolicy =
handler.getHiveConf().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAMING_POLICY,
"tablename");
-
newTbl.getParameters().put(HCatConstants.HCAT_MSGBUS_TOPIC_NAME,
getTopicNameForParts(namingPolicy, tbl.getDbName(), tbl.getTableName()));
+ HiveConf conf = handler.getHiveConf();
+
newTbl.getParameters().put(HCatConstants.HCAT_MSGBUS_TOPIC_NAME,
+ getTopicPrefix(conf) + "." +
tbl.getDbName() +"." + tbl.getTableName());
try {
handler.alter_table(tbl.getDbName(),
tbl.getTableName(), newTbl);
} catch (InvalidOperationException e) {
throw new MetaException(e.toString());
}
-
send(tableEvent.getTable(),HCatConstants.HCAT_TOPIC+"."+tbl.getDbName(),
HCatConstants.HCAT_ADD_TABLE_EVENT);
+
send(tableEvent.getTable(),getTopicPrefix(conf)+ "."+ tbl.getDbName(),
HCatConstants.HCAT_ADD_TABLE_EVENT);
}
}
}
- private String getTopicNameForParts(String namingPolicy, String dbName,
String tblName){
- // we only have one policy now, so ignore policy param for now.
- return HCatConstants.HCAT_TOPIC+"."+dbName+"."+tblName;
+ private String getTopicPrefix(HiveConf conf){
+ return
conf.get(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX,HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX);
}
-
+
@Override
public void onDropTable(DropTableEvent tableEvent) throws MetaException
{
// Subscriber can get notification about drop of a table in
HCAT
@@ -188,7 +189,7 @@ public class NotificationListener extend
sd.setSortCols(new ArrayList<Order>());
sd.setParameters(new HashMap<String, String>());
sd.getSerdeInfo().setParameters(new HashMap<String,
String>());
-
send(table,HCatConstants.HCAT_TOPIC+"."+table.getDbName(),
HCatConstants.HCAT_DROP_TABLE_EVENT);
+
send(table,getTopicPrefix(tableEvent.getHandler().getHiveConf())+"."+table.getDbName(),
HCatConstants.HCAT_DROP_TABLE_EVENT);
}
}
@@ -246,7 +247,7 @@ public class NotificationListener extend
}
}
- private void createConnection(){
+ protected void createConnection(){
Context jndiCntxt;
try {
Modified:
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java
URL:
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java?rev=1137114&r1=1137113&r2=1137114&view=diff
==============================================================================
---
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java
(original)
+++
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java
Sat Jun 18 02:01:46 2011
@@ -66,6 +66,7 @@ public class TestMsgBusConnection extend
hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+ hiveConf.set(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX,
"planetlab.hcat");
SessionState.start(new CliSessionState(hiveConf));
driver = new Driver(hiveConf);
}
@@ -75,7 +76,7 @@ public class TestMsgBusConnection extend
Connection conn = connFac.createConnection();
conn.start();
Session session = conn.createSession(true,
Session.SESSION_TRANSACTED);
- Destination hcatTopic =
session.createTopic(HCatConstants.HCAT_TOPIC);
+ Destination hcatTopic = session.createTopic("planetlab.hcat");
consumer = session.createConsumer(hcatTopic);
}
@@ -85,7 +86,7 @@ public class TestMsgBusConnection extend
driver.run("create database testconndb");
Message msg = consumer.receive();
assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT,
msg.getStringProperty(HCatConstants.HCAT_EVENT));
-
assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString());
+
assertEquals("topic://planetlab.hcat",msg.getJMSDestination().toString());
assertEquals("testconndb", ((Database)
((ObjectMessage)msg).getObject()).getName());
broker.stop();
driver.run("drop database testconndb cascade");
@@ -94,12 +95,12 @@ public class TestMsgBusConnection extend
driver.run("create database testconndb");
msg = consumer.receive();
assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT,
msg.getStringProperty(HCatConstants.HCAT_EVENT));
-
assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString());
+
assertEquals("topic://planetlab.hcat",msg.getJMSDestination().toString());
assertEquals("testconndb", ((Database)
((ObjectMessage)msg).getObject()).getName());
driver.run("drop database testconndb cascade");
msg = consumer.receive();
assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT,
msg.getStringProperty(HCatConstants.HCAT_EVENT));
-
assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString());
+
assertEquals("topic://planetlab.hcat",msg.getJMSDestination().toString());
assertEquals("testconndb", ((Database)
((ObjectMessage)msg).getObject()).getName());
} catch (NoSuchObjectException nsoe){
nsoe.printStackTrace(System.err);
Modified:
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java
URL:
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java?rev=1137114&r1=1137113&r2=1137114&view=diff
==============================================================================
---
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java
(original)
+++
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java
Sat Jun 18 02:01:46 2011
@@ -68,13 +68,13 @@ public class TestNotificationListener ex
// We want message to be sent when session commits, thus we run
in
// transacted mode.
Session session = conn.createSession(true,
Session.SESSION_TRANSACTED);
- Destination hcatTopic =
session.createTopic(HCatConstants.HCAT_TOPIC);
+ Destination hcatTopic =
session.createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX);
MessageConsumer consumer1 = session.createConsumer(hcatTopic);
consumer1.setMessageListener(this);
- Destination tblTopic =
session.createTopic(HCatConstants.HCAT_TOPIC+".mydb.mytbl");
+ Destination tblTopic =
session.createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".mydb.mytbl");
MessageConsumer consumer2 = session.createConsumer(tblTopic);
consumer2.setMessageListener(this);
- Destination dbTopic =
session.createTopic(HCatConstants.HCAT_TOPIC+".mydb");
+ Destination dbTopic =
session.createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".mydb");
MessageConsumer consumer3 = session.createConsumer(dbTopic);
consumer3.setMessageListener(this);
hiveConf = new HiveConf(this.getClass());
@@ -112,7 +112,7 @@ public class TestNotificationListener ex
event = msg.getStringProperty(HCatConstants.HCAT_EVENT);
if(event.equals(HCatConstants.HCAT_ADD_DATABASE_EVENT)){
-
assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString());
+
assertEquals("topic://"+HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX,msg.getJMSDestination().toString());
assertEquals("mydb", ((Database)
((ObjectMessage)msg).getObject()).getName());
}
else
if(event.equals(HCatConstants.HCAT_ADD_TABLE_EVENT)){
@@ -153,7 +153,7 @@ public class TestNotificationListener ex
}
else
if(event.equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)){
-
assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString());
+
assertEquals("topic://"+HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX,msg.getJMSDestination().toString());
assertEquals("mydb", ((Database)
((ObjectMessage)msg).getObject()).getName());
}
else