Repository: hive Updated Branches: refs/heads/master 8a06b9e6e -> 441b29e10
HIVE-15754: exchange partition is not generating notifications (Nachiket Vaidya, reviewed by Sergio Pena) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/441b29e1 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/441b29e1 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/441b29e1 Branch: refs/heads/master Commit: 441b29e10365276af552310c7ef4d56393521086 Parents: 8a06b9e Author: Nachiket Vaidya <[email protected]> Authored: Wed Feb 8 10:35:22 2017 -0600 Committer: Sergio Pena <[email protected]> Committed: Wed Feb 8 10:35:22 2017 -0600 ---------------------------------------------------------------------- .../listener/TestDbNotificationListener.java | 82 ++++++++++++++++++++ .../hadoop/hive/metastore/HiveMetaStore.java | 38 ++++++++- 2 files changed, 119 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/441b29e1/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java ---------------------------------------------------------------------- diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java index 640b567..1cf47c3 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java @@ -32,6 +32,8 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; @@ -574,6 +576,86 @@ public class TestDbNotificationListener { } @Test + public void exchangePartition() throws Exception { + String dbName = "default"; + List<FieldSchema> cols = new ArrayList<FieldSchema>(); + cols.add(new FieldSchema("col1", "int", "nocomment")); + List<FieldSchema> partCols = new ArrayList<FieldSchema>(); + partCols.add(new FieldSchema("part", "int", "")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd1 = new StorageDescriptor(cols, "file:/tmp/1", "input", "output", false, 0, + serde, null, null, emptyParameters); + Table tab1 = new Table("tab1", dbName, "me", startTime, startTime, 0, sd1, partCols, + emptyParameters, null, null, null); + msClient.createTable(tab1); + NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); + assertEquals(1, rsp.getEventsSize()); // add_table + + StorageDescriptor sd2 = new StorageDescriptor(cols, "file:/tmp/2", "input", "output", false, 0, + serde, null, null, emptyParameters); + Table tab2 = new Table("tab2", dbName, "me", startTime, startTime, 0, sd2, partCols, + emptyParameters, null, null, null); // add_table + msClient.createTable(tab2); + rsp = msClient.getNextNotification(firstEventId + 1, 0, null); + assertEquals(1, rsp.getEventsSize()); + + StorageDescriptor sd1part = new StorageDescriptor(cols, "file:/tmp/1/part=1", "input", "output", false, 0, + serde, null, null, emptyParameters); + StorageDescriptor sd2part = new StorageDescriptor(cols, "file:/tmp/1/part=2", "input", "output", false, 0, + serde, null, null, emptyParameters); + StorageDescriptor sd3part = new StorageDescriptor(cols, "file:/tmp/1/part=3", "input", "output", false, 0, + serde, null, null, emptyParameters); + Partition part1 = new Partition(Arrays.asList("1"), "default", tab1.getTableName(), + startTime, startTime, sd1part, emptyParameters); + Partition part2 = new Partition(Arrays.asList("2"), "default", tab1.getTableName(), + startTime, startTime, sd2part, emptyParameters); + Partition part3 = new Partition(Arrays.asList("3"), "default", tab1.getTableName(), + startTime, startTime, sd3part, emptyParameters); + msClient.add_partitions(Arrays.asList(part1, part2, part3)); + rsp = msClient.getNextNotification(firstEventId + 2, 0, null); + assertEquals(1, rsp.getEventsSize()); // add_partition + + msClient.exchange_partition(ImmutableMap.of("part", "1"), + dbName, tab1.getTableName(), dbName, tab2.getTableName()); + + rsp = msClient.getNextNotification(firstEventId + 3, 0, null); + assertEquals(2, rsp.getEventsSize()); + + NotificationEvent event = rsp.getEvents().get(0); + assertEquals(firstEventId + 4, event.getEventId()); + assertTrue(event.getEventTime() >= startTime); + assertEquals(EventType.ADD_PARTITION.toString(), event.getEventType()); + assertEquals(dbName, event.getDbName()); + assertEquals(tab2.getTableName(), event.getTableName()); + + // Parse the message field + AddPartitionMessage addPtnMsg = md.getAddPartitionMessage(event.getMessage()); + assertEquals(dbName, addPtnMsg.getDB()); + assertEquals(tab2.getTableName(), addPtnMsg.getTable()); + Iterator<Partition> ptnIter = addPtnMsg.getPartitionObjs().iterator(); + assertTrue(ptnIter.hasNext()); + Partition msgPart = ptnIter.next(); + assertEquals(part1.getValues(), msgPart.getValues()); + assertEquals(dbName, msgPart.getDbName()); + assertEquals(tab2.getTableName(), msgPart.getTableName()); + + event = rsp.getEvents().get(1); + assertEquals(firstEventId + 5, event.getEventId()); + assertTrue(event.getEventTime() >= startTime); + assertEquals(EventType.DROP_PARTITION.toString(), event.getEventType()); + assertEquals(dbName, event.getDbName()); + assertEquals(tab1.getTableName(), event.getTableName()); + + // Parse the message field + DropPartitionMessage dropPtnMsg = md.getDropPartitionMessage(event.getMessage()); + assertEquals(dbName, dropPtnMsg.getDB()); + assertEquals(tab1.getTableName(), dropPtnMsg.getTable()); + Iterator<Map<String, String>> parts = dropPtnMsg.getPartitions().iterator(); + assertTrue(parts.hasNext()); + assertEquals(part1.getValues(), Lists.newArrayList(parts.next().values())); + } + + @Test public void createFunction() throws Exception { String defaultDbName = "default"; String funcName = "createfunction"; http://git-wip-us.apache.org/repos/asf/hive/blob/441b29e1/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 12485a9..07eca38 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -58,6 +58,7 @@ import java.util.regex.Pattern; import javax.jdo.JDOException; import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.collections.CollectionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -2937,8 +2938,8 @@ public class HiveMetaStore extends ThriftHiveMetastore { Warehouse.makePartName(partitionKeysPresent, partValsPresent)); Path destPath = new Path(destinationTable.getSd().getLocation(), Warehouse.makePartName(partitionKeysPresent, partValsPresent)); + List<Partition> destPartitions = new ArrayList<Partition>(); try { - List<Partition> destPartitions = new ArrayList<Partition>(); for (Partition partition: partitionsToExchange) { Partition destPartition = new Partition(partition); destPartition.setDbName(destDbName); @@ -2962,6 +2963,12 @@ public class HiveMetaStore extends ThriftHiveMetastore { * once https://issues.apache.org/jira/browse/HDFS-3370 is done */ pathCreated = wh.renameDir(sourcePath, destPath); + + // Setting success to false to make sure that if the listener fails, rollback happens. + success = false; + fireMetaStoreExchangePartitionEvent(sourceTable, partitionsToExchange, + destinationTable, destPartitions, transactionalListeners, true); + success = ms.commitTransaction(); return destPartitions; } finally { @@ -2970,6 +2977,35 @@ public class HiveMetaStore extends ThriftHiveMetastore { if (pathCreated) { wh.renameDir(destPath, sourcePath); } + + fireMetaStoreExchangePartitionEvent(sourceTable, partitionsToExchange, + destinationTable, destPartitions, listeners, success); + } + } + } + + private void fireMetaStoreExchangePartitionEvent(Table sourceTable, + List<Partition> partitionsToExchange, Table destinationTable, + List<Partition> destPartitions, + List<MetaStoreEventListener> eventListeners, + boolean status) throws MetaException { + if (sourceTable != null && destinationTable != null + && !CollectionUtils.isEmpty(partitionsToExchange) + && !CollectionUtils.isEmpty(destPartitions)) { + if (eventListeners.size() > 0) { + AddPartitionEvent addPartitionEvent = + new AddPartitionEvent(destinationTable, destPartitions, status, this); + for (MetaStoreEventListener eventListener : eventListeners) { + eventListener.onAddPartition(addPartitionEvent); + } + + for (Partition partition : partitionsToExchange) { + DropPartitionEvent dropPartitionEvent = + new DropPartitionEvent(sourceTable, partition, true, status, this); + for (MetaStoreEventListener eventListener : eventListeners) { + eventListener.onDropPartition(dropPartitionEvent); + } + } } } }
