Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/data/TestReaderWriter.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/data/TestReaderWriter.java?rev=1352738&r1=1352737&r2=1352738&view=diff ============================================================================== --- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/data/TestReaderWriter.java (original) +++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/data/TestReaderWriter.java Fri Jun 22 01:40:27 2012 @@ -52,134 +52,143 @@ import org.junit.Test; public class TestReaderWriter { - @Test - public void test() throws MetaException, CommandNeedRetryException, IOException, ClassNotFoundException { - - HiveConf conf = new HiveConf(getClass()); - Driver driver = new Driver(conf); - SessionState.start(new CliSessionState(conf)); - driver.run("drop table mytbl"); - driver.run("create table mytbl (a string, b int)"); - Iterator<Entry<String,String>> itr = conf.iterator(); - Map<String,String> map = new HashMap<String, String>(); - while(itr.hasNext()){ - Entry<String,String> kv = itr.next(); - map.put(kv.getKey(), kv.getValue()); - } - - WriterContext cntxt = runsInMaster(map); - - File writeCntxtFile = File.createTempFile("hcat-write", "temp"); - writeCntxtFile.deleteOnExit(); - - // Serialize context. - ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(writeCntxtFile)); - oos.writeObject(cntxt); - oos.flush(); - oos.close(); - - // Now, deserialize it. - ObjectInputStream ois = new ObjectInputStream(new FileInputStream(writeCntxtFile)); - cntxt = (WriterContext) ois.readObject(); - ois.close(); - - runsInSlave(cntxt); - commit(map, true, cntxt); - - ReaderContext readCntxt = runsInMaster(map, false); - - File readCntxtFile = File.createTempFile("hcat-read", "temp"); - readCntxtFile.deleteOnExit(); - oos = new ObjectOutputStream(new FileOutputStream(readCntxtFile)); - oos.writeObject(readCntxt); - oos.flush(); - oos.close(); - - ois = new ObjectInputStream(new FileInputStream(readCntxtFile)); - readCntxt = (ReaderContext) ois.readObject(); - ois.close(); - - - for(InputSplit split : readCntxt.getSplits()){ - runsInSlave(split, readCntxt.getConf()); - } - } - - private WriterContext runsInMaster(Map<String, String> config) throws HCatException { - - WriteEntity.Builder builder = new WriteEntity.Builder(); - WriteEntity entity = builder.withTable("mytbl").build(); - HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config); - WriterContext info = writer.prepareWrite(); - return info; - } - - private ReaderContext runsInMaster(Map<String,String> config, boolean bogus) throws HCatException { - - ReadEntity.Builder builder = new ReadEntity.Builder(); - ReadEntity entity = builder.withTable("mytbl").build(); - HCatReader reader = DataTransferFactory.getHCatReader(entity, config); - ReaderContext cntxt = reader.prepareRead(); - return cntxt; - } - - private void runsInSlave(InputSplit split, Configuration config) throws HCatException { - - HCatReader reader = DataTransferFactory.getHCatReader(split, config); - Iterator<HCatRecord> itr = reader.read(); - int i = 1; - while(itr.hasNext()){ - HCatRecord read = itr.next(); - HCatRecord written = getRecord(i++); - // Argh, HCatRecord doesnt implement equals() - Assert.assertTrue("Read: " + read.get(0) + "Written: " + written.get(0), written.get(0).equals(read.get(0))); - Assert.assertTrue("Read: " + read.get(1) + "Written: " + written.get(1), written.get(1).equals(read.get(1))); - Assert.assertEquals(2, read.size()); - } - Assert.assertFalse(itr.hasNext()); - } - - private void runsInSlave(WriterContext context) throws HCatException { - - HCatWriter writer = DataTransferFactory.getHCatWriter(context); - writer.write(new HCatRecordItr()); - } - - private void commit(Map<String, String> config, boolean status, WriterContext context) throws IOException { - - WriteEntity.Builder builder = new WriteEntity.Builder(); - WriteEntity entity = builder.withTable("mytbl").build(); - HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config); - if(status){ - writer.commit(context); - } else { - writer.abort(context); - } - } - - private static HCatRecord getRecord(int i) { - List<Object> list = new ArrayList<Object>(2); - list.add("Row #: " + i); - list.add(i); - return new DefaultHCatRecord(list); - } - - private static class HCatRecordItr implements Iterator<HCatRecord> { - - int i = 0; - @Override - public boolean hasNext() { - return i++ < 100 ? true : false; - } - - @Override - public HCatRecord next() { - return getRecord(i); - } - - @Override - public void remove() { - throw new RuntimeException(); - } - } + @Test + public void test() throws MetaException, CommandNeedRetryException, + IOException, ClassNotFoundException { + + HiveConf conf = new HiveConf(getClass()); + Driver driver = new Driver(conf); + SessionState.start(new CliSessionState(conf)); + driver.run("drop table mytbl"); + driver.run("create table mytbl (a string, b int)"); + Iterator<Entry<String, String>> itr = conf.iterator(); + Map<String, String> map = new HashMap<String, String>(); + while (itr.hasNext()) { + Entry<String, String> kv = itr.next(); + map.put(kv.getKey(), kv.getValue()); + } + + WriterContext cntxt = runsInMaster(map); + + File writeCntxtFile = File.createTempFile("hcat-write", "temp"); + writeCntxtFile.deleteOnExit(); + + // Serialize context. + ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream( + writeCntxtFile)); + oos.writeObject(cntxt); + oos.flush(); + oos.close(); + + // Now, deserialize it. + ObjectInputStream ois = new ObjectInputStream(new FileInputStream( + writeCntxtFile)); + cntxt = (WriterContext) ois.readObject(); + ois.close(); + + runsInSlave(cntxt); + commit(map, true, cntxt); + + ReaderContext readCntxt = runsInMaster(map, false); + + File readCntxtFile = File.createTempFile("hcat-read", "temp"); + readCntxtFile.deleteOnExit(); + oos = new ObjectOutputStream(new FileOutputStream(readCntxtFile)); + oos.writeObject(readCntxt); + oos.flush(); + oos.close(); + + ois = new ObjectInputStream(new FileInputStream(readCntxtFile)); + readCntxt = (ReaderContext) ois.readObject(); + ois.close(); + + for (InputSplit split : readCntxt.getSplits()) { + runsInSlave(split, readCntxt.getConf()); + } + } + + private WriterContext runsInMaster(Map<String, String> config) + throws HCatException { + + WriteEntity.Builder builder = new WriteEntity.Builder(); + WriteEntity entity = builder.withTable("mytbl").build(); + HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config); + WriterContext info = writer.prepareWrite(); + return info; + } + + private ReaderContext runsInMaster(Map<String, String> config, boolean bogus) + throws HCatException { + + ReadEntity.Builder builder = new ReadEntity.Builder(); + ReadEntity entity = builder.withTable("mytbl").build(); + HCatReader reader = DataTransferFactory.getHCatReader(entity, config); + ReaderContext cntxt = reader.prepareRead(); + return cntxt; + } + + private void runsInSlave(InputSplit split, Configuration config) + throws HCatException { + + HCatReader reader = DataTransferFactory.getHCatReader(split, config); + Iterator<HCatRecord> itr = reader.read(); + int i = 1; + while (itr.hasNext()) { + HCatRecord read = itr.next(); + HCatRecord written = getRecord(i++); + // Argh, HCatRecord doesnt implement equals() + Assert.assertTrue("Read: " + read.get(0) + "Written: " + written.get(0), + written.get(0).equals(read.get(0))); + Assert.assertTrue("Read: " + read.get(1) + "Written: " + written.get(1), + written.get(1).equals(read.get(1))); + Assert.assertEquals(2, read.size()); + } + Assert.assertFalse(itr.hasNext()); + } + + private void runsInSlave(WriterContext context) throws HCatException { + + HCatWriter writer = DataTransferFactory.getHCatWriter(context); + writer.write(new HCatRecordItr()); + } + + private void commit(Map<String, String> config, boolean status, + WriterContext context) throws IOException { + + WriteEntity.Builder builder = new WriteEntity.Builder(); + WriteEntity entity = builder.withTable("mytbl").build(); + HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config); + if (status) { + writer.commit(context); + } else { + writer.abort(context); + } + } + + private static HCatRecord getRecord(int i) { + List<Object> list = new ArrayList<Object>(2); + list.add("Row #: " + i); + list.add(i); + return new DefaultHCatRecord(list); + } + + private static class HCatRecordItr implements Iterator<HCatRecord> { + + int i = 0; + + @Override + public boolean hasNext() { + return i++ < 100 ? true : false; + } + + @Override + public HCatRecord next() { + return getRecord(i); + } + + @Override + public void remove() { + throw new RuntimeException(); + } + } }
Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java?rev=1352738&r1=1352737&r2=1352738&view=diff ============================================================================== --- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java (original) +++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java Fri Jun 22 01:40:27 2012 @@ -41,73 +41,82 @@ import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hcatalog.common.HCatConstants; -public class TestMsgBusConnection extends TestCase{ +public class TestMsgBusConnection extends TestCase { - private Driver driver; - private BrokerService broker; - private MessageConsumer consumer; - - @Override - protected void setUp() throws Exception { - - super.setUp(); - broker = new BrokerService(); - // configure the broker - broker.addConnector("tcp://localhost:61616?broker.persistent=false"); - - broker.start(); - - System.setProperty("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); - System.setProperty("java.naming.provider.url", "tcp://localhost:61616"); - connectClient(); - HiveConf hiveConf = new HiveConf(this.getClass()); - hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname,NotificationListener.class.getName()); - hiveConf.set("hive.metastore.local", "true"); - hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); - hiveConf.set(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX, "planetlab.hcat"); - SessionState.start(new CliSessionState(hiveConf)); - driver = new Driver(hiveConf); - } - - private void connectClient() throws JMSException{ - ConnectionFactory connFac = new ActiveMQConnectionFactory("tcp://localhost:61616"); - Connection conn = connFac.createConnection(); - conn.start(); - Session session = conn.createSession(true, Session.SESSION_TRANSACTED); - Destination hcatTopic = session.createTopic("planetlab.hcat"); - consumer = session.createConsumer(hcatTopic); - } - - public void testConnection() throws Exception{ - - try{ - driver.run("create database testconndb"); - Message msg = consumer.receive(); - assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, msg.getStringProperty(HCatConstants.HCAT_EVENT)); - assertEquals("topic://planetlab.hcat",msg.getJMSDestination().toString()); - assertEquals("testconndb", ((Database) ((ObjectMessage)msg).getObject()).getName()); - broker.stop(); - driver.run("drop database testconndb cascade"); - broker.start(true); - connectClient(); - driver.run("create database testconndb"); - msg = consumer.receive(); - assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, msg.getStringProperty(HCatConstants.HCAT_EVENT)); - assertEquals("topic://planetlab.hcat",msg.getJMSDestination().toString()); - assertEquals("testconndb", ((Database) ((ObjectMessage)msg).getObject()).getName()); - driver.run("drop database testconndb cascade"); - msg = consumer.receive(); - assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT, msg.getStringProperty(HCatConstants.HCAT_EVENT)); - assertEquals("topic://planetlab.hcat",msg.getJMSDestination().toString()); - assertEquals("testconndb", ((Database) ((ObjectMessage)msg).getObject()).getName()); - } catch (NoSuchObjectException nsoe){ - nsoe.printStackTrace(System.err); - assert false; - } catch (AlreadyExistsException aee){ - aee.printStackTrace(System.err); - assert false; - } - } + private Driver driver; + private BrokerService broker; + private MessageConsumer consumer; + + @Override + protected void setUp() throws Exception { + + super.setUp(); + broker = new BrokerService(); + // configure the broker + broker.addConnector("tcp://localhost:61616?broker.persistent=false"); + + broker.start(); + + System.setProperty("java.naming.factory.initial", + "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); + System.setProperty("java.naming.provider.url", "tcp://localhost:61616"); + connectClient(); + HiveConf hiveConf = new HiveConf(this.getClass()); + hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname, + NotificationListener.class.getName()); + hiveConf.set("hive.metastore.local", "true"); + hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); + hiveConf.set(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX, "planetlab.hcat"); + SessionState.start(new CliSessionState(hiveConf)); + driver = new Driver(hiveConf); + } + + private void connectClient() throws JMSException { + ConnectionFactory connFac = new ActiveMQConnectionFactory( + "tcp://localhost:61616"); + Connection conn = connFac.createConnection(); + conn.start(); + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); + Destination hcatTopic = session.createTopic("planetlab.hcat"); + consumer = session.createConsumer(hcatTopic); + } + + public void testConnection() throws Exception { + + try { + driver.run("create database testconndb"); + Message msg = consumer.receive(); + assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, + msg.getStringProperty(HCatConstants.HCAT_EVENT)); + assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString()); + assertEquals("testconndb", + ((Database) ((ObjectMessage) msg).getObject()).getName()); + broker.stop(); + driver.run("drop database testconndb cascade"); + broker.start(true); + connectClient(); + driver.run("create database testconndb"); + msg = consumer.receive(); + assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, + msg.getStringProperty(HCatConstants.HCAT_EVENT)); + assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString()); + assertEquals("testconndb", + ((Database) ((ObjectMessage) msg).getObject()).getName()); + driver.run("drop database testconndb cascade"); + msg = consumer.receive(); + assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT, + msg.getStringProperty(HCatConstants.HCAT_EVENT)); + assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString()); + assertEquals("testconndb", + ((Database) ((ObjectMessage) msg).getObject()).getName()); + } catch (NoSuchObjectException nsoe) { + nsoe.printStackTrace(System.err); + assert false; + } catch (AlreadyExistsException aee) { + aee.printStackTrace(System.err); + assert false; + } + } } Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java?rev=1352738&r1=1352737&r2=1352738&view=diff ============================================================================== --- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java (original) +++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java Fri Jun 22 01:40:27 2012 @@ -58,126 +58,137 @@ import org.apache.thrift.TException; import junit.framework.TestCase; -public class TestNotificationListener extends TestCase implements MessageListener{ +public class TestNotificationListener extends TestCase implements + MessageListener { - private HiveConf hiveConf; - private Driver driver; - private AtomicInteger cntInvocation = new AtomicInteger(0); - - @Override - protected void setUp() throws Exception { - - super.setUp(); - System.setProperty("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); - System.setProperty("java.naming.provider.url", "vm://localhost?broker.persistent=false"); - ConnectionFactory connFac = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); - Connection conn = connFac.createConnection(); - conn.start(); - // We want message to be sent when session commits, thus we run in - // transacted mode. - Session session = conn.createSession(true, Session.SESSION_TRANSACTED); - Destination hcatTopic = session.createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX); - MessageConsumer consumer1 = session.createConsumer(hcatTopic); - consumer1.setMessageListener(this); - Destination tblTopic = session.createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".mydb.mytbl"); - MessageConsumer consumer2 = session.createConsumer(tblTopic); - consumer2.setMessageListener(this); - Destination dbTopic = session.createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".mydb"); - MessageConsumer consumer3 = session.createConsumer(dbTopic); - consumer3.setMessageListener(this); - hiveConf = new HiveConf(this.getClass()); - hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname,NotificationListener.class.getName()); - hiveConf.set("hive.metastore.local", "true"); - hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); - SessionState.start(new CliSessionState(hiveConf)); - driver = new Driver(hiveConf); - } - - @Override - protected void tearDown() throws Exception { - assertEquals(7, cntInvocation.get()); - super.tearDown(); - } - - public void testAMQListener() throws MetaException, TException, UnknownTableException, NoSuchObjectException, - CommandNeedRetryException, UnknownDBException, InvalidPartitionException, UnknownPartitionException{ - driver.run("create database mydb"); - driver.run("use mydb"); - driver.run("create table mytbl (a string) partitioned by (b string)"); - driver.run("alter table mytbl add partition(b='2011')"); - HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf); - Map<String,String> kvs = new HashMap<String, String>(1); - kvs.put("b", "2011"); - msc.markPartitionForEvent("mydb", "mytbl", kvs, PartitionEventType.LOAD_DONE); - driver.run("alter table mytbl drop partition(b='2011')"); - driver.run("drop table mytbl"); - driver.run("drop database mydb"); - } - - @Override - public void onMessage(Message msg) { - cntInvocation.incrementAndGet(); - - String event; - try { - event = msg.getStringProperty(HCatConstants.HCAT_EVENT); - if(event.equals(HCatConstants.HCAT_ADD_DATABASE_EVENT)){ - - assertEquals("topic://"+HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX,msg.getJMSDestination().toString()); - assertEquals("mydb", ((Database) ((ObjectMessage)msg).getObject()).getName()); - } - else if(event.equals(HCatConstants.HCAT_ADD_TABLE_EVENT)){ - - assertEquals("topic://hcat.mydb",msg.getJMSDestination().toString()); - Table tbl = (Table)(((ObjectMessage)msg).getObject()); - assertEquals("mytbl", tbl.getTableName()); - assertEquals("mydb", tbl.getDbName()); - assertEquals(1, tbl.getPartitionKeysSize()); - } - else if(event.equals(HCatConstants.HCAT_ADD_PARTITION_EVENT)){ - - assertEquals("topic://hcat.mydb.mytbl",msg.getJMSDestination().toString()); - Partition part = (Partition)(((ObjectMessage)msg).getObject()); - assertEquals("mytbl", part.getTableName()); - assertEquals("mydb", part.getDbName()); - List<String> vals = new ArrayList<String>(1); - vals.add("2011"); - assertEquals(vals,part.getValues()); - } - else if(event.equals(HCatConstants.HCAT_DROP_PARTITION_EVENT)){ - - assertEquals("topic://hcat.mydb.mytbl",msg.getJMSDestination().toString()); - Partition part = (Partition)(((ObjectMessage)msg).getObject()); - assertEquals("mytbl", part.getTableName()); - assertEquals("mydb", part.getDbName()); - List<String> vals = new ArrayList<String>(1); - vals.add("2011"); - assertEquals(vals,part.getValues()); - } - else if(event.equals(HCatConstants.HCAT_DROP_TABLE_EVENT)){ - - assertEquals("topic://hcat.mydb",msg.getJMSDestination().toString()); - Table tbl = (Table)(((ObjectMessage)msg).getObject()); - assertEquals("mytbl", tbl.getTableName()); - assertEquals("mydb", tbl.getDbName()); - assertEquals(1, tbl.getPartitionKeysSize()); - } - else if(event.equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)){ - - assertEquals("topic://"+HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX,msg.getJMSDestination().toString()); - assertEquals("mydb", ((Database) ((ObjectMessage)msg).getObject()).getName()); - } - else if (event.equals(HCatConstants.HCAT_PARTITION_DONE_EVENT)) { - assertEquals("topic://hcat.mydb.mytbl",msg.getJMSDestination().toString()); - MapMessage mapMsg = (MapMessage)msg; - assert mapMsg.getString("b").equals("2011"); - } else - assert false; - } catch (JMSException e) { - e.printStackTrace(System.err); - assert false; - } - } + private HiveConf hiveConf; + private Driver driver; + private AtomicInteger cntInvocation = new AtomicInteger(0); + + @Override + protected void setUp() throws Exception { + + super.setUp(); + System.setProperty("java.naming.factory.initial", + "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); + System.setProperty("java.naming.provider.url", + "vm://localhost?broker.persistent=false"); + ConnectionFactory connFac = new ActiveMQConnectionFactory( + "vm://localhost?broker.persistent=false"); + Connection conn = connFac.createConnection(); + conn.start(); + // We want message to be sent when session commits, thus we run in + // transacted mode. + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); + Destination hcatTopic = session + .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX); + MessageConsumer consumer1 = session.createConsumer(hcatTopic); + consumer1.setMessageListener(this); + Destination tblTopic = session + .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX + ".mydb.mytbl"); + MessageConsumer consumer2 = session.createConsumer(tblTopic); + consumer2.setMessageListener(this); + Destination dbTopic = session + .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX + ".mydb"); + MessageConsumer consumer3 = session.createConsumer(dbTopic); + consumer3.setMessageListener(this); + hiveConf = new HiveConf(this.getClass()); + hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname, + NotificationListener.class.getName()); + hiveConf.set("hive.metastore.local", "true"); + hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); + SessionState.start(new CliSessionState(hiveConf)); + driver = new Driver(hiveConf); + } + + @Override + protected void tearDown() throws Exception { + assertEquals(7, cntInvocation.get()); + super.tearDown(); + } + + public void testAMQListener() throws MetaException, TException, + UnknownTableException, NoSuchObjectException, CommandNeedRetryException, + UnknownDBException, InvalidPartitionException, UnknownPartitionException { + driver.run("create database mydb"); + driver.run("use mydb"); + driver.run("create table mytbl (a string) partitioned by (b string)"); + driver.run("alter table mytbl add partition(b='2011')"); + HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf); + Map<String, String> kvs = new HashMap<String, String>(1); + kvs.put("b", "2011"); + msc.markPartitionForEvent("mydb", "mytbl", kvs, + PartitionEventType.LOAD_DONE); + driver.run("alter table mytbl drop partition(b='2011')"); + driver.run("drop table mytbl"); + driver.run("drop database mydb"); + } + + @Override + public void onMessage(Message msg) { + cntInvocation.incrementAndGet(); + + String event; + try { + event = msg.getStringProperty(HCatConstants.HCAT_EVENT); + if (event.equals(HCatConstants.HCAT_ADD_DATABASE_EVENT)) { + + assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg + .getJMSDestination().toString()); + assertEquals("mydb", + ((Database) ((ObjectMessage) msg).getObject()).getName()); + } else if (event.equals(HCatConstants.HCAT_ADD_TABLE_EVENT)) { + + assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString()); + Table tbl = (Table) (((ObjectMessage) msg).getObject()); + assertEquals("mytbl", tbl.getTableName()); + assertEquals("mydb", tbl.getDbName()); + assertEquals(1, tbl.getPartitionKeysSize()); + } else if (event.equals(HCatConstants.HCAT_ADD_PARTITION_EVENT)) { + + assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination() + .toString()); + Partition part = (Partition) (((ObjectMessage) msg).getObject()); + assertEquals("mytbl", part.getTableName()); + assertEquals("mydb", part.getDbName()); + List<String> vals = new ArrayList<String>(1); + vals.add("2011"); + assertEquals(vals, part.getValues()); + } else if (event.equals(HCatConstants.HCAT_DROP_PARTITION_EVENT)) { + + assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination() + .toString()); + Partition part = (Partition) (((ObjectMessage) msg).getObject()); + assertEquals("mytbl", part.getTableName()); + assertEquals("mydb", part.getDbName()); + List<String> vals = new ArrayList<String>(1); + vals.add("2011"); + assertEquals(vals, part.getValues()); + } else if (event.equals(HCatConstants.HCAT_DROP_TABLE_EVENT)) { + + assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString()); + Table tbl = (Table) (((ObjectMessage) msg).getObject()); + assertEquals("mytbl", tbl.getTableName()); + assertEquals("mydb", tbl.getDbName()); + assertEquals(1, tbl.getPartitionKeysSize()); + } else if (event.equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)) { + + assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg + .getJMSDestination().toString()); + assertEquals("mydb", + ((Database) ((ObjectMessage) msg).getObject()).getName()); + } else if (event.equals(HCatConstants.HCAT_PARTITION_DONE_EVENT)) { + assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination() + .toString()); + MapMessage mapMsg = (MapMessage) msg; + assert mapMsg.getString("b").equals("2011"); + } else + assert false; + } catch (JMSException e) { + e.printStackTrace(System.err); + assert false; + } + } } Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java?rev=1352738&r1=1352737&r2=1352738&view=diff ============================================================================== --- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java (original) +++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java Fri Jun 22 01:40:27 2012 @@ -110,18 +110,18 @@ public class TestHCatLoaderComplexSchema String pigSchema = "(" + "a: " + - "(" + - "aa: chararray, " + - "ab: long, " + - "ac: map[], " + - "ad: { t: (ada: long) }, " + - "ae: { t: (aea:long, aeb: ( aeba: chararray, aebb: long)) }," + - "af: (afa: chararray, afb: long) " + - ")," + - "b: chararray, " + - "c: long, " + - "d: { t: (da:long, db: ( dba: chararray, dbb: long), dc: { t: (dca: long) } ) } " + - ")"; + "(" + + "aa: chararray, " + + "ab: long, " + + "ac: map[], " + + "ad: { t: (ada: long) }, " + + "ae: { t: (aea:long, aeb: ( aeba: chararray, aebb: long)) }," + + "af: (afa: chararray, afb: long) " + + ")," + + "b: chararray, " + + "c: long, " + + "d: { t: (da:long, db: ( dba: chararray, dbb: long), dc: { t: (dca: long) } ) } " + + ")"; // with extra structs String tableSchema = Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java?rev=1352738&r1=1352737&r2=1352738&view=diff ============================================================================== --- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java (original) +++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java Fri Jun 22 01:40:27 2012 @@ -132,7 +132,7 @@ public class TestHCatStorer extends Test driver.run("drop table employee"); String createTable = "CREATE TABLE employee (emp_id INT, emp_name STRING, emp_start_date STRING , emp_gender STRING ) " + - " PARTITIONED BY (emp_country STRING , emp_state STRING ) STORED AS RCFILE"; + " PARTITIONED BY (emp_country STRING , emp_state STRING ) STORED AS RCFILE"; int retCode = driver.run(createTable).getResponseCode(); if(retCode != 0) { @@ -148,7 +148,7 @@ public class TestHCatStorer extends Test PigServer pig = new PigServer(ExecType.LOCAL); pig.setBatchOn(); pig.registerQuery("A = LOAD '"+INPUT_FILE_NAME+"' USING PigStorage() AS (emp_id:int,emp_name:chararray,emp_start_date:chararray," + - "emp_gender:chararray,emp_country:chararray,emp_state:chararray);"); + "emp_gender:chararray,emp_country:chararray,emp_state:chararray);"); pig.registerQuery("TN = FILTER A BY emp_state == 'TN';"); pig.registerQuery("KA = FILTER A BY emp_state == 'KA';"); pig.registerQuery("KL = FILTER A BY emp_state == 'KL';"); @@ -415,7 +415,7 @@ public class TestHCatStorer extends Test public void testBagNStruct() throws IOException, CommandNeedRetryException{ driver.run("drop table junit_unparted"); String createTable = "create table junit_unparted(b string,a struct<a1:int>, arr_of_struct array<string>, " + - "arr_of_struct2 array<struct<s1:string,s2:string>>, arr_of_struct3 array<struct<s3:string>>) stored as RCFILE"; + "arr_of_struct2 array<struct<s1:string,s2:string>>, arr_of_struct3 array<struct<s3:string>>) stored as RCFILE"; int retCode = driver.run(createTable).getResponseCode(); if(retCode != 0) { throw new IOException("Failed to create table."); @@ -430,7 +430,7 @@ public class TestHCatStorer extends Test server.setBatchOn(); server.registerQuery("A = load '"+INPUT_FILE_NAME+"' as (b:chararray, a:tuple(a1:int), arr_of_struct:bag{mytup:tuple(s1:chararray)}, arr_of_struct2:bag{mytup:tuple(s1:chararray,s2:chararray)}, arr_of_struct3:bag{t3:tuple(s3:chararray)});"); server.registerQuery("store A into 'default.junit_unparted' using "+HCatStorer.class.getName()+"('','b:chararray, a:tuple(a1:int)," + - " arr_of_struct:bag{mytup:tuple(s1:chararray)}, arr_of_struct2:bag{mytup:tuple(s1:chararray,s2:chararray)}, arr_of_struct3:bag{t3:tuple(s3:chararray)}');"); + " arr_of_struct:bag{mytup:tuple(s1:chararray)}, arr_of_struct2:bag{mytup:tuple(s1:chararray,s2:chararray)}, arr_of_struct3:bag{t3:tuple(s3:chararray)}');"); server.executeBatch(); driver.run("select * from junit_unparted");
