Modified: incubator/hcatalog/trunk/server-extensions/src/test/java/org/apache/hcatalog/listener/TestNotificationListener.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/server-extensions/src/test/java/org/apache/hcatalog/listener/TestNotificationListener.java?rev=1431177&r1=1431176&r2=1431177&view=diff ============================================================================== --- incubator/hcatalog/trunk/server-extensions/src/test/java/org/apache/hcatalog/listener/TestNotificationListener.java (original) +++ incubator/hcatalog/trunk/server-extensions/src/test/java/org/apache/hcatalog/listener/TestNotificationListener.java Thu Jan 10 02:06:07 2013 @@ -29,26 +29,32 @@ import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; -import javax.jms.MapMessage; +import javax.jms.TextMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; -import javax.jms.ObjectMessage; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PartitionEventType; -import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hcatalog.common.HCatConstants; import org.apache.hcatalog.mapreduce.HCatBaseTest; +import org.apache.hcatalog.messaging.AddPartitionMessage; +import org.apache.hcatalog.messaging.CreateDatabaseMessage; +import org.apache.hcatalog.messaging.CreateTableMessage; +import org.apache.hcatalog.messaging.DropDatabaseMessage; +import org.apache.hcatalog.messaging.DropPartitionMessage; +import org.apache.hcatalog.messaging.DropTableMessage; +import org.apache.hcatalog.messaging.HCatEventMessage; +import org.apache.hcatalog.messaging.MessageDeserializer; +import org.apache.hcatalog.messaging.MessageFactory; +import org.apache.hcatalog.messaging.jms.MessagingUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -95,10 +101,9 @@ public class TestNotificationListener ex @After public void tearDown() throws Exception { List<String> expectedMessages = Arrays.asList( - HCatConstants.HCAT_ADD_DATABASE_EVENT, - HCatConstants.HCAT_ADD_TABLE_EVENT, + HCatConstants.HCAT_CREATE_DATABASE_EVENT, + HCatConstants.HCAT_CREATE_TABLE_EVENT, HCatConstants.HCAT_ADD_PARTITION_EVENT, - HCatConstants.HCAT_PARTITION_DONE_EVENT, HCatConstants.HCAT_DROP_PARTITION_EVENT, HCatConstants.HCAT_DROP_TABLE_EVENT, HCatConstants.HCAT_DROP_DATABASE_EVENT); @@ -125,61 +130,87 @@ public class TestNotificationListener ex String event; try { event = msg.getStringProperty(HCatConstants.HCAT_EVENT); + String format = msg.getStringProperty(HCatConstants.HCAT_MESSAGE_FORMAT); + String version = msg.getStringProperty(HCatConstants.HCAT_MESSAGE_VERSION); + String messageBody = ((TextMessage)msg).getText(); actualMessages.add(event); + MessageDeserializer deserializer = MessageFactory.getDeserializer(format, version); - if (event.equals(HCatConstants.HCAT_ADD_DATABASE_EVENT)) { + if (event.equals(HCatConstants.HCAT_CREATE_DATABASE_EVENT)) { Assert.assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg .getJMSDestination().toString()); - Assert.assertEquals("mydb", - ((Database) ((ObjectMessage) msg).getObject()).getName()); - } else if (event.equals(HCatConstants.HCAT_ADD_TABLE_EVENT)) { + CreateDatabaseMessage message = deserializer.getCreateDatabaseMessage(messageBody); + Assert.assertEquals("mydb", message.getDB()); + HCatEventMessage message2 = MessagingUtils.getMessage(msg); + Assert.assertTrue("Unexpected message-type.", message2 instanceof CreateDatabaseMessage); + Assert.assertEquals("mydb", message2.getDB()); + } else if (event.equals(HCatConstants.HCAT_CREATE_TABLE_EVENT)) { Assert.assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString()); - Table tbl = (Table) (((ObjectMessage) msg).getObject()); - Assert.assertEquals("mytbl", tbl.getTableName()); - Assert.assertEquals("mydb", tbl.getDbName()); - Assert.assertEquals(1, tbl.getPartitionKeysSize()); + CreateTableMessage message = deserializer.getCreateTableMessage(messageBody); + Assert.assertEquals("mytbl", message.getTable()); + Assert.assertEquals("mydb", message.getDB()); + HCatEventMessage message2 = MessagingUtils.getMessage(msg); + Assert.assertTrue("Unexpected message-type.", message2 instanceof CreateTableMessage); + Assert.assertEquals("mydb", message2.getDB()); + Assert.assertEquals("mytbl", ((CreateTableMessage) message2).getTable()); } else if (event.equals(HCatConstants.HCAT_ADD_PARTITION_EVENT)) { Assert.assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination() .toString()); - Partition part = (Partition) (((ObjectMessage) msg).getObject()); - Assert.assertEquals("mytbl", part.getTableName()); - Assert.assertEquals("mydb", part.getDbName()); - List<String> vals = new ArrayList<String>(1); - vals.add("2011"); - Assert.assertEquals(vals, part.getValues()); + AddPartitionMessage message = deserializer.getAddPartitionMessage(messageBody); + Assert.assertEquals("mytbl", message.getTable()); + Assert.assertEquals("mydb", message.getDB()); + Assert.assertEquals(1, message.getPartitions().size()); + Assert.assertEquals("2011", message.getPartitions().get(0).get("b")); + HCatEventMessage message2 = MessagingUtils.getMessage(msg); + Assert.assertTrue("Unexpected message-type.", message2 instanceof AddPartitionMessage); + Assert.assertEquals("mydb", message2.getDB()); + Assert.assertEquals("mytbl", ((AddPartitionMessage) message2).getTable()); + Assert.assertEquals(1, ((AddPartitionMessage) message2).getPartitions().size()); + Assert.assertEquals("2011", ((AddPartitionMessage) message2).getPartitions().get(0).get("b")); } else if (event.equals(HCatConstants.HCAT_DROP_PARTITION_EVENT)) { Assert.assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination() .toString()); - Partition part = (Partition) (((ObjectMessage) msg).getObject()); - Assert.assertEquals("mytbl", part.getTableName()); - Assert.assertEquals("mydb", part.getDbName()); - List<String> vals = new ArrayList<String>(1); - vals.add("2011"); - Assert.assertEquals(vals, part.getValues()); + DropPartitionMessage message = deserializer.getDropPartitionMessage(messageBody); + Assert.assertEquals("mytbl", message.getTable()); + Assert.assertEquals("mydb", message.getDB()); + Assert.assertEquals(1, message.getPartitions().size()); + Assert.assertEquals("2011", message.getPartitions().get(0).get("b")); + HCatEventMessage message2 = MessagingUtils.getMessage(msg); + Assert.assertTrue("Unexpected message-type.", message2 instanceof DropPartitionMessage); + Assert.assertEquals("mydb", message2.getDB()); + Assert.assertEquals("mytbl", ((DropPartitionMessage) message2).getTable()); + Assert.assertEquals(1, ((DropPartitionMessage) message2).getPartitions().size()); + Assert.assertEquals("2011", ((DropPartitionMessage) message2).getPartitions().get(0).get("b")); } else if (event.equals(HCatConstants.HCAT_DROP_TABLE_EVENT)) { Assert.assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString()); - Table tbl = (Table) (((ObjectMessage) msg).getObject()); - Assert.assertEquals("mytbl", tbl.getTableName()); - Assert.assertEquals("mydb", tbl.getDbName()); - Assert.assertEquals(1, tbl.getPartitionKeysSize()); + DropTableMessage message = deserializer.getDropTableMessage(messageBody); + Assert.assertEquals("mytbl", message.getTable()); + Assert.assertEquals("mydb", message.getDB()); + HCatEventMessage message2 = MessagingUtils.getMessage(msg); + Assert.assertTrue("Unexpected message-type.", message2 instanceof DropTableMessage); + Assert.assertEquals("mydb", message2.getDB()); + Assert.assertEquals("mytbl", ((DropTableMessage) message2).getTable()); } else if (event.equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)) { Assert.assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg .getJMSDestination().toString()); - Assert.assertEquals("mydb", - ((Database) ((ObjectMessage) msg).getObject()).getName()); + DropDatabaseMessage message = deserializer.getDropDatabaseMessage(messageBody); + Assert.assertEquals("mydb", message.getDB()); + HCatEventMessage message2 = MessagingUtils.getMessage(msg); + Assert.assertTrue("Unexpected message-type.", message2 instanceof DropDatabaseMessage); + Assert.assertEquals("mydb", message2.getDB()); } else if (event.equals(HCatConstants.HCAT_PARTITION_DONE_EVENT)) { - Assert.assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination() - .toString()); - MapMessage mapMsg = (MapMessage) msg; - assert mapMsg.getString("b").equals("2011"); - } else - assert false; + // TODO: Fill in when PARTITION_DONE_EVENT is supported. + Assert.assertTrue("Unexpected: HCAT_PARTITION_DONE_EVENT not supported (yet).", false); + } else { + Assert.assertTrue("Unexpected event-type: " + event, false); + } + } catch (JMSException e) { e.printStackTrace(System.err); assert false;
