Modified: 
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/data/TestReaderWriter.java
URL: 
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/data/TestReaderWriter.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- 
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/data/TestReaderWriter.java
 (original)
+++ 
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/data/TestReaderWriter.java
 Fri Jun 22 01:40:27 2012
@@ -52,134 +52,143 @@ import org.junit.Test;
 
 public class TestReaderWriter {
 
-       @Test
-       public void test() throws MetaException, CommandNeedRetryException, 
IOException, ClassNotFoundException {
-
-               HiveConf conf = new HiveConf(getClass());
-               Driver driver = new Driver(conf);
-               SessionState.start(new CliSessionState(conf));
-               driver.run("drop table mytbl");
-               driver.run("create table mytbl (a string, b int)");
-               Iterator<Entry<String,String>> itr = conf.iterator();
-               Map<String,String> map = new HashMap<String, String>();
-               while(itr.hasNext()){
-                       Entry<String,String> kv = itr.next();
-                       map.put(kv.getKey(), kv.getValue());
-               }
-               
-               WriterContext cntxt = runsInMaster(map);
-               
-               File writeCntxtFile = File.createTempFile("hcat-write", "temp");
-               writeCntxtFile.deleteOnExit();
-               
-               // Serialize context.
-               ObjectOutputStream oos = new ObjectOutputStream(new 
FileOutputStream(writeCntxtFile));
-               oos.writeObject(cntxt);
-               oos.flush();
-               oos.close();
-               
-               // Now, deserialize it.
-               ObjectInputStream ois = new ObjectInputStream(new 
FileInputStream(writeCntxtFile));
-               cntxt = (WriterContext) ois.readObject();
-               ois.close();
-               
-               runsInSlave(cntxt);
-               commit(map, true, cntxt);
-               
-               ReaderContext readCntxt = runsInMaster(map, false);
-               
-               File readCntxtFile = File.createTempFile("hcat-read", "temp");
-               readCntxtFile.deleteOnExit();
-               oos = new ObjectOutputStream(new 
FileOutputStream(readCntxtFile));
-               oos.writeObject(readCntxt);
-               oos.flush();
-               oos.close();
-               
-               ois = new ObjectInputStream(new FileInputStream(readCntxtFile));
-               readCntxt = (ReaderContext) ois.readObject();
-               ois.close();
-               
-               
-               for(InputSplit split : readCntxt.getSplits()){
-                       runsInSlave(split, readCntxt.getConf());                
        
-               }
-       }
-
-       private WriterContext runsInMaster(Map<String, String> config) throws 
HCatException {
-
-               WriteEntity.Builder builder = new WriteEntity.Builder();
-               WriteEntity entity = builder.withTable("mytbl").build();
-               HCatWriter writer = DataTransferFactory.getHCatWriter(entity, 
config);
-               WriterContext info = writer.prepareWrite();
-               return info;
-       }
-       
-       private ReaderContext runsInMaster(Map<String,String> config, boolean 
bogus) throws HCatException {
-
-               ReadEntity.Builder builder = new ReadEntity.Builder();
-               ReadEntity entity = builder.withTable("mytbl").build();
-               HCatReader reader = DataTransferFactory.getHCatReader(entity, 
config);
-               ReaderContext cntxt = reader.prepareRead();
-               return cntxt;
-       }
-
-       private void runsInSlave(InputSplit split, Configuration config) throws 
HCatException {
-
-               HCatReader reader = DataTransferFactory.getHCatReader(split, 
config);
-               Iterator<HCatRecord> itr = reader.read();
-               int i = 1;
-               while(itr.hasNext()){
-                       HCatRecord read = itr.next();
-                       HCatRecord written = getRecord(i++);
-                       // Argh, HCatRecord doesnt implement equals()
-                       Assert.assertTrue("Read: " + read.get(0) + "Written: " 
+ written.get(0),  written.get(0).equals(read.get(0)));
-                       Assert.assertTrue("Read: " + read.get(1) + "Written: " 
+ written.get(1),  written.get(1).equals(read.get(1)));
-                       Assert.assertEquals(2, read.size());
-               }
-               Assert.assertFalse(itr.hasNext());
-       }
-       
-       private void runsInSlave(WriterContext context) throws HCatException {
-
-               HCatWriter writer = DataTransferFactory.getHCatWriter(context);
-               writer.write(new HCatRecordItr());
-       }
-
-       private void commit(Map<String, String> config, boolean status, 
WriterContext context) throws IOException {
-
-               WriteEntity.Builder builder = new WriteEntity.Builder();
-               WriteEntity entity = builder.withTable("mytbl").build();
-               HCatWriter writer = DataTransferFactory.getHCatWriter(entity, 
config);
-               if(status){
-                       writer.commit(context);                 
-               } else {
-                       writer.abort(context);
-               }
-       } 
-
-       private static HCatRecord getRecord(int i) {
-               List<Object> list = new ArrayList<Object>(2);
-               list.add("Row #: " + i);
-               list.add(i);
-               return new DefaultHCatRecord(list);
-       }
-       
-       private static class HCatRecordItr implements Iterator<HCatRecord> {
-
-               int i = 0;
-               @Override
-               public boolean hasNext() {
-                       return i++ < 100 ? true : false;
-               }
-
-               @Override
-               public HCatRecord next() {
-                       return getRecord(i);
-               }
-
-               @Override
-               public void remove() {
-                       throw new RuntimeException();
-               }
-       }
+  @Test
+  public void test() throws MetaException, CommandNeedRetryException,
+      IOException, ClassNotFoundException {
+
+    HiveConf conf = new HiveConf(getClass());
+    Driver driver = new Driver(conf);
+    SessionState.start(new CliSessionState(conf));
+    driver.run("drop table mytbl");
+    driver.run("create table mytbl (a string, b int)");
+    Iterator<Entry<String, String>> itr = conf.iterator();
+    Map<String, String> map = new HashMap<String, String>();
+    while (itr.hasNext()) {
+      Entry<String, String> kv = itr.next();
+      map.put(kv.getKey(), kv.getValue());
+    }
+
+    WriterContext cntxt = runsInMaster(map);
+
+    File writeCntxtFile = File.createTempFile("hcat-write", "temp");
+    writeCntxtFile.deleteOnExit();
+
+    // Serialize context.
+    ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(
+        writeCntxtFile));
+    oos.writeObject(cntxt);
+    oos.flush();
+    oos.close();
+
+    // Now, deserialize it.
+    ObjectInputStream ois = new ObjectInputStream(new FileInputStream(
+        writeCntxtFile));
+    cntxt = (WriterContext) ois.readObject();
+    ois.close();
+
+    runsInSlave(cntxt);
+    commit(map, true, cntxt);
+
+    ReaderContext readCntxt = runsInMaster(map, false);
+
+    File readCntxtFile = File.createTempFile("hcat-read", "temp");
+    readCntxtFile.deleteOnExit();
+    oos = new ObjectOutputStream(new FileOutputStream(readCntxtFile));
+    oos.writeObject(readCntxt);
+    oos.flush();
+    oos.close();
+
+    ois = new ObjectInputStream(new FileInputStream(readCntxtFile));
+    readCntxt = (ReaderContext) ois.readObject();
+    ois.close();
+
+    for (InputSplit split : readCntxt.getSplits()) {
+      runsInSlave(split, readCntxt.getConf());
+    }
+  }
+
+  private WriterContext runsInMaster(Map<String, String> config)
+      throws HCatException {
+
+    WriteEntity.Builder builder = new WriteEntity.Builder();
+    WriteEntity entity = builder.withTable("mytbl").build();
+    HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
+    WriterContext info = writer.prepareWrite();
+    return info;
+  }
+
+  private ReaderContext runsInMaster(Map<String, String> config, boolean bogus)
+      throws HCatException {
+
+    ReadEntity.Builder builder = new ReadEntity.Builder();
+    ReadEntity entity = builder.withTable("mytbl").build();
+    HCatReader reader = DataTransferFactory.getHCatReader(entity, config);
+    ReaderContext cntxt = reader.prepareRead();
+    return cntxt;
+  }
+
+  private void runsInSlave(InputSplit split, Configuration config)
+      throws HCatException {
+
+    HCatReader reader = DataTransferFactory.getHCatReader(split, config);
+    Iterator<HCatRecord> itr = reader.read();
+    int i = 1;
+    while (itr.hasNext()) {
+      HCatRecord read = itr.next();
+      HCatRecord written = getRecord(i++);
+      // Argh, HCatRecord doesnt implement equals()
+      Assert.assertTrue("Read: " + read.get(0) + "Written: " + written.get(0),
+          written.get(0).equals(read.get(0)));
+      Assert.assertTrue("Read: " + read.get(1) + "Written: " + written.get(1),
+          written.get(1).equals(read.get(1)));
+      Assert.assertEquals(2, read.size());
+    }
+    Assert.assertFalse(itr.hasNext());
+  }
+
+  private void runsInSlave(WriterContext context) throws HCatException {
+
+    HCatWriter writer = DataTransferFactory.getHCatWriter(context);
+    writer.write(new HCatRecordItr());
+  }
+
+  private void commit(Map<String, String> config, boolean status,
+      WriterContext context) throws IOException {
+
+    WriteEntity.Builder builder = new WriteEntity.Builder();
+    WriteEntity entity = builder.withTable("mytbl").build();
+    HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
+    if (status) {
+      writer.commit(context);
+    } else {
+      writer.abort(context);
+    }
+  }
+
+  private static HCatRecord getRecord(int i) {
+    List<Object> list = new ArrayList<Object>(2);
+    list.add("Row #: " + i);
+    list.add(i);
+    return new DefaultHCatRecord(list);
+  }
+
+  private static class HCatRecordItr implements Iterator<HCatRecord> {
+
+    int i = 0;
+
+    @Override
+    public boolean hasNext() {
+      return i++ < 100 ? true : false;
+    }
+
+    @Override
+    public HCatRecord next() {
+      return getRecord(i);
+    }
+
+    @Override
+    public void remove() {
+      throw new RuntimeException();
+    }
+  }
 }

Modified: 
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java
URL: 
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- 
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java
 (original)
+++ 
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java
 Fri Jun 22 01:40:27 2012
@@ -41,73 +41,82 @@ import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hcatalog.common.HCatConstants;
 
-public class TestMsgBusConnection extends TestCase{
+public class TestMsgBusConnection extends TestCase {
 
-       private Driver driver;
-       private BrokerService broker;
-       private MessageConsumer consumer;
-
-       @Override
-       protected void setUp() throws Exception {
-
-               super.setUp();
-               broker = new BrokerService();
-               // configure the broker
-               
broker.addConnector("tcp://localhost:61616?broker.persistent=false");
-
-               broker.start();
-
-               System.setProperty("java.naming.factory.initial", 
"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
-               System.setProperty("java.naming.provider.url", 
"tcp://localhost:61616");
-               connectClient();
-               HiveConf hiveConf = new HiveConf(this.getClass());
-               
hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname,NotificationListener.class.getName());
-               hiveConf.set("hive.metastore.local", "true");
-               hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
-               hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
-               
hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
-               hiveConf.set(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX, 
"planetlab.hcat");
-               SessionState.start(new CliSessionState(hiveConf));
-               driver = new Driver(hiveConf);
-       }
-
-       private void connectClient() throws JMSException{
-               ConnectionFactory connFac = new 
ActiveMQConnectionFactory("tcp://localhost:61616");
-               Connection conn = connFac.createConnection();
-               conn.start();
-               Session session = conn.createSession(true, 
Session.SESSION_TRANSACTED);
-               Destination hcatTopic = session.createTopic("planetlab.hcat");
-               consumer = session.createConsumer(hcatTopic);
-       }
-
-       public void testConnection() throws Exception{
-
-               try{
-                       driver.run("create database testconndb");
-                       Message msg = consumer.receive();
-                       assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, 
msg.getStringProperty(HCatConstants.HCAT_EVENT));
-                       
assertEquals("topic://planetlab.hcat",msg.getJMSDestination().toString());
-                       assertEquals("testconndb", ((Database) 
((ObjectMessage)msg).getObject()).getName());
-                       broker.stop();
-                       driver.run("drop database testconndb cascade");
-                       broker.start(true);
-                       connectClient();
-                       driver.run("create database testconndb");
-                       msg = consumer.receive();
-                       assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, 
msg.getStringProperty(HCatConstants.HCAT_EVENT));
-                       
assertEquals("topic://planetlab.hcat",msg.getJMSDestination().toString());
-                       assertEquals("testconndb", ((Database) 
((ObjectMessage)msg).getObject()).getName());
-                       driver.run("drop database testconndb cascade");
-                       msg = consumer.receive();
-                       assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT, 
msg.getStringProperty(HCatConstants.HCAT_EVENT));
-                       
assertEquals("topic://planetlab.hcat",msg.getJMSDestination().toString());
-                       assertEquals("testconndb", ((Database) 
((ObjectMessage)msg).getObject()).getName());
-               } catch (NoSuchObjectException nsoe){
-                       nsoe.printStackTrace(System.err);
-                       assert false;
-               } catch (AlreadyExistsException aee){
-                       aee.printStackTrace(System.err);
-                       assert false;
-               }
-       }
+  private Driver driver;
+  private BrokerService broker;
+  private MessageConsumer consumer;
+
+  @Override
+  protected void setUp() throws Exception {
+
+    super.setUp();
+    broker = new BrokerService();
+    // configure the broker
+    broker.addConnector("tcp://localhost:61616?broker.persistent=false");
+
+    broker.start();
+
+    System.setProperty("java.naming.factory.initial",
+        "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
+    System.setProperty("java.naming.provider.url", "tcp://localhost:61616");
+    connectClient();
+    HiveConf hiveConf = new HiveConf(this.getClass());
+    hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname,
+        NotificationListener.class.getName());
+    hiveConf.set("hive.metastore.local", "true");
+    hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+    hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+    hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+    hiveConf.set(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX, "planetlab.hcat");
+    SessionState.start(new CliSessionState(hiveConf));
+    driver = new Driver(hiveConf);
+  }
+
+  private void connectClient() throws JMSException {
+    ConnectionFactory connFac = new ActiveMQConnectionFactory(
+        "tcp://localhost:61616");
+    Connection conn = connFac.createConnection();
+    conn.start();
+    Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+    Destination hcatTopic = session.createTopic("planetlab.hcat");
+    consumer = session.createConsumer(hcatTopic);
+  }
+
+  public void testConnection() throws Exception {
+
+    try {
+      driver.run("create database testconndb");
+      Message msg = consumer.receive();
+      assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT,
+          msg.getStringProperty(HCatConstants.HCAT_EVENT));
+      assertEquals("topic://planetlab.hcat", 
msg.getJMSDestination().toString());
+      assertEquals("testconndb",
+          ((Database) ((ObjectMessage) msg).getObject()).getName());
+      broker.stop();
+      driver.run("drop database testconndb cascade");
+      broker.start(true);
+      connectClient();
+      driver.run("create database testconndb");
+      msg = consumer.receive();
+      assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT,
+          msg.getStringProperty(HCatConstants.HCAT_EVENT));
+      assertEquals("topic://planetlab.hcat", 
msg.getJMSDestination().toString());
+      assertEquals("testconndb",
+          ((Database) ((ObjectMessage) msg).getObject()).getName());
+      driver.run("drop database testconndb cascade");
+      msg = consumer.receive();
+      assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT,
+          msg.getStringProperty(HCatConstants.HCAT_EVENT));
+      assertEquals("topic://planetlab.hcat", 
msg.getJMSDestination().toString());
+      assertEquals("testconndb",
+          ((Database) ((ObjectMessage) msg).getObject()).getName());
+    } catch (NoSuchObjectException nsoe) {
+      nsoe.printStackTrace(System.err);
+      assert false;
+    } catch (AlreadyExistsException aee) {
+      aee.printStackTrace(System.err);
+      assert false;
+    }
+  }
 }

Modified: 
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java
URL: 
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- 
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java
 (original)
+++ 
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java
 Fri Jun 22 01:40:27 2012
@@ -58,126 +58,137 @@ import org.apache.thrift.TException;
 
 import junit.framework.TestCase;
 
-public class TestNotificationListener extends TestCase implements 
MessageListener{
+public class TestNotificationListener extends TestCase implements
+    MessageListener {
 
-       private HiveConf hiveConf;
-       private Driver driver;
-       private AtomicInteger cntInvocation = new AtomicInteger(0);
-
-       @Override
-       protected void setUp() throws Exception {
-
-               super.setUp();
-               System.setProperty("java.naming.factory.initial", 
"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
-               System.setProperty("java.naming.provider.url", 
"vm://localhost?broker.persistent=false");
-               ConnectionFactory connFac = new 
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
-               Connection conn = connFac.createConnection();
-               conn.start();
-               // We want message to be sent when session commits, thus we run 
in
-               // transacted mode.
-               Session session = conn.createSession(true, 
Session.SESSION_TRANSACTED);
-               Destination hcatTopic = 
session.createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX);
-               MessageConsumer consumer1 = session.createConsumer(hcatTopic);
-               consumer1.setMessageListener(this);
-               Destination tblTopic = 
session.createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".mydb.mytbl");
-               MessageConsumer consumer2 = session.createConsumer(tblTopic);
-               consumer2.setMessageListener(this);
-               Destination dbTopic = 
session.createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".mydb");
-               MessageConsumer consumer3 = session.createConsumer(dbTopic);
-               consumer3.setMessageListener(this);
-               hiveConf = new HiveConf(this.getClass());
-               
hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname,NotificationListener.class.getName());
-               hiveConf.set("hive.metastore.local", "true");
-               hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
-               hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
-               
hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
-               SessionState.start(new CliSessionState(hiveConf));
-               driver = new Driver(hiveConf);
-       }
-
-       @Override
-       protected void tearDown() throws Exception {
-               assertEquals(7, cntInvocation.get());
-               super.tearDown();
-       }
-
-       public void testAMQListener() throws MetaException, TException, 
UnknownTableException, NoSuchObjectException, 
-       CommandNeedRetryException, UnknownDBException, 
InvalidPartitionException, UnknownPartitionException{
-               driver.run("create database mydb");
-               driver.run("use mydb");
-               driver.run("create table mytbl (a string) partitioned by (b 
string)");
-               driver.run("alter table mytbl add partition(b='2011')");
-               HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf);
-               Map<String,String> kvs = new HashMap<String, String>(1);
-               kvs.put("b", "2011");
-               msc.markPartitionForEvent("mydb", "mytbl", kvs, 
PartitionEventType.LOAD_DONE);
-               driver.run("alter table mytbl drop partition(b='2011')");
-               driver.run("drop table mytbl");
-               driver.run("drop database mydb");
-       }
-
-       @Override
-       public void onMessage(Message msg) {
-               cntInvocation.incrementAndGet();
-
-               String event;
-               try {
-                       event = msg.getStringProperty(HCatConstants.HCAT_EVENT);
-                       if(event.equals(HCatConstants.HCAT_ADD_DATABASE_EVENT)){
-
-                               
assertEquals("topic://"+HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX,msg.getJMSDestination().toString());
-                               assertEquals("mydb", ((Database) 
((ObjectMessage)msg).getObject()).getName());
-                       }
-                       else 
if(event.equals(HCatConstants.HCAT_ADD_TABLE_EVENT)){
-
-                               
assertEquals("topic://hcat.mydb",msg.getJMSDestination().toString());
-                               Table tbl = 
(Table)(((ObjectMessage)msg).getObject());
-                               assertEquals("mytbl", tbl.getTableName());
-                               assertEquals("mydb", tbl.getDbName());
-                               assertEquals(1, tbl.getPartitionKeysSize());
-                       }
-                       else 
if(event.equals(HCatConstants.HCAT_ADD_PARTITION_EVENT)){
-
-                               
assertEquals("topic://hcat.mydb.mytbl",msg.getJMSDestination().toString());
-                               Partition part = 
(Partition)(((ObjectMessage)msg).getObject());
-                               assertEquals("mytbl", part.getTableName());
-                               assertEquals("mydb", part.getDbName());
-                               List<String> vals = new ArrayList<String>(1);
-                               vals.add("2011");
-                               assertEquals(vals,part.getValues());
-                       }
-                       else 
if(event.equals(HCatConstants.HCAT_DROP_PARTITION_EVENT)){
-
-                               
assertEquals("topic://hcat.mydb.mytbl",msg.getJMSDestination().toString());
-                               Partition part = 
(Partition)(((ObjectMessage)msg).getObject());
-                               assertEquals("mytbl", part.getTableName());
-                               assertEquals("mydb", part.getDbName());
-                               List<String> vals = new ArrayList<String>(1);
-                               vals.add("2011");
-                               assertEquals(vals,part.getValues());
-                       }
-                       else 
if(event.equals(HCatConstants.HCAT_DROP_TABLE_EVENT)){
-
-                               
assertEquals("topic://hcat.mydb",msg.getJMSDestination().toString());
-                               Table tbl = 
(Table)(((ObjectMessage)msg).getObject());
-                               assertEquals("mytbl", tbl.getTableName());
-                               assertEquals("mydb", tbl.getDbName());
-                               assertEquals(1, tbl.getPartitionKeysSize());
-                       }
-                       else 
if(event.equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)){
-
-                               
assertEquals("topic://"+HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX,msg.getJMSDestination().toString());
-                               assertEquals("mydb", ((Database) 
((ObjectMessage)msg).getObject()).getName());
-                       }
-                       else if 
(event.equals(HCatConstants.HCAT_PARTITION_DONE_EVENT)) {
-                               
assertEquals("topic://hcat.mydb.mytbl",msg.getJMSDestination().toString());
-                               MapMessage mapMsg = (MapMessage)msg;
-                               assert mapMsg.getString("b").equals("2011");
-                       } else
-                               assert false;
-               } catch (JMSException e) {
-                       e.printStackTrace(System.err);
-                       assert false;
-               }
-       }
+  private HiveConf hiveConf;
+  private Driver driver;
+  private AtomicInteger cntInvocation = new AtomicInteger(0);
+
+  @Override
+  protected void setUp() throws Exception {
+
+    super.setUp();
+    System.setProperty("java.naming.factory.initial",
+        "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
+    System.setProperty("java.naming.provider.url",
+        "vm://localhost?broker.persistent=false");
+    ConnectionFactory connFac = new ActiveMQConnectionFactory(
+        "vm://localhost?broker.persistent=false");
+    Connection conn = connFac.createConnection();
+    conn.start();
+    // We want message to be sent when session commits, thus we run in
+    // transacted mode.
+    Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+    Destination hcatTopic = session
+        .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX);
+    MessageConsumer consumer1 = session.createConsumer(hcatTopic);
+    consumer1.setMessageListener(this);
+    Destination tblTopic = session
+        .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX + ".mydb.mytbl");
+    MessageConsumer consumer2 = session.createConsumer(tblTopic);
+    consumer2.setMessageListener(this);
+    Destination dbTopic = session
+        .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX + ".mydb");
+    MessageConsumer consumer3 = session.createConsumer(dbTopic);
+    consumer3.setMessageListener(this);
+    hiveConf = new HiveConf(this.getClass());
+    hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname,
+        NotificationListener.class.getName());
+    hiveConf.set("hive.metastore.local", "true");
+    hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+    hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+    hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+    SessionState.start(new CliSessionState(hiveConf));
+    driver = new Driver(hiveConf);
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    assertEquals(7, cntInvocation.get());
+    super.tearDown();
+  }
+
+  public void testAMQListener() throws MetaException, TException,
+      UnknownTableException, NoSuchObjectException, CommandNeedRetryException,
+      UnknownDBException, InvalidPartitionException, UnknownPartitionException 
{
+    driver.run("create database mydb");
+    driver.run("use mydb");
+    driver.run("create table mytbl (a string) partitioned by (b string)");
+    driver.run("alter table mytbl add partition(b='2011')");
+    HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf);
+    Map<String, String> kvs = new HashMap<String, String>(1);
+    kvs.put("b", "2011");
+    msc.markPartitionForEvent("mydb", "mytbl", kvs,
+        PartitionEventType.LOAD_DONE);
+    driver.run("alter table mytbl drop partition(b='2011')");
+    driver.run("drop table mytbl");
+    driver.run("drop database mydb");
+  }
+
+  @Override
+  public void onMessage(Message msg) {
+    cntInvocation.incrementAndGet();
+
+    String event;
+    try {
+      event = msg.getStringProperty(HCatConstants.HCAT_EVENT);
+      if (event.equals(HCatConstants.HCAT_ADD_DATABASE_EVENT)) {
+
+        assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg
+            .getJMSDestination().toString());
+        assertEquals("mydb",
+            ((Database) ((ObjectMessage) msg).getObject()).getName());
+      } else if (event.equals(HCatConstants.HCAT_ADD_TABLE_EVENT)) {
+
+        assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString());
+        Table tbl = (Table) (((ObjectMessage) msg).getObject());
+        assertEquals("mytbl", tbl.getTableName());
+        assertEquals("mydb", tbl.getDbName());
+        assertEquals(1, tbl.getPartitionKeysSize());
+      } else if (event.equals(HCatConstants.HCAT_ADD_PARTITION_EVENT)) {
+
+        assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination()
+            .toString());
+        Partition part = (Partition) (((ObjectMessage) msg).getObject());
+        assertEquals("mytbl", part.getTableName());
+        assertEquals("mydb", part.getDbName());
+        List<String> vals = new ArrayList<String>(1);
+        vals.add("2011");
+        assertEquals(vals, part.getValues());
+      } else if (event.equals(HCatConstants.HCAT_DROP_PARTITION_EVENT)) {
+
+        assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination()
+            .toString());
+        Partition part = (Partition) (((ObjectMessage) msg).getObject());
+        assertEquals("mytbl", part.getTableName());
+        assertEquals("mydb", part.getDbName());
+        List<String> vals = new ArrayList<String>(1);
+        vals.add("2011");
+        assertEquals(vals, part.getValues());
+      } else if (event.equals(HCatConstants.HCAT_DROP_TABLE_EVENT)) {
+
+        assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString());
+        Table tbl = (Table) (((ObjectMessage) msg).getObject());
+        assertEquals("mytbl", tbl.getTableName());
+        assertEquals("mydb", tbl.getDbName());
+        assertEquals(1, tbl.getPartitionKeysSize());
+      } else if (event.equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)) {
+
+        assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg
+            .getJMSDestination().toString());
+        assertEquals("mydb",
+            ((Database) ((ObjectMessage) msg).getObject()).getName());
+      } else if (event.equals(HCatConstants.HCAT_PARTITION_DONE_EVENT)) {
+        assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination()
+            .toString());
+        MapMessage mapMsg = (MapMessage) msg;
+        assert mapMsg.getString("b").equals("2011");
+      } else
+        assert false;
+    } catch (JMSException e) {
+      e.printStackTrace(System.err);
+      assert false;
+    }
+  }
 }

Modified: 
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java
URL: 
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- 
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java
 (original)
+++ 
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java
 Fri Jun 22 01:40:27 2012
@@ -110,18 +110,18 @@ public class TestHCatLoaderComplexSchema
     String pigSchema =
         "(" +
           "a: " +
-                   "(" +
-                     "aa: chararray, " +
-                     "ab: long, " +
-                     "ac: map[], " +
-                     "ad: { t: (ada: long) }, " +
-                     "ae: { t: (aea:long, aeb: ( aeba: chararray, aebb: long)) 
}," +
-                     "af: (afa: chararray, afb: long) " +
-                   ")," +
-                  "b: chararray, " +
-                  "c: long, " +
-                  "d:  { t: (da:long, db: ( dba: chararray, dbb: long), dc: { 
t: (dca: long) } ) } " +
-                ")";
+            "(" +
+              "aa: chararray, " +
+              "ab: long, " +
+              "ac: map[], " +
+              "ad: { t: (ada: long) }, " +
+              "ae: { t: (aea:long, aeb: ( aeba: chararray, aebb: long)) }," +
+              "af: (afa: chararray, afb: long) " +
+            ")," +
+           "b: chararray, " +
+           "c: long, " +
+           "d:  { t: (da:long, db: ( dba: chararray, dbb: long), dc: { t: 
(dca: long) } ) } " +
+         ")";
 
     // with extra structs
     String tableSchema =

Modified: 
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java
URL: 
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- 
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java 
(original)
+++ 
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java 
Fri Jun 22 01:40:27 2012
@@ -132,7 +132,7 @@ public class TestHCatStorer extends Test
 
     driver.run("drop table employee");
     String createTable = "CREATE TABLE employee (emp_id INT, emp_name STRING, 
emp_start_date STRING , emp_gender STRING ) " +
-               " PARTITIONED BY (emp_country STRING , emp_state STRING ) 
STORED AS RCFILE";
+        " PARTITIONED BY (emp_country STRING , emp_state STRING ) STORED AS 
RCFILE";
 
     int retCode = driver.run(createTable).getResponseCode();
     if(retCode != 0) {
@@ -148,7 +148,7 @@ public class TestHCatStorer extends Test
     PigServer pig = new PigServer(ExecType.LOCAL);
     pig.setBatchOn();
     pig.registerQuery("A = LOAD '"+INPUT_FILE_NAME+"' USING PigStorage() AS 
(emp_id:int,emp_name:chararray,emp_start_date:chararray," +
-               
"emp_gender:chararray,emp_country:chararray,emp_state:chararray);");
+        "emp_gender:chararray,emp_country:chararray,emp_state:chararray);");
     pig.registerQuery("TN = FILTER A BY emp_state == 'TN';");
     pig.registerQuery("KA = FILTER A BY emp_state == 'KA';");
     pig.registerQuery("KL = FILTER A BY emp_state == 'KL';");
@@ -415,7 +415,7 @@ public class TestHCatStorer extends Test
   public void testBagNStruct() throws IOException, CommandNeedRetryException{
   driver.run("drop table junit_unparted");
   String createTable = "create table junit_unparted(b string,a struct<a1:int>, 
 arr_of_struct array<string>, " +
-               "arr_of_struct2 array<struct<s1:string,s2:string>>,  
arr_of_struct3 array<struct<s3:string>>) stored as RCFILE";
+      "arr_of_struct2 array<struct<s1:string,s2:string>>,  arr_of_struct3 
array<struct<s3:string>>) stored as RCFILE";
   int retCode = driver.run(createTable).getResponseCode();
   if(retCode != 0) {
     throw new IOException("Failed to create table.");
@@ -430,7 +430,7 @@ public class TestHCatStorer extends Test
   server.setBatchOn();
   server.registerQuery("A = load '"+INPUT_FILE_NAME+"' as (b:chararray, 
a:tuple(a1:int), arr_of_struct:bag{mytup:tuple(s1:chararray)}, 
arr_of_struct2:bag{mytup:tuple(s1:chararray,s2:chararray)}, 
arr_of_struct3:bag{t3:tuple(s3:chararray)});");
   server.registerQuery("store A into 'default.junit_unparted' using 
"+HCatStorer.class.getName()+"('','b:chararray, a:tuple(a1:int)," +
-               " arr_of_struct:bag{mytup:tuple(s1:chararray)}, 
arr_of_struct2:bag{mytup:tuple(s1:chararray,s2:chararray)}, 
arr_of_struct3:bag{t3:tuple(s3:chararray)}');");
+      " arr_of_struct:bag{mytup:tuple(s1:chararray)}, 
arr_of_struct2:bag{mytup:tuple(s1:chararray,s2:chararray)}, 
arr_of_struct3:bag{t3:tuple(s3:chararray)}');");
   server.executeBatch();
 
   driver.run("select * from junit_unparted");


Reply via email to