Repository: hive
Updated Branches:
  refs/heads/master 8a06b9e6e -> 441b29e10


HIVE-15754: exchange partition is not generating notifications (Nachiket 
Vaidya, reviewed by Sergio Pena)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/441b29e1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/441b29e1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/441b29e1

Branch: refs/heads/master
Commit: 441b29e10365276af552310c7ef4d56393521086
Parents: 8a06b9e
Author: Nachiket Vaidya <[email protected]>
Authored: Wed Feb 8 10:35:22 2017 -0600
Committer: Sergio Pena <[email protected]>
Committed: Wed Feb 8 10:35:22 2017 -0600

----------------------------------------------------------------------
 .../listener/TestDbNotificationListener.java    | 82 ++++++++++++++++++++
 .../hadoop/hive/metastore/HiveMetaStore.java    | 38 ++++++++-
 2 files changed, 119 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/441b29e1/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
----------------------------------------------------------------------
diff --git 
a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
 
b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index 640b567..1cf47c3 100644
--- 
a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ 
b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -32,6 +32,8 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -574,6 +576,86 @@ public class TestDbNotificationListener {
   }
 
   @Test
+  public void exchangePartition() throws Exception {
+    String dbName = "default";
+    List<FieldSchema> cols = new ArrayList<FieldSchema>();
+    cols.add(new FieldSchema("col1", "int", "nocomment"));
+    List<FieldSchema> partCols = new ArrayList<FieldSchema>();
+    partCols.add(new FieldSchema("part", "int", ""));
+    SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+    StorageDescriptor sd1 = new StorageDescriptor(cols, "file:/tmp/1", 
"input", "output", false, 0,
+        serde, null, null, emptyParameters);
+    Table tab1 = new Table("tab1", dbName, "me", startTime, startTime, 0, sd1, 
partCols,
+        emptyParameters, null, null, null);
+    msClient.createTable(tab1);
+    NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 
0, null);
+    assertEquals(1, rsp.getEventsSize()); // add_table
+
+    StorageDescriptor sd2 = new StorageDescriptor(cols, "file:/tmp/2", 
"input", "output", false, 0,
+        serde, null, null, emptyParameters);
+    Table tab2 = new Table("tab2", dbName, "me", startTime, startTime, 0, sd2, 
partCols,
+        emptyParameters, null, null, null); // add_table
+    msClient.createTable(tab2);
+    rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+    assertEquals(1, rsp.getEventsSize());
+
+    StorageDescriptor sd1part = new StorageDescriptor(cols, 
"file:/tmp/1/part=1", "input", "output", false, 0,
+        serde, null, null, emptyParameters);
+    StorageDescriptor sd2part = new StorageDescriptor(cols, 
"file:/tmp/1/part=2", "input", "output", false, 0,
+        serde, null, null, emptyParameters);
+    StorageDescriptor sd3part = new StorageDescriptor(cols, 
"file:/tmp/1/part=3", "input", "output", false, 0,
+        serde, null, null, emptyParameters);
+    Partition part1 = new Partition(Arrays.asList("1"), "default", 
tab1.getTableName(),
+        startTime, startTime, sd1part, emptyParameters);
+    Partition part2 = new Partition(Arrays.asList("2"), "default", 
tab1.getTableName(),
+        startTime, startTime, sd2part, emptyParameters);
+    Partition part3 = new Partition(Arrays.asList("3"), "default", 
tab1.getTableName(),
+        startTime, startTime, sd3part, emptyParameters);
+    msClient.add_partitions(Arrays.asList(part1, part2, part3));
+    rsp = msClient.getNextNotification(firstEventId + 2, 0, null);
+    assertEquals(1, rsp.getEventsSize()); // add_partition
+
+    msClient.exchange_partition(ImmutableMap.of("part", "1"),
+        dbName, tab1.getTableName(), dbName, tab2.getTableName());
+
+    rsp = msClient.getNextNotification(firstEventId + 3, 0, null);
+    assertEquals(2, rsp.getEventsSize());
+
+    NotificationEvent event = rsp.getEvents().get(0);
+    assertEquals(firstEventId + 4, event.getEventId());
+    assertTrue(event.getEventTime() >= startTime);
+    assertEquals(EventType.ADD_PARTITION.toString(), event.getEventType());
+    assertEquals(dbName, event.getDbName());
+    assertEquals(tab2.getTableName(), event.getTableName());
+
+    // Parse the message field
+    AddPartitionMessage addPtnMsg = 
md.getAddPartitionMessage(event.getMessage());
+    assertEquals(dbName, addPtnMsg.getDB());
+    assertEquals(tab2.getTableName(), addPtnMsg.getTable());
+    Iterator<Partition> ptnIter = addPtnMsg.getPartitionObjs().iterator();
+    assertTrue(ptnIter.hasNext());
+    Partition msgPart = ptnIter.next();
+    assertEquals(part1.getValues(), msgPart.getValues());
+    assertEquals(dbName, msgPart.getDbName());
+    assertEquals(tab2.getTableName(), msgPart.getTableName());
+
+    event = rsp.getEvents().get(1);
+    assertEquals(firstEventId + 5, event.getEventId());
+    assertTrue(event.getEventTime() >= startTime);
+    assertEquals(EventType.DROP_PARTITION.toString(), event.getEventType());
+    assertEquals(dbName, event.getDbName());
+    assertEquals(tab1.getTableName(), event.getTableName());
+
+    // Parse the message field
+    DropPartitionMessage dropPtnMsg = 
md.getDropPartitionMessage(event.getMessage());
+    assertEquals(dbName, dropPtnMsg.getDB());
+    assertEquals(tab1.getTableName(), dropPtnMsg.getTable());
+    Iterator<Map<String, String>> parts = 
dropPtnMsg.getPartitions().iterator();
+    assertTrue(parts.hasNext());
+    assertEquals(part1.getValues(), Lists.newArrayList(parts.next().values()));
+  }
+
+  @Test
   public void createFunction() throws Exception {
     String defaultDbName = "default";
     String funcName = "createfunction";

http://git-wip-us.apache.org/repos/asf/hive/blob/441b29e1/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java 
b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 12485a9..07eca38 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -58,6 +58,7 @@ import java.util.regex.Pattern;
 import javax.jdo.JDOException;
 
 import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -2937,8 +2938,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           Warehouse.makePartName(partitionKeysPresent, partValsPresent));
       Path destPath = new Path(destinationTable.getSd().getLocation(),
           Warehouse.makePartName(partitionKeysPresent, partValsPresent));
+      List<Partition> destPartitions = new ArrayList<Partition>();
       try {
-        List<Partition> destPartitions = new ArrayList<Partition>();
         for (Partition partition: partitionsToExchange) {
           Partition destPartition = new Partition(partition);
           destPartition.setDbName(destDbName);
@@ -2962,6 +2963,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
          * once https://issues.apache.org/jira/browse/HDFS-3370 is done
          */
         pathCreated = wh.renameDir(sourcePath, destPath);
+
+        // Setting success to false to make sure that if the listener fails, 
rollback happens.
+        success = false;
+        fireMetaStoreExchangePartitionEvent(sourceTable, partitionsToExchange,
+            destinationTable, destPartitions, transactionalListeners, true);
+
         success = ms.commitTransaction();
         return destPartitions;
       } finally {
@@ -2970,6 +2977,35 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           if (pathCreated) {
             wh.renameDir(destPath, sourcePath);
           }
+
+          fireMetaStoreExchangePartitionEvent(sourceTable, 
partitionsToExchange,
+              destinationTable, destPartitions, listeners, success);
+        }
+      }
+    }
+
+    private void fireMetaStoreExchangePartitionEvent(Table sourceTable,
+        List<Partition> partitionsToExchange, Table destinationTable,
+        List<Partition> destPartitions,
+        List<MetaStoreEventListener> eventListeners,
+        boolean status) throws MetaException {
+      if (sourceTable != null && destinationTable != null
+          && !CollectionUtils.isEmpty(partitionsToExchange)
+          && !CollectionUtils.isEmpty(destPartitions)) {
+        if (eventListeners.size() > 0) {
+          AddPartitionEvent addPartitionEvent =
+              new AddPartitionEvent(destinationTable, destPartitions, status, 
this);
+          for (MetaStoreEventListener eventListener : eventListeners) {
+            eventListener.onAddPartition(addPartitionEvent);
+          }
+
+          for (Partition partition : partitionsToExchange) {
+            DropPartitionEvent dropPartitionEvent =
+                new DropPartitionEvent(sourceTable, partition, true, status, 
this);
+            for (MetaStoreEventListener eventListener : eventListeners) {
+              eventListener.onDropPartition(dropPartitionEvent);
+            }
+          }
         }
       }
     }

Reply via email to