Author: gates
Date: Mon Mar 23 22:22:06 2015
New Revision: 1668753

URL: http://svn.apache.org/r1668753
Log:
HIVE-9977 Compactor not running on partitions after dynamic partitioned insert 
(Alan Gates reviewed by Eugene Koifman)

Added:
    
hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/AddDynamicPartitions.java
Modified:
    
hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
    hive/trunk/metastore/if/hive_metastore.thrift
    hive/trunk/metastore/src/gen/thrift/gen-cpp/ThriftHiveMetastore.cpp
    hive/trunk/metastore/src/gen/thrift/gen-cpp/ThriftHiveMetastore.h
    
hive/trunk/metastore/src/gen/thrift/gen-cpp/ThriftHiveMetastore_server.skeleton.cpp
    hive/trunk/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
    hive/trunk/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h
    
hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/FireEventRequest.java
    
hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java
    
hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEventResponse.java
    
hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ThriftHiveMetastore.java
    
hive/trunk/metastore/src/gen/thrift/gen-php/metastore/ThriftHiveMetastore.php
    hive/trunk/metastore/src/gen/thrift/gen-php/metastore/Types.php
    
hive/trunk/metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore-remote
    
hive/trunk/metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore.py
    hive/trunk/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
    hive/trunk/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
    hive/trunk/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
    
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
    
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
    
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
    
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
    
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
    
hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
    
hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
    
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
    
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
    
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java

Modified: 
hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java?rev=1668753&r1=1668752&r2=1668753&view=diff
==============================================================================
--- 
hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
 (original)
+++ 
hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
 Mon Mar 23 22:22:06 2015
@@ -10,7 +10,6 @@ import org.apache.hadoop.hive.common.Val
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.MetaStoreThread;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.CompactionRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
@@ -55,6 +54,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -88,6 +89,7 @@ public class TestCompactor {
     hiveConf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, "");
     hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, TEST_WAREHOUSE_DIR);
     hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, 
HiveInputFormat.class.getName());
+    hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
     //"org.apache.hadoop.hive.ql.io.HiveInputFormat"
 
     TxnDbUtil.setConfValues(hiveConf);
@@ -281,6 +283,124 @@ public class TestCompactor {
   }
 
   @Test
+  public void dynamicPartitioningInsert() throws Exception {
+    String tblName = "dpct";
+    List<String> colNames = Arrays.asList("a", "b");
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+      " PARTITIONED BY(ds string)" +
+      " CLUSTERED BY(a) INTO 2 BUCKETS" + //currently ACID requires table to 
be bucketed
+      " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
+    executeStatementOnDriver("insert into " + tblName + " partition (ds) 
values (1, 'fred', " +
+        "'today'), (2, 'wilma', 'yesterday')", driver);
+
+    Initiator initiator = new Initiator();
+    initiator.setThreadId((int)initiator.getId());
+    conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 0);
+    initiator.setHiveConf(conf);
+    AtomicBoolean stop = new AtomicBoolean();
+    stop.set(true);
+    initiator.init(stop, new AtomicBoolean());
+    initiator.run();
+
+    CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(2, compacts.size());
+    SortedSet<String> partNames = new TreeSet<String>();
+    for (int i = 0; i < compacts.size(); i++) {
+      Assert.assertEquals("default", compacts.get(i).getDbname());
+      Assert.assertEquals(tblName, compacts.get(i).getTablename());
+      Assert.assertEquals("initiated", compacts.get(i).getState());
+      partNames.add(compacts.get(i).getPartitionname());
+    }
+    List<String> names = new ArrayList<String>(partNames);
+    Assert.assertEquals("ds=today", names.get(0));
+    Assert.assertEquals("ds=yesterday", names.get(1));
+  }
+
+  @Test
+  public void dynamicPartitioningUpdate() throws Exception {
+    String tblName = "udpct";
+    List<String> colNames = Arrays.asList("a", "b");
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+      " PARTITIONED BY(ds string)" +
+      " CLUSTERED BY(a) INTO 2 BUCKETS" + //currently ACID requires table to 
be bucketed
+      " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
+    executeStatementOnDriver("insert into " + tblName + " partition (ds) 
values (1, 'fred', " +
+        "'today'), (2, 'wilma', 'yesterday')", driver);
+
+    executeStatementOnDriver("update " + tblName + " set b = 'barney'", 
driver);
+
+    Initiator initiator = new Initiator();
+    initiator.setThreadId((int)initiator.getId());
+    // Set to 1 so insert doesn't set it off but update does
+    conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 1);
+    initiator.setHiveConf(conf);
+    AtomicBoolean stop = new AtomicBoolean();
+    stop.set(true);
+    initiator.init(stop, new AtomicBoolean());
+    initiator.run();
+
+    CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(2, compacts.size());
+    SortedSet<String> partNames = new TreeSet<String>();
+    for (int i = 0; i < compacts.size(); i++) {
+      Assert.assertEquals("default", compacts.get(i).getDbname());
+      Assert.assertEquals(tblName, compacts.get(i).getTablename());
+      Assert.assertEquals("initiated", compacts.get(i).getState());
+      partNames.add(compacts.get(i).getPartitionname());
+    }
+    List<String> names = new ArrayList<String>(partNames);
+    Assert.assertEquals("ds=today", names.get(0));
+    Assert.assertEquals("ds=yesterday", names.get(1));
+  }
+
+  @Test
+  public void dynamicPartitioningDelete() throws Exception {
+    String tblName = "ddpct";
+    List<String> colNames = Arrays.asList("a", "b");
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+      " PARTITIONED BY(ds string)" +
+      " CLUSTERED BY(a) INTO 2 BUCKETS" + //currently ACID requires table to 
be bucketed
+      " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
+    executeStatementOnDriver("insert into " + tblName + " partition (ds) 
values (1, 'fred', " +
+        "'today'), (2, 'wilma', 'yesterday')", driver);
+
+    executeStatementOnDriver("update " + tblName + " set a = 3", driver);
+
+    executeStatementOnDriver("delete from " + tblName + " where b = 'fred'", 
driver);
+
+    Initiator initiator = new Initiator();
+    initiator.setThreadId((int)initiator.getId());
+    // Set to 2 so insert and update don't set it off but delete does
+    conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 2);
+    initiator.setHiveConf(conf);
+    AtomicBoolean stop = new AtomicBoolean();
+    stop.set(true);
+    initiator.init(stop, new AtomicBoolean());
+    initiator.run();
+
+    CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    SortedSet<String> partNames = new TreeSet<String>();
+    for (int i = 0; i < compacts.size(); i++) {
+      Assert.assertEquals("default", compacts.get(i).getDbname());
+      Assert.assertEquals(tblName, compacts.get(i).getTablename());
+      Assert.assertEquals("initiated", compacts.get(i).getState());
+      partNames.add(compacts.get(i).getPartitionname());
+    }
+    List<String> names = new ArrayList<String>(partNames);
+    Assert.assertEquals("ds=today", names.get(0));
+  }
+
+  @Test
   public void minorCompactWhileStreaming() throws Exception {
     String dbName = "default";
     String tblName = "cws";

Modified: hive/trunk/metastore/if/hive_metastore.thrift
URL: 
http://svn.apache.org/viewvc/hive/trunk/metastore/if/hive_metastore.thrift?rev=1668753&r1=1668752&r2=1668753&view=diff
==============================================================================
--- hive/trunk/metastore/if/hive_metastore.thrift (original)
+++ hive/trunk/metastore/if/hive_metastore.thrift Mon Mar 23 22:22:06 2015
@@ -651,6 +651,13 @@ struct ShowCompactResponse {
     1: required list<ShowCompactResponseElement> compacts,
 }
 
+struct AddDynamicPartitions {
+    1: required i64 txnid,
+    2: required string dbname,
+    3: required string tablename,
+    4: required list<string> partitionnames,
+}
+
 struct NotificationEventRequest {
     1: required i64 lastEvent,
     2: optional i32 maxEvents,
@@ -1164,6 +1171,7 @@ service ThriftHiveMetastore extends fb30
   HeartbeatTxnRangeResponse heartbeat_txn_range(1:HeartbeatTxnRangeRequest 
txns)
   void compact(1:CompactionRequest rqst) 
   ShowCompactResponse show_compact(1:ShowCompactRequest rqst)
+  void add_dynamic_partitions(1:AddDynamicPartitions rqst) throws 
(1:NoSuchTxnException o1, 2:TxnAbortedException o2)
 
   // Notification logging calls
   NotificationEventResponse get_next_notification(1:NotificationEventRequest 
rqst) 


Reply via email to