Author: gates Date: Mon Mar 23 22:22:06 2015 New Revision: 1668753 URL: http://svn.apache.org/r1668753 Log: HIVE-9977 Compactor not running on partitions after dynamic partitioned insert (Alan Gates, reviewed by Eugene Koifman)
Added: hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/AddDynamicPartitions.java Modified: hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java hive/trunk/metastore/if/hive_metastore.thrift hive/trunk/metastore/src/gen/thrift/gen-cpp/ThriftHiveMetastore.cpp hive/trunk/metastore/src/gen/thrift/gen-cpp/ThriftHiveMetastore.h hive/trunk/metastore/src/gen/thrift/gen-cpp/ThriftHiveMetastore_server.skeleton.cpp hive/trunk/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp hive/trunk/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/FireEventRequest.java hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEventResponse.java hive/trunk/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ThriftHiveMetastore.java hive/trunk/metastore/src/gen/thrift/gen-php/metastore/ThriftHiveMetastore.php hive/trunk/metastore/src/gen/thrift/gen-php/metastore/Types.php hive/trunk/metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore-remote hive/trunk/metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore.py hive/trunk/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py hive/trunk/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb hive/trunk/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java 
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java Modified: hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java URL: http://svn.apache.org/viewvc/hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java?rev=1668753&r1=1668752&r2=1668753&view=diff ============================================================================== --- hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java (original) +++ hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java Mon Mar 23 22:22:06 2015 @@ -10,7 +10,6 @@ import org.apache.hadoop.hive.common.Val import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; -import org.apache.hadoop.hive.metastore.MetaStoreThread; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.CompactionRequest; import org.apache.hadoop.hive.metastore.api.CompactionType; @@ -55,6 +54,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; +import 
java.util.SortedSet; +import java.util.TreeSet; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -88,6 +89,7 @@ public class TestCompactor { hiveConf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, ""); hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, TEST_WAREHOUSE_DIR); hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName()); + hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict"); //"org.apache.hadoop.hive.ql.io.HiveInputFormat" TxnDbUtil.setConfValues(hiveConf); @@ -281,6 +283,124 @@ public class TestCompactor { } @Test + public void dynamicPartitioningInsert() throws Exception { + String tblName = "dpct"; + List<String> colNames = Arrays.asList("a", "b"); + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + + " PARTITIONED BY(ds string)" + + " CLUSTERED BY(a) INTO 2 BUCKETS" + //currently ACID requires table to be bucketed + " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver); + executeStatementOnDriver("insert into " + tblName + " partition (ds) values (1, 'fred', " + + "'today'), (2, 'wilma', 'yesterday')", driver); + + Initiator initiator = new Initiator(); + initiator.setThreadId((int)initiator.getId()); + conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 0); + initiator.setHiveConf(conf); + AtomicBoolean stop = new AtomicBoolean(); + stop.set(true); + initiator.init(stop, new AtomicBoolean()); + initiator.run(); + + CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf); + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List<ShowCompactResponseElement> compacts = rsp.getCompacts(); + Assert.assertEquals(2, compacts.size()); + SortedSet<String> partNames = new TreeSet<String>(); + for (int i = 0; i < compacts.size(); i++) { + Assert.assertEquals("default", compacts.get(i).getDbname()); + Assert.assertEquals(tblName, 
compacts.get(i).getTablename()); + Assert.assertEquals("initiated", compacts.get(i).getState()); + partNames.add(compacts.get(i).getPartitionname()); + } + List<String> names = new ArrayList<String>(partNames); + Assert.assertEquals("ds=today", names.get(0)); + Assert.assertEquals("ds=yesterday", names.get(1)); + } + + @Test + public void dynamicPartitioningUpdate() throws Exception { + String tblName = "udpct"; + List<String> colNames = Arrays.asList("a", "b"); + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + + " PARTITIONED BY(ds string)" + + " CLUSTERED BY(a) INTO 2 BUCKETS" + //currently ACID requires table to be bucketed + " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver); + executeStatementOnDriver("insert into " + tblName + " partition (ds) values (1, 'fred', " + + "'today'), (2, 'wilma', 'yesterday')", driver); + + executeStatementOnDriver("update " + tblName + " set b = 'barney'", driver); + + Initiator initiator = new Initiator(); + initiator.setThreadId((int)initiator.getId()); + // Set to 1 so insert doesn't set it off but update does + conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 1); + initiator.setHiveConf(conf); + AtomicBoolean stop = new AtomicBoolean(); + stop.set(true); + initiator.init(stop, new AtomicBoolean()); + initiator.run(); + + CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf); + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List<ShowCompactResponseElement> compacts = rsp.getCompacts(); + Assert.assertEquals(2, compacts.size()); + SortedSet<String> partNames = new TreeSet<String>(); + for (int i = 0; i < compacts.size(); i++) { + Assert.assertEquals("default", compacts.get(i).getDbname()); + Assert.assertEquals(tblName, compacts.get(i).getTablename()); + Assert.assertEquals("initiated", compacts.get(i).getState()); + 
partNames.add(compacts.get(i).getPartitionname()); + } + List<String> names = new ArrayList<String>(partNames); + Assert.assertEquals("ds=today", names.get(0)); + Assert.assertEquals("ds=yesterday", names.get(1)); + } + + @Test + public void dynamicPartitioningDelete() throws Exception { + String tblName = "ddpct"; + List<String> colNames = Arrays.asList("a", "b"); + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + + " PARTITIONED BY(ds string)" + + " CLUSTERED BY(a) INTO 2 BUCKETS" + //currently ACID requires table to be bucketed + " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver); + executeStatementOnDriver("insert into " + tblName + " partition (ds) values (1, 'fred', " + + "'today'), (2, 'wilma', 'yesterday')", driver); + + executeStatementOnDriver("update " + tblName + " set a = 3", driver); + + executeStatementOnDriver("delete from " + tblName + " where b = 'fred'", driver); + + Initiator initiator = new Initiator(); + initiator.setThreadId((int)initiator.getId()); + // Set to 2 so insert and update don't set it off but delete does + conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 2); + initiator.setHiveConf(conf); + AtomicBoolean stop = new AtomicBoolean(); + stop.set(true); + initiator.init(stop, new AtomicBoolean()); + initiator.run(); + + CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf); + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List<ShowCompactResponseElement> compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + SortedSet<String> partNames = new TreeSet<String>(); + for (int i = 0; i < compacts.size(); i++) { + Assert.assertEquals("default", compacts.get(i).getDbname()); + Assert.assertEquals(tblName, compacts.get(i).getTablename()); + Assert.assertEquals("initiated", compacts.get(i).getState()); + partNames.add(compacts.get(i).getPartitionname()); 
+ } + List<String> names = new ArrayList<String>(partNames); + Assert.assertEquals("ds=today", names.get(0)); + } + + @Test public void minorCompactWhileStreaming() throws Exception { String dbName = "default"; String tblName = "cws"; Modified: hive/trunk/metastore/if/hive_metastore.thrift URL: http://svn.apache.org/viewvc/hive/trunk/metastore/if/hive_metastore.thrift?rev=1668753&r1=1668752&r2=1668753&view=diff ============================================================================== --- hive/trunk/metastore/if/hive_metastore.thrift (original) +++ hive/trunk/metastore/if/hive_metastore.thrift Mon Mar 23 22:22:06 2015 @@ -651,6 +651,13 @@ struct ShowCompactResponse { 1: required list<ShowCompactResponseElement> compacts, } +struct AddDynamicPartitions { + 1: required i64 txnid, + 2: required string dbname, + 3: required string tablename, + 4: required list<string> partitionnames, +} + struct NotificationEventRequest { 1: required i64 lastEvent, 2: optional i32 maxEvents, @@ -1164,6 +1171,7 @@ service ThriftHiveMetastore extends fb30 HeartbeatTxnRangeResponse heartbeat_txn_range(1:HeartbeatTxnRangeRequest txns) void compact(1:CompactionRequest rqst) ShowCompactResponse show_compact(1:ShowCompactRequest rqst) + void add_dynamic_partitions(1:AddDynamicPartitions rqst) throws (1:NoSuchTxnException o1, 2:TxnAbortedException o2) // Notification logging calls NotificationEventResponse get_next_notification(1:NotificationEventRequest rqst)