Author: travis
Date: Wed Sep 12 14:34:01 2012
New Revision: 1383969
URL: http://svn.apache.org/viewvc?rev=1383969&view=rev
Log:
HCATALOG-498 TestNotificationListener failing in trunk
Modified:
incubator/hcatalog/trunk/CHANGES.txt
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatBaseTest.java
Modified: incubator/hcatalog/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1383969&r1=1383968&r2=1383969&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Wed Sep 12 14:34:01 2012
@@ -107,6 +107,8 @@ Trunk (unreleased changes)
OPTIMIZATIONS
BUG FIXES
+ HCAT-498 TestNotificationListener failing in trunk (traviscrawford)
+
HCAT-397 Removal of unused parameter hadoop.clientside.fs.operations
(khorgath via traviscrawford)
HCAT-483 HCatBaseStorer.getHCatFSFromPigFS fails when Table schema is not
provided (pengfeng via traviscrawford)
Modified:
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java
URL:
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java?rev=1383969&r1=1383968&r2=1383969&view=diff
==============================================================================
---
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java
(original)
+++
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java
Wed Sep 12 14:34:01 2012
@@ -124,16 +124,19 @@ public class NotificationListener extend
}
+ /**
+ * Send dropped partition notifications. Subscribers can receive these notifications for a
+ * particular table by listening on a topic named "dbName.tableName" with message selector
+ * string {@value org.apache.hcatalog.common.HCatConstants#HCAT_EVENT} =
+ * {@value org.apache.hcatalog.common.HCatConstants#HCAT_DROP_PARTITION_EVENT}.
+ * <br/>
+ * TODO: DataNucleus 2.0.3, currently used by the HiveMetaStore for persistence, has been
+ * found to throw NPE when serializing objects that contain null. For this reason we override
+ * some fields in the StorageDescriptor of this notification. This should be fixed after
+ * HIVE-2084 "Upgrade datanucleus from 2.0.3 to 3.0.1" is resolved.
+ */
@Override
- public void onDropPartition(DropPartitionEvent partitionEvent)
- throws MetaException {
- // Subscriber can get notification of dropped partition in a
- // particular table by listening on a topic named "dbName.tableName"
- // and message selector string as "HCAT_EVENT = HCAT_DROP_PARTITION"
-
- // Datanucleus throws NPE when we try to serialize a partition object
- // retrieved from metastore. To workaround that we reset following
objects
-
+ public void onDropPartition(DropPartitionEvent partitionEvent) throws
MetaException {
if (partitionEvent.getStatus()) {
Partition partition = partitionEvent.getPartition();
StorageDescriptor sd = partition.getSd();
@@ -141,6 +144,7 @@ public class NotificationListener extend
sd.setSortCols(new ArrayList<Order>());
sd.setParameters(new HashMap<String, String>());
sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+ sd.getSkewedInfo().setSkewedColNames(new ArrayList<String>());
String topicName = getTopicName(partition, partitionEvent);
if (topicName != null && !topicName.equals("")) {
send(partition, topicName,
HCatConstants.HCAT_DROP_PARTITION_EVENT);
@@ -215,6 +219,17 @@ public class NotificationListener extend
HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX);
}
+ /**
+ * Send dropped table notifications. Subscribers can receive these notifications for
+ * dropped tables by listening on topic "topicPrefix.dbName" (database name lower-cased,
+ * e.g. "hcat.mydb" with the default prefix) with message selector string
+ * {@value org.apache.hcatalog.common.HCatConstants#HCAT_EVENT} =
+ * {@value org.apache.hcatalog.common.HCatConstants#HCAT_DROP_TABLE_EVENT}
+ * <br/>
+ * TODO: DataNucleus 2.0.3, currently used by the HiveMetaStore for persistence, has been
+ * found to throw NPE when serializing objects that contain null. For this reason we override
+ * some fields in the StorageDescriptor of this notification. This should be fixed after
+ * HIVE-2084 "Upgrade datanucleus from 2.0.3 to 3.0.1" is resolved.
+ */
@Override
public void onDropTable(DropTableEvent tableEvent) throws MetaException {
// Subscriber can get notification about drop of a table in HCAT
@@ -231,6 +246,7 @@ public class NotificationListener extend
sd.setSortCols(new ArrayList<Order>());
sd.setParameters(new HashMap<String, String>());
sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+ sd.getSkewedInfo().setSkewedColNames(new ArrayList<String>());
send(table, getTopicPrefix(tableEvent.getHandler().getHiveConf())
+ "."
+ table.getDbName().toLowerCase(),
HCatConstants.HCAT_DROP_TABLE_EVENT);
Modified:
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java
URL:
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java?rev=1383969&r1=1383968&r2=1383969&view=diff
==============================================================================
---
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java
(original)
+++
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java
Wed Sep 12 14:34:01 2012
@@ -19,10 +19,10 @@
package org.apache.hcatalog.listener;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -37,38 +37,28 @@ import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.hadoop.hive.cli.CliSessionState;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.InvalidPartitionException;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PartitionEventType;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.UnknownDBException;
-import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
-import org.apache.hadoop.hive.metastore.api.UnknownTableException;
-import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hcatalog.common.HCatConstants;
-import org.apache.thrift.TException;
+import org.apache.hcatalog.mapreduce.HCatBaseTest;
-import junit.framework.TestCase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
-public class TestNotificationListener extends TestCase implements
- MessageListener {
+public class TestNotificationListener extends HCatBaseTest implements
MessageListener {
- private HiveConf hiveConf;
- private Driver driver;
- private AtomicInteger cntInvocation = new AtomicInteger(0);
+ private List<String> actualMessages = new ArrayList<String>();
- @Override
- protected void setUp() throws Exception {
-
- super.setUp();
+ @Before
+ public void setUp() throws Exception {
System.setProperty("java.naming.factory.initial",
"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
System.setProperty("java.naming.provider.url",
@@ -92,34 +82,37 @@ public class TestNotificationListener ex
.createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX +
".mydb");
MessageConsumer consumer3 = session.createConsumer(dbTopic);
consumer3.setMessageListener(this);
- hiveConf = new HiveConf(this.getClass());
+
+ setUpHiveConf();
hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname,
NotificationListener.class.getName());
- hiveConf.set("hive.metastore.local", "true");
- hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
- hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
- hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname,
"false");
SessionState.start(new CliSessionState(hiveConf));
driver = new Driver(hiveConf);
+ client = new HiveMetaStoreClient(hiveConf);
}
- @Override
- protected void tearDown() throws Exception {
- assertEquals(7, cntInvocation.get());
- super.tearDown();
+ @After
+ public void tearDown() throws Exception {
+ List<String> expectedMessages = Arrays.asList(
+ HCatConstants.HCAT_ADD_DATABASE_EVENT,
+ HCatConstants.HCAT_ADD_TABLE_EVENT,
+ HCatConstants.HCAT_ADD_PARTITION_EVENT,
+ HCatConstants.HCAT_PARTITION_DONE_EVENT,
+ HCatConstants.HCAT_DROP_PARTITION_EVENT,
+ HCatConstants.HCAT_DROP_TABLE_EVENT,
+ HCatConstants.HCAT_DROP_DATABASE_EVENT);
+ Assert.assertEquals(expectedMessages, actualMessages);
}
- public void testAMQListener() throws MetaException, TException,
- UnknownTableException, NoSuchObjectException,
CommandNeedRetryException,
- UnknownDBException, InvalidPartitionException,
UnknownPartitionException {
+ @Test
+ public void testAMQListener() throws Exception {
driver.run("create database mydb");
driver.run("use mydb");
driver.run("create table mytbl (a string) partitioned by (b string)");
driver.run("alter table mytbl add partition(b='2011')");
- HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf);
Map<String, String> kvs = new HashMap<String, String>(1);
kvs.put("b", "2011");
- msc.markPartitionForEvent("mydb", "mytbl", kvs,
+ client.markPartitionForEvent("mydb", "mytbl", kvs,
PartitionEventType.LOAD_DONE);
driver.run("alter table mytbl drop partition(b='2011')");
driver.run("drop table mytbl");
@@ -128,59 +121,59 @@ public class TestNotificationListener ex
@Override
public void onMessage(Message msg) {
- cntInvocation.incrementAndGet();
-
String event;
try {
event = msg.getStringProperty(HCatConstants.HCAT_EVENT);
+ actualMessages.add(event);
+
if (event.equals(HCatConstants.HCAT_ADD_DATABASE_EVENT)) {
- assertEquals("topic://" +
HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg
+ Assert.assertEquals("topic://" +
HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg
.getJMSDestination().toString());
- assertEquals("mydb",
+ Assert.assertEquals("mydb",
((Database) ((ObjectMessage)
msg).getObject()).getName());
} else if (event.equals(HCatConstants.HCAT_ADD_TABLE_EVENT)) {
- assertEquals("topic://hcat.mydb",
msg.getJMSDestination().toString());
+ Assert.assertEquals("topic://hcat.mydb",
msg.getJMSDestination().toString());
Table tbl = (Table) (((ObjectMessage) msg).getObject());
- assertEquals("mytbl", tbl.getTableName());
- assertEquals("mydb", tbl.getDbName());
- assertEquals(1, tbl.getPartitionKeysSize());
+ Assert.assertEquals("mytbl", tbl.getTableName());
+ Assert.assertEquals("mydb", tbl.getDbName());
+ Assert.assertEquals(1, tbl.getPartitionKeysSize());
} else if (event.equals(HCatConstants.HCAT_ADD_PARTITION_EVENT)) {
- assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination()
+ Assert.assertEquals("topic://hcat.mydb.mytbl",
msg.getJMSDestination()
.toString());
Partition part = (Partition) (((ObjectMessage)
msg).getObject());
- assertEquals("mytbl", part.getTableName());
- assertEquals("mydb", part.getDbName());
+ Assert.assertEquals("mytbl", part.getTableName());
+ Assert.assertEquals("mydb", part.getDbName());
List<String> vals = new ArrayList<String>(1);
vals.add("2011");
- assertEquals(vals, part.getValues());
+ Assert.assertEquals(vals, part.getValues());
} else if (event.equals(HCatConstants.HCAT_DROP_PARTITION_EVENT)) {
- assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination()
+ Assert.assertEquals("topic://hcat.mydb.mytbl",
msg.getJMSDestination()
.toString());
Partition part = (Partition) (((ObjectMessage)
msg).getObject());
- assertEquals("mytbl", part.getTableName());
- assertEquals("mydb", part.getDbName());
+ Assert.assertEquals("mytbl", part.getTableName());
+ Assert.assertEquals("mydb", part.getDbName());
List<String> vals = new ArrayList<String>(1);
vals.add("2011");
- assertEquals(vals, part.getValues());
+ Assert.assertEquals(vals, part.getValues());
} else if (event.equals(HCatConstants.HCAT_DROP_TABLE_EVENT)) {
- assertEquals("topic://hcat.mydb",
msg.getJMSDestination().toString());
+ Assert.assertEquals("topic://hcat.mydb",
msg.getJMSDestination().toString());
Table tbl = (Table) (((ObjectMessage) msg).getObject());
- assertEquals("mytbl", tbl.getTableName());
- assertEquals("mydb", tbl.getDbName());
- assertEquals(1, tbl.getPartitionKeysSize());
+ Assert.assertEquals("mytbl", tbl.getTableName());
+ Assert.assertEquals("mydb", tbl.getDbName());
+ Assert.assertEquals(1, tbl.getPartitionKeysSize());
} else if (event.equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)) {
- assertEquals("topic://" +
HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg
+ Assert.assertEquals("topic://" +
HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg
.getJMSDestination().toString());
- assertEquals("mydb",
+ Assert.assertEquals("mydb",
((Database) ((ObjectMessage)
msg).getObject()).getName());
} else if (event.equals(HCatConstants.HCAT_PARTITION_DONE_EVENT)) {
- assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination()
+ Assert.assertEquals("topic://hcat.mydb.mytbl",
msg.getJMSDestination()
.toString());
MapMessage mapMsg = (MapMessage) msg;
assert mapMsg.getString("b").equals("2011");
Modified:
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatBaseTest.java
URL:
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatBaseTest.java?rev=1383969&r1=1383968&r2=1383969&view=diff
==============================================================================
---
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatBaseTest.java
(original)
+++
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatBaseTest.java
Wed Sep 12 14:34:01 2012
@@ -60,17 +60,24 @@ public class HCatBaseTest {
@Before
public void setUp() throws Exception {
if (driver == null) {
- hiveConf = new HiveConf(this.getClass());
- hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
- hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
- hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname,
"false");
- hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
TEST_WAREHOUSE_DIR);
+ setUpHiveConf();
driver = new Driver(hiveConf);
client = new HiveMetaStoreClient(hiveConf);
SessionState.start(new CliSessionState(hiveConf));
}
}
+ /**
+ * Create a new HiveConf and set properties necessary for unit tests.
+ */
+ protected void setUpHiveConf() {
+ hiveConf = new HiveConf(this.getClass());
+ hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname,
"false");
+ hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
TEST_WAREHOUSE_DIR);
+ }
+
protected void logAndRegister(PigServer server, String query) throws
IOException {
LOG.info("Registering pig query: " + query);
server.registerQuery(query);