Author: hashutosh
Date: Thu May 26 21:49:12 2011
New Revision: 1128099

URL: http://svn.apache.org/viewvc?rev=1128099&view=rev
Log:
HCATALOG-3: Send a message on a message bus when events occur in Metastore

Added:
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/
    
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/metadata/
    
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/metadata/HPartitionDoneEvent.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/metadata/ormodel.jdo
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/
    
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java
Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/build.xml
    incubator/hcatalog/trunk/ivy.xml
    incubator/hcatalog/trunk/ivy/ivysettings.xml
    incubator/hcatalog/trunk/ivy/libraries.properties
    
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1128099&r1=1128098&r2=1128099&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Thu May 26 21:49:12 2011
@@ -5,6 +5,9 @@ Trunk (unreleased changes)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+ 
+    HCAT-3. Send a message on a message bus when events occur in Metastore 
(hashutosh)
+  
     HCAT-16. Add InputFormat/OutputFormat for handling exported 
tables/partitions.
     (Krishna Kumar via macyang)
     

Modified: incubator/hcatalog/trunk/build.xml
URL: 
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/build.xml?rev=1128099&r1=1128098&r2=1128099&view=diff
==============================================================================
--- incubator/hcatalog/trunk/build.xml (original)
+++ incubator/hcatalog/trunk/build.xml Thu May 26 21:49:12 2011
@@ -100,7 +100,10 @@
     <fileset dir="${hive.root}/build/hadoopcore/hadoop-0.20.3-CDH3-SNAPSHOT/"
       includes="hadoop-core-0.20.3-CDH3-SNAPSHOT.jar"/>
     <fileset dir="${ivy.lib.dir}" includes="*.jar"/>
-    
+    <fileset dir="${hive.root}/build/ivy/lib/default" 
includes="jdo2-api-2.3-ec.jar"/>
+    <fileset dir="${hive.root}/build/ivy/lib/default" 
includes="datanucleus-enhancer-2.0.3.jar"/>
+    <fileset dir="${hive.root}/build/ivy/lib/default" 
includes="datanucleus-core-2.0.3.jar"/>
+ <fileset dir="${hive.root}/lib" includes="asm-3.1.jar"/>
   </path>
 
   <path id="test.classpath">
@@ -217,6 +220,7 @@
       <compilerarg line="${javac.args}"/>
       <classpath refid="classpath" />
     </javac>
+  <antcall target="model-enhance" />
   </target>
  
   <!-- Build the jar, this is the default -->
@@ -230,6 +234,31 @@
     </jar>
   </target>
 
+ <!--
+  
================================================================================
+  Datanucleus Section
+  
================================================================================
+  -->
+
+ <target name="model-enhance">
+    <taskdef name="datanucleusenhancer"
+                classname="org.datanucleus.enhancer.tools.EnhancerTask">
+       <classpath refid="classpath"/>
+   </taskdef>
+
+    <datanucleusenhancer
+        dir="${basedir}" failonerror="true" verbose="true" fork="true">
+    <fileset dir="${src.dir}/org/apache/hcatalog/metadata/">
+       <include name="ormodel.jdo" />
+    </fileset>   
+    <classpath>
+          <path refid="classpath"/>
+          <pathelement path="${build.dir}/classes/"/>
+        </classpath>
+        <jvmarg 
line="-Dlog4j.configuration=${basedir}/../conf/hive-log4j.properties"/>
+    </datanucleusenhancer>
+  </target>
+  
   <!--
   
================================================================================
   Test Section

Modified: incubator/hcatalog/trunk/ivy.xml
URL: 
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/ivy.xml?rev=1128099&r1=1128098&r2=1128099&view=diff
==============================================================================
--- incubator/hcatalog/trunk/ivy.xml (original)
+++ incubator/hcatalog/trunk/ivy.xml Thu May 26 21:49:12 2011
@@ -43,5 +43,8 @@
         <dependency org="org.apache.hadoop" name="hadoop-core" 
rev="${hadoop-core.version}"
           conf="common->master"/>
           -->
-    </dependencies>
+        <dependency org="javax.jms" name="jms" rev="${jms.version}" 
conf="common->master" />
+ <dependency org="org.apache.activemq" name="activemq-core" 
rev="${activemq.version}" conf="common->master" />
+<dependency org="javax.management.j2ee" name="management-api" 
rev="${javax-mgmt.version}" conf="common->master" /> 
+</dependencies>
 </ivy-module>

Modified: incubator/hcatalog/trunk/ivy/ivysettings.xml
URL: 
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/ivy/ivysettings.xml?rev=1128099&r1=1128098&r2=1128099&view=diff
==============================================================================
--- incubator/hcatalog/trunk/ivy/ivysettings.xml (original)
+++ incubator/hcatalog/trunk/ivy/ivysettings.xml Thu May 26 21:49:12 2011
@@ -36,10 +36,17 @@
   <include url="${ivy.default.conf.dir}/ivyconf-local.xml"/>
   <settings defaultResolver="default"/>
   <resolvers>
+    <ibiblio
+      name="jboss"
+      m2compatible="true"
+      root="https://repository.jboss.org/nexus/content/groups/public"
+      
pattern="[organisation]/[module]/[revision]/[artifact]-[revision](-[classifier]).[ext]"
 />
+
     <ibiblio name="maven2" root="${repo.maven.org}" 
pattern="${maven2.pattern.ext}" m2compatible="true"/>
     <chain name="default" dual="true">
       <resolver ref="local"/>
       <resolver ref="maven2"/>
+      <resolver ref="jboss"/>
     </chain>
     <chain name="internal">
       <resolver ref="local"/>

Modified: incubator/hcatalog/trunk/ivy/libraries.properties
URL: 
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/ivy/libraries.properties?rev=1128099&r1=1128098&r2=1128099&view=diff
==============================================================================
--- incubator/hcatalog/trunk/ivy/libraries.properties (original)
+++ incubator/hcatalog/trunk/ivy/libraries.properties Thu May 26 21:49:12 2011
@@ -18,4 +18,6 @@ ivy.version=2.2.0
 pig.version=0.8.0
 commons-cli.version=1.0
 #hadoop-core.version=0.20.2 Waiting for a secure version of hadoop in maven
-
+jms.version=1.1
+activemq.version=5.5.0
+javax-mgmt.version=1.1-rev-1

Modified: 
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java
URL: 
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java?rev=1128099&r1=1128098&r2=1128099&view=diff
==============================================================================
--- 
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java 
(original)
+++ 
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java 
Thu May 26 21:49:12 2011
@@ -64,4 +64,20 @@ public final class HCatConstants {
   public static final String HCAT_KEY_OUTPUT_INFO = HCAT_KEY_OUTPUT_BASE + 
".info";
   public static final String HCAT_KEY_HIVE_CONF = HCAT_KEY_OUTPUT_BASE + 
".hive.conf";
   public static final String HCAT_KEY_TOKEN_SIGNATURE = HCAT_KEY_OUTPUT_BASE + 
".token.sig";
+  
+  public static final String HCAT_MSG_CLEAN_FREQ = "hcat.msg.clean.freq";
+  public static final String HCAT_MSG_EXPIRY_DURATION = 
"hcat.msg.expiry.duration";
+  
+  public static final String HCAT_MSGBUS_TOPIC_NAME = "hcat.msgbus.topic.name";
+  public static final String HCAT_MSGBUS_TOPIC_NAMING_POLICY = 
"hcat.msgbus.topic.naming.policy";
+
+  // Message Bus related properties.
+  public static final String HCAT_TOPIC = "HCAT";
+  public static final String HCAT_EVENT = "HCAT_EVENT";
+  public static final String HCAT_ADD_PARTITION_EVENT = "HCAT_ADD_PARTITION";
+  public static final String HCAT_DROP_PARTITION_EVENT = "HCAT_DROP_PARTITION";
+  public static final String HCAT_ADD_TABLE_EVENT = "HCAT_ADD_TABLE";
+  public static final String HCAT_DROP_TABLE_EVENT = "HCAT_DROP_TABLE";
+  public static final String HCAT_ADD_DATABASE_EVENT = "HCAT_ADD_DATABASE";
+  public static final String HCAT_DROP_DATABASE_EVENT = "HCAT_DROP_DATABASE";
 }

Added: 
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java
URL: 
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java?rev=1128099&view=auto
==============================================================================
--- 
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java
 (added)
+++ 
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java
 Thu May 26 21:49:12 2011
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.listener;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import javax.jdo.PersistenceManager;
+import javax.jdo.Query;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
+import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
+import org.apache.hadoop.hive.metastore.ObjectStore;
+import org.apache.hadoop.hive.metastore.RawStore;
+import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler.Command;
+import org.apache.hadoop.hive.metastore.api.HiveObjectRef;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
+import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hcatalog.common.HCatConstants;
+
+/**
+ * Implementation of {@link 
org.apache.hadoop.hive.metastore.MetaStoreEventListener}.
+ * It sends messages on three types of topics. The first is named
+ * "HCAT.dbName.tblName"; on this topic two kinds of messages are sent:
+ * add/drop partition and finalize_partition messages.
+ * The second is named "HCAT.dbName" and carries add/drop table messages.
+ * The third is named "HCAT" and carries add/drop database messages.
+ * All messages also have a property named "HCAT_EVENT" set on them, whose value
+ * can be used to configure a message selector on the subscriber side.
+ */
+public class NotificationListener extends MetaStoreEventListener{
+
+       private static final Log LOG = 
LogFactory.getLog(NotificationListener.class);
+       private Session session;
+       private Connection conn;
+
+       /**
+        * Create message bus connection and session in constructor.
+        */
+       public NotificationListener(final Configuration conf) {
+
+               super(conf);
+               try {
+                       Context jndiCntxt = new InitialContext();
+                       ConnectionFactory connFac = 
(ConnectionFactory)jndiCntxt.lookup("ConnectionFactory");
+                       conn = connFac.createConnection();
+                       conn.start();
+                       // We want message to be sent when session commits, 
thus we run in
+                       // transacted mode.
+                       session = conn.createSession(true, 
Session.SESSION_TRANSACTED);
+
+               } catch (NamingException e) {
+                       LOG.error("JNDI error while setting up Message Bus 
connection. " +
+                                       "Please make sure file named 
'jndi.properties' is in " +
+                                       "classpath and contains appropriate 
key-value pairs.",e);
+               }
+               catch (JMSException e) {
+                       LOG.error("Failed to initialize connection to message 
bus",e);
+               }
+               catch(Throwable t){
+                       LOG.error("HCAT Listener failed to load",t);
+               }
+       }
+
+       @Override
+       public void onAddPartition(AddPartitionEvent partitionEvent) throws 
MetaException {
+               // Subscriber can get notification of a newly added partition in a 
+               // particular table by listening on a topic named 
"HCAT.dbName.tableName" 
+               // and message selector string as "HCAT_EVENT = 
HCAT_ADD_PARTITION" 
+               if(partitionEvent.getStatus()){
+
+                       Partition partition = partitionEvent.getPartition();
+                       String topicName;
+                       try {
+                               topicName = 
partitionEvent.getHandler().get_table(
+                                               partition.getDbName(), 
partition.getTableName()).getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME);
+                       } catch (NoSuchObjectException e) {
+                               throw new MetaException(e.toString());
+                       }
+                       send(partition, topicName, 
HCatConstants.HCAT_ADD_PARTITION_EVENT);                     
+               }
+
+       }
+
+       @Override
+       public void onDropPartition(DropPartitionEvent partitionEvent) throws 
MetaException {
+               // Subscriber can get notification of a dropped partition in a 
+               // particular table by listening on a topic named 
"HCAT.dbName.tableName" 
+               // and message selector string as "HCAT_EVENT = 
HCAT_DROP_PARTITION" 
+
+               // Datanucleus throws NPE when we try to serialize a partition 
object
+               // retrieved from metastore. To workaround that we reset 
following objects
+
+               if(partitionEvent.getStatus()){
+                       Partition partition = partitionEvent.getPartition();
+                       StorageDescriptor sd = partition.getSd();
+                       sd.setBucketCols(new ArrayList<String>());
+                       sd.setSortCols(new ArrayList<Order>());
+                       sd.setParameters(new HashMap<String, String>());
+                       sd.getSerdeInfo().setParameters(new HashMap<String, 
String>());
+                       String topicName;
+                       try {
+                               topicName = 
partitionEvent.getHandler().get_table(
+                                               partition.getDbName(), 
partition.getTableName()).getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME);
+                       } catch (NoSuchObjectException e) {
+                               throw new MetaException(e.toString());
+                       }
+                       send(partition, topicName, 
HCatConstants.HCAT_DROP_PARTITION_EVENT);
+               }
+       }
+
+       @Override
+       public void onCreateDatabase(CreateDatabaseEvent dbEvent) throws 
MetaException {
+               // Subscriber can get notification about addition of a database 
in HCAT
+               // by listening on a topic named "HCAT" and message selector 
string
+               // as "HCAT_EVENT = HCAT_ADD_DATABASE" 
+               if(dbEvent.getStatus())
+                       
send(dbEvent.getDatabase(),HCatConstants.HCAT_TOPIC,HCatConstants.HCAT_ADD_DATABASE_EVENT);
+       }
+
+       @Override
+       public void onDropDatabase(DropDatabaseEvent dbEvent) throws 
MetaException {
+               // Subscriber can get notification about drop of a database in 
HCAT
+               // by listening on a topic named "HCAT" and message selector 
string
+               // as "HCAT_EVENT = HCAT_DROP_DATABASE" 
+               if(dbEvent.getStatus())
+                       
send(dbEvent.getDatabase(),HCatConstants.HCAT_TOPIC,HCatConstants.HCAT_DROP_DATABASE_EVENT);
+       }
+
+       @Override
+       public void onCreateTable(CreateTableEvent tableEvent) throws 
MetaException {
+               // Subscriber can get notification about addition of a table 
in HCAT
+               // by listening on a topic named "HCAT.dbName" and message selector 
string
+               // as "HCAT_EVENT = HCAT_ADD_TABLE" 
+               if(tableEvent.getStatus()){
+                       if(tableEvent.getStatus()){
+                               Table tbl = tableEvent.getTable();
+                               Table newTbl = tbl.deepCopy();
+                               HMSHandler handler = tableEvent.getHandler();
+                               String namingPolicy = 
handler.getHiveConf().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAMING_POLICY, 
"tablename");
+                               
newTbl.getParameters().put(HCatConstants.HCAT_MSGBUS_TOPIC_NAME, 
getTopicNameForParts(namingPolicy, tbl.getDbName(), tbl.getTableName()));
+                               try {
+                                       handler.alter_table(tbl.getDbName(), 
tbl.getTableName(), newTbl);
+                               } catch (InvalidOperationException e) {
+                                       throw new MetaException(e.toString());
+                               }
+                               
send(tableEvent.getTable(),HCatConstants.HCAT_TOPIC+"."+tbl.getDbName(), 
HCatConstants.HCAT_ADD_TABLE_EVENT);
+                       }
+               }       
+       }
+
+       private String getTopicNameForParts(String namingPolicy, String dbName, 
String tblName){
+               // we only have one policy now, so ignore policy param for now.
+               return HCatConstants.HCAT_TOPIC+"."+dbName+"."+tblName;
+       }
+
+       @Override
+       public void onDropTable(DropTableEvent tableEvent) throws MetaException 
{
+               // Subscriber can get notification about drop of a table in 
HCAT
+               // by listening on a topic named "HCAT.dbName" and message selector 
string
+               // as "HCAT_EVENT = HCAT_DROP_TABLE" 
+
+               // Datanucleus throws NPE when we try to serialize a table 
object
+               // retrieved from metastore. To workaround that we reset 
following objects
+
+               if(tableEvent.getStatus()){
+                       Table table = tableEvent.getTable();
+                       StorageDescriptor sd = table.getSd();
+                       sd.setBucketCols(new ArrayList<String>());
+                       sd.setSortCols(new ArrayList<Order>());
+                       sd.setParameters(new HashMap<String, String>());
+                       sd.getSerdeInfo().setParameters(new HashMap<String, 
String>());
+                       
send(table,HCatConstants.HCAT_TOPIC+"."+table.getDbName(), 
HCatConstants.HCAT_DROP_TABLE_EVENT);        
+               }
+       }
+
+       /**
+        * @param msgBody is the metastore object. It is sent in full such that
+        * if subscriber is really interested in details, it can reconstruct it 
fully.
+        * In case of finalize_partition message this will be string 
specification of 
+        * the partition.
+        * @param topicName is the name on message broker on which message is 
sent.
+        * @param event is the value of HCAT_EVENT property in message. It can 
be 
+        * used to select messages in client side. 
+        */
+       private void send(Serializable msgBody, String topicName, String event){
+
+               if(null == session){
+                       // If we weren't able to set up the session in the 
constructor
+                       // we can't send the message in any case.
+                       LOG.error("Invalid session. Failed to send message on 
topic: "+
+                                       topicName + " event: "+event);
+                       return;
+               }
+
+               try{
+                       // Topics are created on demand. If it doesn't exist on 
broker it will
+                       // be created when broker receives this message.
+                       Destination topic = session.createTopic(topicName);
+                       MessageProducer producer = 
session.createProducer(topic);
+                       ObjectMessage msg = 
session.createObjectMessage(msgBody);
+                       msg.setStringProperty(HCatConstants.HCAT_EVENT, event);
+                       producer.send(msg);
+                       // Message must be transacted before we return.
+                       session.commit();
+               } catch(Exception e){
+                       // Gobble up the exception. Message delivery is best 
effort.
+                       LOG.error("Failed to send message on topic: "+topicName 
+ 
+                                       " event: "+event , e);
+               }
+       }
+
+       @Override
+       protected void finalize() throws Throwable {
+               // Close the connection before dying.
+               try {
+                       if(conn != null) {
+                               conn.close();
+                       }
+               } catch (Exception ignore) {
+                       LOG.info("Failed to close message bus connection.", 
ignore);
+               }
+       }
+}

Added: 
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/metadata/HPartitionDoneEvent.java
URL: 
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/metadata/HPartitionDoneEvent.java?rev=1128099&view=auto
==============================================================================
--- 
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/metadata/HPartitionDoneEvent.java
 (added)
+++ 
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/metadata/HPartitionDoneEvent.java
 Thu May 26 21:49:12 2011
@@ -0,0 +1,66 @@
+package org.apache.hcatalog.metadata;
+
+public class HPartitionDoneEvent {
+
+       private String dbName;
+       
+       private String tblName;
+       
+       private String partVals;
+
+       private Long createTime;
+       
+       /**
+        * @return the createTime
+        */
+       public Long getCreateTime() {
+               return createTime;
+       }
+
+       HPartitionDoneEvent(String dbName, String tblName, String partSpec) {
+               super();
+               this.dbName = dbName;
+               this.tblName = tblName;
+               this.partVals = partSpec;
+               this.createTime = System.currentTimeMillis();
+       }
+
+       /* (non-Javadoc)
+        * @see java.lang.Object#toString()
+        */
+       @Override
+       public String toString() {
+               return "HPartitionDoneEvent [dbName=" + dbName + ", tblName=" + 
tblName
+                               + ", partSpec=" + partVals + ", createTime=" + 
createTime + "]";
+       }
+
+       public HPartitionDoneEvent() {}
+
+       /**
+        * @param dbName the dbName to set
+        */
+       public void setDbName(String dbName) {
+               this.dbName = dbName;
+       }
+
+       /**
+        * @param tblName the tblName to set
+        */
+       public void setTblName(String tblName) {
+               this.tblName = tblName;
+       }
+
+       /**
+        * @param partSpec the partSpec to set
+        */
+       public void setPartSpec(String partSpec) {
+               this.partVals = partSpec;
+       }
+
+       /**
+        * @param createTime the createTime to set
+        */
+       public void setCreateTime(Long createTime) {
+               this.createTime = createTime;
+       }
+}

Added: 
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/metadata/ormodel.jdo
URL: 
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/metadata/ormodel.jdo?rev=1128099&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/metadata/ormodel.jdo 
(added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/metadata/ormodel.jdo 
Thu May 26 21:49:12 2011
@@ -0,0 +1,22 @@
+<?xml version="1.0"?> 
+<!DOCTYPE jdo PUBLIC "-//Sun Microsystems, Inc.//DTD Java Data Objects 
Metadata 2.0//EN" 
+  "http://java.sun.com/dtd/jdo_2_0.dtd">
+
+<jdo>  
+  <package name="org.apache.hcatalog.metadata">  
+    <class name="HPartitionDoneEvent"  table="HCAT_DONE_PARTITIONS" 
detachable="true">  
+      <field name="dbName">  
+        <column name="DBNAME" length="128" jdbc-type="VARCHAR"/>
+      </field>
+      <field name="tblName">
+        <column name="TBLNAME" length="128" jdbc-type="VARCHAR"/>
+      </field>
+       <field name="partVals">
+        <column name="partVals" length="4000" jdbc-type="VARCHAR"/>
+      </field>
+     <field name="createTime">
+        <column name="CREATE_TIME"  jdbc-type="LONG"/>
+      </field>
+    </class>
+  </package>
+</jdo>

Added: 
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java
URL: 
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java?rev=1128099&view=auto
==============================================================================
--- 
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java
 (added)
+++ 
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java
 Thu May 26 21:49:12 2011
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.listener;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.thrift.TException;
+
+import junit.framework.TestCase;
+
+public class TestNotificationListener extends TestCase implements 
MessageListener{
+
+       private HiveConf hiveConf;
+       private Driver driver;
+       private AtomicInteger cntInvocation = new AtomicInteger(0);
+
+       @Override
+       protected void setUp() throws Exception {
+
+               super.setUp();
+               System.setProperty("java.naming.factory.initial", 
"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
+               System.setProperty("java.naming.provider.url", 
"vm://localhost?broker.persistent=false");
+               ConnectionFactory connFac = new 
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+               Connection conn = connFac.createConnection();
+               conn.start();
+               // We want message to be sent when session commits, thus we run 
in
+               // transacted mode.
+               Session session = conn.createSession(true, 
Session.SESSION_TRANSACTED);
+               Destination hcatTopic = 
session.createTopic(HCatConstants.HCAT_TOPIC);
+               MessageConsumer consumer1 = session.createConsumer(hcatTopic);
+               consumer1.setMessageListener(this);
+               Destination tblTopic = 
session.createTopic(HCatConstants.HCAT_TOPIC+".mydb.mytbl");
+               MessageConsumer consumer2 = session.createConsumer(tblTopic);
+               consumer2.setMessageListener(this);
+               Destination dbTopic = 
session.createTopic(HCatConstants.HCAT_TOPIC+".mydb");
+               MessageConsumer consumer3 = session.createConsumer(dbTopic);
+               consumer3.setMessageListener(this);
+               hiveConf = new HiveConf(this.getClass());
+               
hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname,NotificationListener.class.getName());
+               hiveConf.set("hive.metastore.local", "true");
+               hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+               hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+               
hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+               SessionState.start(new CliSessionState(hiveConf));
+               driver = new Driver(hiveConf);
+       }
+
+       @Override
+       protected void tearDown() throws Exception {
+               assertEquals(6, cntInvocation.get());
+               super.tearDown();
+       }
+
+       public void testAMQListener() throws MetaException, TException, 
UnknownTableException, NoSuchObjectException, CommandNeedRetryException{
+               driver.run("create database mydb");
+               driver.run("use mydb");
+               driver.run("create table mytbl (a string) partitioned by (b 
string)");
+               driver.run("alter table mytbl add partition(b='2011')");
+               driver.run("alter table mytbl drop partition(b='2011')");
+               driver.run("drop table mytbl");
+               driver.run("drop database mydb");
+       }
+
+       @Override
+       public void onMessage(Message msg) {
+               cntInvocation.incrementAndGet();
+
+               String event;
+               try {
+                       event = msg.getStringProperty(HCatConstants.HCAT_EVENT);
+                       if(event.equals(HCatConstants.HCAT_ADD_DATABASE_EVENT)){
+
+                               
assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString());
+                               assertEquals("mydb", ((Database) 
((ObjectMessage)msg).getObject()).getName());
+                       }
+                       else 
if(event.equals(HCatConstants.HCAT_ADD_TABLE_EVENT)){
+
+                               
assertEquals("topic://HCAT.mydb",msg.getJMSDestination().toString());
+                               Table tbl = 
(Table)(((ObjectMessage)msg).getObject());
+                               assertEquals("mytbl", tbl.getTableName());
+                               assertEquals("mydb", tbl.getDbName());
+                               assertEquals(1, tbl.getPartitionKeysSize());
+                       }
+                       else 
if(event.equals(HCatConstants.HCAT_ADD_PARTITION_EVENT)){
+
+                               
assertEquals("topic://HCAT.mydb.mytbl",msg.getJMSDestination().toString());
+                               Partition part = 
(Partition)(((ObjectMessage)msg).getObject());
+                               assertEquals("mytbl", part.getTableName());
+                               assertEquals("mydb", part.getDbName());
+                               List<String> vals = new ArrayList<String>(1);
+                               vals.add("2011");
+                               assertEquals(vals,part.getValues());
+                       }
+                       else 
if(event.equals(HCatConstants.HCAT_DROP_PARTITION_EVENT)){
+
+                               
assertEquals("topic://HCAT.mydb.mytbl",msg.getJMSDestination().toString());
+                               Partition part = 
(Partition)(((ObjectMessage)msg).getObject());
+                               assertEquals("mytbl", part.getTableName());
+                               assertEquals("mydb", part.getDbName());
+                               List<String> vals = new ArrayList<String>(1);
+                               vals.add("2011");
+                               assertEquals(vals,part.getValues());
+                       }
+                       else 
if(event.equals(HCatConstants.HCAT_DROP_TABLE_EVENT)){
+
+                               
assertEquals("topic://HCAT.mydb",msg.getJMSDestination().toString());
+                               Table tbl = 
(Table)(((ObjectMessage)msg).getObject());
+                               assertEquals("mytbl", tbl.getTableName());
+                               assertEquals("mydb", tbl.getDbName());
+                               assertEquals(1, tbl.getPartitionKeysSize());
+                       }
+                       else 
if(event.equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)){
+
+                               
assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString());
+                               assertEquals("mydb", ((Database) 
((ObjectMessage)msg).getObject()).getName());
+                       }
+                       else
+                               assert false;
+               } catch (JMSException e) {
+                       e.printStackTrace(System.err);
+                       assert false;
+               }
+       }
+}


Reply via email to