Author: macyang
Date: Mon Jun 20 21:36:52 2011
New Revision: 1137792

URL: http://svn.apache.org/viewvc?rev=1137792&view=rev
Log:
HCAT-46: Send a message on a message bus when a partition is marked done 
(hashutosh via macyang)

Removed:
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/metadata/HPartitionDoneEvent.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/metadata/ormodel.jdo
Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/build.xml
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1137792&r1=1137791&r2=1137792&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Mon Jun 20 21:36:52 2011
@@ -6,6 +6,8 @@ Trunk (unreleased changes)
 
   NEW FEATURES
  
+    HCAT-46. Send a message on a message bus when a partition is marked done (hashutosh via macyang)
+
     HCAT-3. Send a message on a message bus when events occur in Metastore (hashutosh)
   
     HCAT-16. Add InputFormat/OutputFormat for handling exported tables/partitions.

Modified: incubator/hcatalog/trunk/build.xml
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/build.xml?rev=1137792&r1=1137791&r2=1137792&view=diff
==============================================================================
--- incubator/hcatalog/trunk/build.xml (original)
+++ incubator/hcatalog/trunk/build.xml Mon Jun 20 21:36:52 2011
@@ -231,7 +231,6 @@
       <compilerarg line="${javac.args}"/>
       <classpath refid="classpath" />
     </javac>
-  <antcall target="model-enhance" />
   </target>
  
   <!-- Build the hcatalog client jar -->
@@ -259,32 +258,6 @@
   -->
   <target name="jar" depends="clientjar,server-extensions"/>
 
-
- <!--
-  ================================================================================
-  Datanucleus Section
-  ================================================================================
-  -->
-
- <target name="model-enhance">
-    <taskdef name="datanucleusenhancer"
-                classname="org.datanucleus.enhancer.tools.EnhancerTask">
-       <classpath refid="classpath"/>
-   </taskdef>
-
-    <datanucleusenhancer
-        dir="${basedir}" failonerror="true" verbose="true" fork="true">
-    <fileset dir="${src.dir}/org/apache/hcatalog/metadata/">
-       <include name="ormodel.jdo" />
-    </fileset>   
-    <classpath>
-          <path refid="classpath"/>
-          <pathelement path="${build.dir}/classes/"/>
-        </classpath>
-        <jvmarg line="-Dlog4j.configuration=${basedir}/../conf/hive-log4j.properties"/>
-    </datanucleusenhancer>
-  </target>
-  
   <!--
   ================================================================================
   Test Section

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java?rev=1137792&r1=1137791&r2=1137792&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java Mon Jun 20 21:36:52 2011
@@ -77,6 +77,7 @@ public final class HCatConstants {
   public static final String HCAT_EVENT = "HCAT_EVENT";
   public static final String HCAT_ADD_PARTITION_EVENT = "HCAT_ADD_PARTITION";
   public static final String HCAT_DROP_PARTITION_EVENT = "HCAT_DROP_PARTITION";
+  public static final String HCAT_PARTITION_DONE_EVENT = "HCAT_PARTITION_DONE";
   public static final String HCAT_ADD_TABLE_EVENT = "HCAT_ADD_TABLE";
   public static final String HCAT_DROP_TABLE_EVENT = "HCAT_DROP_TABLE";
   public static final String HCAT_ADD_DATABASE_EVENT = "HCAT_ADD_DATABASE";

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java?rev=1137792&r1=1137791&r2=1137792&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java Mon Jun 20 21:36:52 2011
@@ -21,6 +21,8 @@ package org.apache.hcatalog.listener;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -28,6 +30,8 @@ import javax.jms.Destination;
 import javax.jms.ExceptionListener;
 import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
 import javax.jms.MessageProducer;
 import javax.jms.ObjectMessage;
 import javax.jms.Session;
@@ -54,6 +58,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
 import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
 import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
 import org.apache.hcatalog.common.HCatConstants;
 
 /**
@@ -69,8 +74,8 @@ import org.apache.hcatalog.common.HCatCo
 public class NotificationListener extends MetaStoreEventListener{
 
        private static final Log LOG = LogFactory.getLog(NotificationListener.class);
-       private Session session;
-       private Connection conn;
+       protected Session session;
+       protected Connection conn;
 
        /**
         * Create message bus connection and session in constructor.
@@ -202,7 +207,7 @@ public class NotificationListener extend
         * @param event is the value of HCAT_EVENT property in message. It can 
be 
         * used to select messages in client side. 
         */
-       private void send(Serializable msgBody, String topicName, String event){
+       protected void send(Object msgBody, String topicName, String event){
 
                try{
 
@@ -235,7 +240,19 @@ public class NotificationListener extend
                                return;
                        }
                        MessageProducer producer = session.createProducer(topic);
-                       ObjectMessage msg = session.createObjectMessage(msgBody);
+                       Message msg;
+                       if (msgBody instanceof Map){
+                               MapMessage mapMsg = session.createMapMessage();
+                               Map<String,String> incomingMap = (Map<String,String>)msgBody;
+                               for (Entry<String,String> partCol : incomingMap.entrySet()){
+                                       mapMsg.setString(partCol.getKey(), partCol.getValue());
+                               }
+                               msg = mapMsg;
+                       }
+                       else {
+                               msg = session.createObjectMessage((Serializable)msgBody);
+                       }
+
                        msg.setStringProperty(HCatConstants.HCAT_EVENT, event);
                        producer.send(msg);
                        // Message must be transacted before we return.
@@ -289,4 +306,11 @@ public class NotificationListener extend
                        LOG.info("Failed to close message bus connection.", ignore);
                }
        }
+
+       @Override
+       public void onLoadPartitionDone(LoadPartitionDoneEvent lpde)
+                       throws MetaException {
+               if(lpde.getStatus())
+                       send(lpde.getPartitionName(),lpde.getTable().getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME),HCatConstants.HCAT_PARTITION_DONE_EVENT);
+       }
 }

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java?rev=1137792&r1=1137791&r2=1137792&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java Mon Jun 20 21:36:52 2011
@@ -19,13 +19,16 @@
 package org.apache.hcatalog.listener;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.JMSException;
+import javax.jms.MapMessage;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
@@ -36,11 +39,16 @@ import org.apache.activemq.ActiveMQConne
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.InvalidPartitionException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PartitionEventType;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
 import org.apache.hadoop.hive.metastore.api.UnknownTableException;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
@@ -89,15 +97,20 @@ public class TestNotificationListener ex
 
        @Override
        protected void tearDown() throws Exception {
-               assertEquals(6, cntInvocation.get());
+               assertEquals(7, cntInvocation.get());
                super.tearDown();
        }
 
-       public void testAMQListener() throws MetaException, TException, UnknownTableException, NoSuchObjectException, CommandNeedRetryException{
+       public void testAMQListener() throws MetaException, TException, UnknownTableException, NoSuchObjectException, 
+       CommandNeedRetryException, UnknownDBException, InvalidPartitionException, UnknownPartitionException{
                driver.run("create database mydb");
                driver.run("use mydb");
                driver.run("create table mytbl (a string) partitioned by (b string)");
                driver.run("alter table mytbl add partition(b='2011')");
+               HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf);
+               Map<String,String> kvs = new HashMap<String, String>(1);
+               kvs.put("b", "2011");
+               msc.markPartitionForEvent("mydb", "mytbl", kvs, PartitionEventType.LOAD_DONE);
                driver.run("alter table mytbl drop partition(b='2011')");
                driver.run("drop table mytbl");
                driver.run("drop database mydb");
@@ -156,7 +169,11 @@ public class TestNotificationListener ex
                                assertEquals("topic://"+HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX,msg.getJMSDestination().toString());
                                assertEquals("mydb", ((Database) ((ObjectMessage)msg).getObject()).getName());
                        }
-                       else
+                       else if (event.equals(HCatConstants.HCAT_PARTITION_DONE_EVENT)) {
+                               assertEquals("topic://HCAT.mydb.mytbl",msg.getJMSDestination().toString());
+                               MapMessage mapMsg = (MapMessage)msg;
+                               assert mapMsg.getString("b").equals("2011");
+                       } else
                                assert false;
                } catch (JMSException e) {
                        e.printStackTrace(System.err);


Reply via email to