HIVE-20707: Automatic partition management (Prasanth Jayachandran reviewed by 
Jason Dere)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/64bea035
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/64bea035
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/64bea035

Branch: refs/heads/master
Commit: 64bea0354fba2947e4bc0318728f5419e5d763b9
Parents: 54bba9c
Author: Prasanth Jayachandran <[email protected]>
Authored: Mon Oct 29 15:07:49 2018 -0700
Committer: Prasanth Jayachandran <[email protected]>
Committed: Mon Oct 29 15:07:49 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  12 +
 .../results/positive/external_table_ppd.q.out   |   1 +
 .../positive/hbase_binary_storage_queries.q.out |   2 +
 .../src/test/results/positive/hbase_ddl.q.out   |   2 +
 .../test/results/positive/hbase_queries.q.out   |   1 +
 .../src/test/results/positive/hbasestats.q.out  |   5 +
 .../hive/ql/txn/compactor/TestCompactor.java    |   1 -
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 290 +--------
 .../apache/hadoop/hive/ql/exec/ExplainTask.java |  40 +-
 .../hadoop/hive/ql/metadata/CheckResult.java    | 142 -----
 .../hive/ql/metadata/HiveMetaStoreChecker.java  | 567 ------------------
 .../hive/ql/optimizer/GenMapRedUtils.java       |  21 +-
 .../hive/ql/parse/DDLSemanticAnalyzer.java      |   6 +-
 .../hadoop/hive/ql/plan/CreateTableDesc.java    |   6 +
 .../exec/TestMsckCreatePartitionsInBatches.java | 244 +++++---
 .../exec/TestMsckDropPartitionsInBatches.java   | 125 ++--
 .../ql/metadata/TestHiveMetaStoreChecker.java   | 187 +++---
 .../queries/clientpositive/msck_repair_acid.q   |  34 ++
 .../clientpositive/partition_discovery.q        |  77 +++
 .../results/clientpositive/create_like.q.out    |   1 +
 .../clientpositive/create_like_view.q.out       |   1 +
 .../clientpositive/default_file_format.q.out    |   4 +
 .../druid/druidkafkamini_basic.q.out            |   2 +
 .../druid/druidmini_expressions.q.out           |   2 +
 .../results/clientpositive/druid_topn.q.out     |   1 +
 .../results/clientpositive/explain_locks.q.out  |   1 +
 .../llap/external_table_purge.q.out             |   4 +
 .../results/clientpositive/llap/mm_exim.q.out   |   1 +
 .../llap/strict_managed_tables2.q.out           |   2 +
 .../llap/table_nonprintable.q.out               |   2 +-
 .../clientpositive/llap/whroot_external1.q.out  |   6 +
 .../clientpositive/msck_repair_acid.q.out       |  88 +++
 .../clientpositive/msck_repair_drop.q.out       |  68 +--
 .../clientpositive/partition_discovery.q.out    | 357 ++++++++++++
 .../rename_external_partition_location.q.out    |   2 +
 .../clientpositive/repl_2_exim_basic.q.out      |   2 +
 .../show_create_table_alter.q.out               |   5 +
 .../show_create_table_partitioned.q.out         |   1 +
 .../show_create_table_serde.q.out               |   1 +
 .../clientpositive/spark/stats_noscan_2.q.out   |   2 +
 .../results/clientpositive/stats_noscan_2.q.out |   2 +
 .../temp_table_display_colstats_tbllvl.q.out    |   5 +
 .../hadoop/hive/metastore/CheckResult.java      | 153 +++++
 .../apache/hadoop/hive/metastore/Warehouse.java |   2 +-
 .../hive/metastore/api/MetastoreException.java  |  36 ++
 .../hive/metastore/conf/MetastoreConf.java      |  59 +-
 .../hive/metastore/utils/MetaStoreUtils.java    |  55 +-
 .../hive/metastore/HiveMetaStoreChecker.java    | 571 ++++++++++++++++++
 .../org/apache/hadoop/hive/metastore/Msck.java  | 530 +++++++++++++++++
 .../apache/hadoop/hive/metastore/MsckInfo.java  | 125 ++++
 .../metastore/MsckPartitionExpressionProxy.java |  64 ++
 .../hadoop/hive/metastore/ObjectStore.java      |   6 +-
 .../hive/metastore/PartitionIterable.java       | 163 ++++++
 .../hive/metastore/PartitionManagementTask.java | 235 ++++++++
 .../metastore/utils/MetaStoreServerUtils.java   | 167 +++++-
 .../hive/metastore/utils/RetryUtilities.java    | 110 ++++
 .../hive/metastore/NonCatCallsWithCatalog.java  |   4 +-
 .../hive/metastore/TestCatalogOldClient.java    |   4 +-
 .../hive/metastore/TestPartitionManagement.java | 581 +++++++++++++++++++
 .../hive/metastore/client/TestGetTableMeta.java |  11 +-
 60 files changed, 3891 insertions(+), 1308 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index e226a1f..917aaeb 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4415,17 +4415,29 @@ public class HiveConf extends Configuration {
       "Merge adjacent joins into a single n-way join"),
     HIVE_LOG_N_RECORDS("hive.log.every.n.records", 0L, new RangeValidator(0L, 
null),
       "If value is greater than 0 logs in fixed intervals of size n rather 
than exponentially."),
+    /**
+     * @deprecated Use MetastoreConf.MSCK_PATH_VALIDATION
+     */
+    @Deprecated
     HIVE_MSCK_PATH_VALIDATION("hive.msck.path.validation", "throw",
         new StringSet("throw", "skip", "ignore"), "The approach msck should 
take with HDFS " +
        "directories that are partition-like but contain unsupported 
characters. 'throw' (an " +
        "exception) is the default; 'skip' will skip the invalid directories 
and still repair the" +
        " others; 'ignore' will skip the validation (legacy behavior, causes 
bugs in many cases)"),
+    /**
+     * @deprecated Use MetastoreConf.MSCK_REPAIR_BATCH_SIZE
+     */
+    @Deprecated
     HIVE_MSCK_REPAIR_BATCH_SIZE(
         "hive.msck.repair.batch.size", 3000,
         "Batch size for the msck repair command. If the value is greater than 
zero,\n "
             + "it will execute batch wise with the configured batch size. In 
case of errors while\n"
             + "adding unknown partitions the batch size is automatically 
reduced by half in the subsequent\n"
             + "retry attempt. The default value is 3000 which means it will 
execute in the batches of 3000."),
+    /**
+     * @deprecated Use MetastoreConf.MSCK_REPAIR_BATCH_MAX_RETRIES
+     */
+    @Deprecated
     HIVE_MSCK_REPAIR_BATCH_MAX_RETRIES("hive.msck.repair.batch.max.retries", 4,
         "Maximum number of retries for the msck repair command when adding 
unknown partitions.\n "
         + "If the value is greater than zero it will retry adding unknown 
partitions until the maximum\n"

http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/hbase-handler/src/test/results/positive/external_table_ppd.q.out
----------------------------------------------------------------------
diff --git a/hbase-handler/src/test/results/positive/external_table_ppd.q.out 
b/hbase-handler/src/test/results/positive/external_table_ppd.q.out
index edcbe7e..22c8b70 100644
--- a/hbase-handler/src/test/results/positive/external_table_ppd.q.out
+++ b/hbase-handler/src/test/results/positive/external_table_ppd.q.out
@@ -60,6 +60,7 @@ Table Parameters:
        COLUMN_STATS_ACCURATE   
{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"bigint_col\":\"true\",\"boolean_col\":\"true\",\"double_col\":\"true\",\"float_col\":\"true\",\"int_col\":\"true\",\"key\":\"true\",\"smallint_col\":\"true\",\"tinyint_col\":\"true\"}}
        EXTERNAL                TRUE                
        bucketing_version       2                   
+       discover.partitions     true                
        external.table.purge    true                
        hbase.table.default.storage.type        binary              
        hbase.table.name        t_hive              

http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out
----------------------------------------------------------------------
diff --git 
a/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out 
b/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out
index 1209c88..bf1a89d 100644
--- a/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out
+++ b/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out
@@ -60,6 +60,7 @@ Table Parameters:
        COLUMN_STATS_ACCURATE   
{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"bigint_col\":\"true\",\"boolean_col\":\"true\",\"double_col\":\"true\",\"float_col\":\"true\",\"int_col\":\"true\",\"key\":\"true\",\"smallint_col\":\"true\",\"tinyint_col\":\"true\"}}
        EXTERNAL                TRUE                
        bucketing_version       2                   
+       discover.partitions     true                
        external.table.purge    true                
        hbase.table.default.storage.type        binary              
        hbase.table.name        t_hive              
@@ -242,6 +243,7 @@ Table Parameters:
        COLUMN_STATS_ACCURATE   
{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"bigint_col\":\"true\",\"boolean_col\":\"true\",\"double_col\":\"true\",\"float_col\":\"true\",\"int_col\":\"true\",\"key\":\"true\",\"smallint_col\":\"true\",\"tinyint_col\":\"true\"}}
        EXTERNAL                TRUE                
        bucketing_version       2                   
+       discover.partitions     true                
        hbase.table.name        t_hive              
        numFiles                0                   
        numRows                 0                   

http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/hbase-handler/src/test/results/positive/hbase_ddl.q.out
----------------------------------------------------------------------
diff --git a/hbase-handler/src/test/results/positive/hbase_ddl.q.out 
b/hbase-handler/src/test/results/positive/hbase_ddl.q.out
index ccd4148..fc40026 100644
--- a/hbase-handler/src/test/results/positive/hbase_ddl.q.out
+++ b/hbase-handler/src/test/results/positive/hbase_ddl.q.out
@@ -119,6 +119,7 @@ Table Type:                 EXTERNAL_TABLE
 Table Parameters:               
        EXTERNAL                TRUE                
        bucketing_version       2                   
+       discover.partitions     true                
        external.table.purge    true                
        hbase.mapred.output.outputtable kkk                 
        hbase.table.name        hbase_table_0       
@@ -168,6 +169,7 @@ Table Type:                 EXTERNAL_TABLE
 Table Parameters:               
        EXTERNAL                TRUE                
        bucketing_version       2                   
+       discover.partitions     true                
        external.table.purge    true                
        hbase.table.name        hbase_table_0       
 #### A masked pattern was here ####

http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/hbase-handler/src/test/results/positive/hbase_queries.q.out
----------------------------------------------------------------------
diff --git a/hbase-handler/src/test/results/positive/hbase_queries.q.out 
b/hbase-handler/src/test/results/positive/hbase_queries.q.out
index eeb97f0..aea7e7e 100644
--- a/hbase-handler/src/test/results/positive/hbase_queries.q.out
+++ b/hbase-handler/src/test/results/positive/hbase_queries.q.out
@@ -986,6 +986,7 @@ WITH SERDEPROPERTIES (
   'hbase.columns.mapping'='cf:string', 
   'serialization.format'='1')
 TBLPROPERTIES (
+  'discover.partitions'='true', 
   'hbase.table.name'='hbase_table_0', 
 #### A masked pattern was here ####
 PREHOOK: query: DROP TABLE IF EXISTS hbase_table_9

http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/hbase-handler/src/test/results/positive/hbasestats.q.out
----------------------------------------------------------------------
diff --git a/hbase-handler/src/test/results/positive/hbasestats.q.out 
b/hbase-handler/src/test/results/positive/hbasestats.q.out
index 5a4aea9..5143522 100644
--- a/hbase-handler/src/test/results/positive/hbasestats.q.out
+++ b/hbase-handler/src/test/results/positive/hbasestats.q.out
@@ -42,6 +42,7 @@ Table Parameters:
        COLUMN_STATS_ACCURATE   
{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"country\":\"true\",\"country_id\":\"true\",\"key\":\"true\",\"state\":\"true\"}}
        EXTERNAL                TRUE                
        bucketing_version       2                   
+       discover.partitions     true                
        external.table.purge    true                
        numFiles                0                   
        numRows                 0                   
@@ -136,6 +137,7 @@ Table Type:                 EXTERNAL_TABLE
 Table Parameters:               
        EXTERNAL                TRUE                
        bucketing_version       2                   
+       discover.partitions     true                
        external.table.purge    true                
 #### A masked pattern was here ####
        numFiles                0                   
@@ -203,6 +205,7 @@ Table Type:                 EXTERNAL_TABLE
 Table Parameters:               
        EXTERNAL                TRUE                
        bucketing_version       2                   
+       discover.partitions     true                
        external.table.purge    true                
 #### A masked pattern was here ####
        numFiles                0                   
@@ -262,6 +265,7 @@ Table Parameters:
        COLUMN_STATS_ACCURATE   {\"BASIC_STATS\":\"true\"}
        EXTERNAL                TRUE                
        bucketing_version       2                   
+       discover.partitions     true                
        external.table.purge    true                
 #### A masked pattern was here ####
        numFiles                0                   
@@ -371,6 +375,7 @@ Table Type:                 EXTERNAL_TABLE
 Table Parameters:               
        EXTERNAL                TRUE                
        bucketing_version       2                   
+       discover.partitions     true                
        external.table.purge    true                
 #### A masked pattern was here ####
        numFiles                0                   

http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index a9d7468..9648645 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -1679,7 +1679,6 @@ public class TestCompactor {
     Assert.assertNotEquals("Unexpected default compression size", 2700,
       OrcConf.BUFFER_SIZE.getDefaultValue());
 
-
     // Insert one more row - this should trigger 
hive.compactor.delta.pct.threshold to be reached for ttp2
     executeStatementOnDriver("insert into " + tblName1 + " values (6, 'f')", 
driver);
     executeStatementOnDriver("insert into " + tblName2 + " values (6, 'f')", 
driver);

http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 807f159..6790a06 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -74,7 +74,10 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.DefaultHiveMetaHook;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
+import org.apache.hadoop.hive.metastore.Msck;
+import org.apache.hadoop.hive.metastore.MsckInfo;
 import org.apache.hadoop.hive.metastore.PartitionDropOptions;
+import org.apache.hadoop.hive.metastore.PartitionManagementTask;
 import org.apache.hadoop.hive.metastore.StatObjectConverter;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
@@ -149,13 +152,13 @@ import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.metadata.CheckConstraint;
-import org.apache.hadoop.hive.ql.metadata.CheckResult;
+import org.apache.hadoop.hive.metastore.CheckResult;
 import org.apache.hadoop.hive.ql.metadata.DefaultConstraint;
 import org.apache.hadoop.hive.ql.metadata.ForeignKeyInfo;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveMaterializedViewsRegistry;
-import org.apache.hadoop.hive.ql.metadata.HiveMetaStoreChecker;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreChecker;
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.NotNullConstraint;
 import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -2125,279 +2128,22 @@ public class DDLTask extends Task<DDLWork> implements 
Serializable {
    * @return Returns 0 when execution succeeds and above 0 if it fails.
    */
   private int msck(Hive db, MsckDesc msckDesc) {
-    CheckResult result = new CheckResult();
-    List<String> repairOutput = new ArrayList<String>();
+    Msck msck;
     try {
-      HiveMetaStoreChecker checker = new HiveMetaStoreChecker(db);
+      msck = new Msck( false, false);
+      msck.init(db.getConf());
       String[] names = Utilities.getDbTableName(msckDesc.getTableName());
-
-      // checkMetastore call will fill in result with partitions that are 
present in filesystem
-      // and missing in metastore - accessed through getPartitionsNotInMs
-      // And partitions that are not present in filesystem and metadata exists 
in metastore -
-      // accessed through getPartitionNotOnFS
-      checker.checkMetastore(names[0], names[1], msckDesc.getPartSpecs(), 
result);
-      Set<CheckResult.PartitionResult> partsNotInMs = 
result.getPartitionsNotInMs();
-      Set<CheckResult.PartitionResult> partsNotInFs = 
result.getPartitionsNotOnFs();
-
-      if (msckDesc.isRepairPartitions()) {
-        // Repair metadata in HMS
-
-        Table table = db.getTable(msckDesc.getTableName());
-        int maxRetries = 
conf.getIntVar(ConfVars.HIVE_MSCK_REPAIR_BATCH_MAX_RETRIES);
-        int decayingFactor = 2;
-
-        if (msckDesc.isAddPartitions() && !partsNotInMs.isEmpty()) {
-          // MSCK called to add missing paritions into metastore and there are
-          // missing partitions.
-
-          int batchSize = conf.getIntVar(ConfVars.HIVE_MSCK_REPAIR_BATCH_SIZE);
-          if (batchSize == 0) {
-            //batching is not enabled. Try to add all the partitions in one 
call
-            batchSize = partsNotInMs.size();
-          }
-
-          AbstractList<String> vals = null;
-          String settingStr = HiveConf.getVar(conf, 
HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION);
-          boolean doValidate = !("ignore".equals(settingStr));
-          boolean doSkip = doValidate && "skip".equals(settingStr);
-          // The default setting is "throw"; assume doValidate && !doSkip 
means throw.
-          if (doValidate) {
-            // Validate that we can add partition without escaping. Escaping 
was originally intended
-            // to avoid creating invalid HDFS paths; however, if we escape the 
HDFS path (that we
-            // deem invalid but HDFS actually supports - it is possible to 
create HDFS paths with
-            // unprintable characters like ASCII 7), metastore will create 
another directory instead
-            // of the one we are trying to "repair" here.
-            Iterator<CheckResult.PartitionResult> iter = 
partsNotInMs.iterator();
-            while (iter.hasNext()) {
-              CheckResult.PartitionResult part = iter.next();
-              try {
-                vals = Warehouse.makeValsFromName(part.getPartitionName(), 
vals);
-              } catch (MetaException ex) {
-                throw new HiveException(ex);
-              }
-              for (String val : vals) {
-                String escapedPath = FileUtils.escapePathName(val);
-                assert escapedPath != null;
-                if (escapedPath.equals(val)) {
-                  continue;
-                }
-                String errorMsg = "Repair: Cannot add partition " + 
msckDesc.getTableName() + ':' +
-                    part.getPartitionName() + " due to invalid characters in 
the name";
-                if (doSkip) {
-                  repairOutput.add(errorMsg);
-                  iter.remove();
-                } else {
-                  throw new HiveException(errorMsg);
-                }
-              }
-            }
-          }
-          try {
-            createPartitionsInBatches(db, repairOutput, partsNotInMs, table, 
batchSize,
-                decayingFactor, maxRetries);
-          } catch (Exception e) {
-            throw new HiveException(e);
-          }
-        }
-
-        if (msckDesc.isDropPartitions() && !partsNotInFs.isEmpty()) {
-          // MSCK called to drop stale paritions from metastore and there are
-          // stale partitions.
-
-          int batchSize = conf.getIntVar(ConfVars.HIVE_MSCK_REPAIR_BATCH_SIZE);
-          if (batchSize == 0) {
-            //batching is not enabled. Try to drop all the partitions in one 
call
-            batchSize = partsNotInFs.size();
-          }
-
-          try {
-            dropPartitionsInBatches(db, repairOutput, partsNotInFs, table, 
batchSize,
-                decayingFactor, maxRetries);
-          } catch (Exception e) {
-            throw new HiveException(e);
-          }
-        }
-      }
-    } catch (HiveException e) {
-      LOG.warn("Failed to run metacheck: ", e);
+      MsckInfo msckInfo = new MsckInfo(SessionState.get().getCurrentCatalog(), 
names[0],
+        names[1], msckDesc.getPartSpecs(), msckDesc.getResFile(),
+        msckDesc.isRepairPartitions(), msckDesc.isAddPartitions(), 
msckDesc.isDropPartitions(), -1);
+      return msck.repair(msckInfo);
+    } catch (MetaException e) {
+      LOG.error("Unable to create msck instance.", e);
       return 1;
-    } catch (IOException e) {
-      LOG.warn("Failed to run metacheck: ", e);
+    } catch (SemanticException e) {
+      LOG.error("Msck failed.", e);
       return 1;
-    } finally {
-      BufferedWriter resultOut = null;
-      try {
-        Path resFile = new Path(msckDesc.getResFile());
-        FileSystem fs = resFile.getFileSystem(conf);
-        resultOut = new BufferedWriter(new OutputStreamWriter(fs
-            .create(resFile)));
-
-        boolean firstWritten = false;
-        firstWritten |= writeMsckResult(result.getTablesNotInMs(),
-            "Tables not in metastore:", resultOut, firstWritten);
-        firstWritten |= writeMsckResult(result.getTablesNotOnFs(),
-            "Tables missing on filesystem:", resultOut, firstWritten);
-        firstWritten |= writeMsckResult(result.getPartitionsNotInMs(),
-            "Partitions not in metastore:", resultOut, firstWritten);
-        firstWritten |= writeMsckResult(result.getPartitionsNotOnFs(),
-            "Partitions missing from filesystem:", resultOut, firstWritten);
-        for (String rout : repairOutput) {
-          if (firstWritten) {
-            resultOut.write(terminator);
-          } else {
-            firstWritten = true;
-          }
-          resultOut.write(rout);
-        }
-      } catch (IOException e) {
-        LOG.warn("Failed to save metacheck output: ", e);
-        return 1;
-      } finally {
-        if (resultOut != null) {
-          try {
-            resultOut.close();
-          } catch (IOException e) {
-            LOG.warn("Failed to close output file: ", e);
-            return 1;
-          }
-        }
-      }
-    }
-
-    return 0;
-  }
-
-  @VisibleForTesting
-  void createPartitionsInBatches(Hive db, List<String> repairOutput,
-      Set<CheckResult.PartitionResult> partsNotInMs, Table table, int 
batchSize, int decayingFactor, int maxRetries)
-      throws Exception {
-    String addMsgFormat = "Repair: Added partition to metastore "
-        + table.getTableName() + ":%s";
-    Set<CheckResult.PartitionResult> batchWork = new HashSet<>(partsNotInMs);
-    new RetryUtilities.ExponentiallyDecayingBatchWork<Void>(batchSize, 
decayingFactor, maxRetries) {
-      @Override
-      public Void execute(int size) throws Exception {
-        while (!batchWork.isEmpty()) {
-          //get the current batch size
-          int currentBatchSize = size;
-          AddPartitionDesc apd =
-              new AddPartitionDesc(table.getDbName(), table.getTableName(), 
true);
-          //store the partitions temporarily until processed
-          List<CheckResult.PartitionResult> lastBatch = new 
ArrayList<>(currentBatchSize);
-          List<String> addMsgs = new ArrayList<>(currentBatchSize);
-          //add the number of partitions given by the current batchsize
-          for (CheckResult.PartitionResult part : batchWork) {
-            if (currentBatchSize == 0) {
-              break;
-            }
-            
apd.addPartition(Warehouse.makeSpecFromName(part.getPartitionName()), null);
-            lastBatch.add(part);
-            addMsgs.add(String.format(addMsgFormat, part.getPartitionName()));
-            currentBatchSize--;
-          }
-          db.createPartitions(apd);
-          // if last batch is successful remove it from partsNotInMs
-          batchWork.removeAll(lastBatch);
-          repairOutput.addAll(addMsgs);
-        }
-        return null;
-      }
-    }.run();
-  }
-
-  // Drops partitions in batches.  partNotInFs is split into batches based on 
batchSize
-  // and dropped.  The dropping will be through RetryUtilities which will 
retry when there is a
-  // failure after reducing the batchSize by decayingFactor.  Retrying will 
cease when maxRetries
-  // limit is reached or batchSize reduces to 0, whichever comes earlier.
-  @VisibleForTesting
-  void dropPartitionsInBatches(Hive db, List<String> repairOutput,
-      Set<CheckResult.PartitionResult> partsNotInFs, Table table, int 
batchSize, int decayingFactor,
-      int maxRetries) throws Exception {
-    String dropMsgFormat =
-        "Repair: Dropped partition from metastore " + 
table.getFullyQualifiedName() + ":%s";
-    // Copy of partitions that will be split into batches
-    Set<CheckResult.PartitionResult> batchWork = new TreeSet<>(partsNotInFs);
-
-    new RetryUtilities.ExponentiallyDecayingBatchWork<Void>(batchSize, 
decayingFactor, maxRetries) {
-      @Override
-      public Void execute(int size) throws Exception {
-        while (!batchWork.isEmpty()) {
-          int currentBatchSize = size;
-
-          // to store the partitions that are currently being processed
-          List<CheckResult.PartitionResult> lastBatch = new 
ArrayList<>(currentBatchSize);
-
-          // drop messages for the dropped partitions
-          List<String> dropMsgs = new ArrayList<>(currentBatchSize);
-
-          // Partitions to be dropped
-          List<String> dropParts = new ArrayList<>(currentBatchSize);
-
-          for (CheckResult.PartitionResult part : batchWork) {
-            // This batch is full: break out of for loop to execute
-            if (currentBatchSize == 0) {
-              break;
-            }
-
-            dropParts.add(part.getPartitionName());
-
-            // Add the part to lastBatch to track the parition being dropped
-            lastBatch.add(part);
-
-            // Update messages
-            dropMsgs.add(String.format(dropMsgFormat, 
part.getPartitionName()));
-
-            // Decrement batch size.  When this gets to 0, the batch will be 
executed
-            currentBatchSize--;
-          }
-
-          // this call is deleting partitions that are already missing from 
filesystem
-          // so 3rd parameter (deleteData) is set to false
-          // msck is doing a clean up of hms.  if for some reason the 
partition is already
-          // deleted, then it is good.  So, the last parameter ifexists is set 
to true
-          db.dropPartitions(table, dropParts, false, true);
-
-          // if last batch is successful remove it from partsNotInFs
-          batchWork.removeAll(lastBatch);
-          repairOutput.addAll(dropMsgs);
-        }
-        return null;
-      }
-    }.run();
-  }
-
-  /**
-   * Write the result of msck to a writer.
-   *
-   * @param result
-   *          The result we're going to write
-   * @param msg
-   *          Message to write.
-   * @param out
-   *          Writer to write to
-   * @param wrote
-   *          if any previous call wrote data
-   * @return true if something was written
-   * @throws IOException
-   *           In case the writing fails
-   */
-  private boolean writeMsckResult(Set<? extends Object> result, String msg,
-      Writer out, boolean wrote) throws IOException {
-
-    if (!result.isEmpty()) {
-      if (wrote) {
-        out.write(terminator);
-      }
-
-      out.write(msg);
-      for (Object entry : result) {
-        out.write(separator);
-        out.write(entry.toString());
-      }
-      return true;
     }
-
-    return false;
   }
 
   /**
@@ -5011,6 +4757,8 @@ public class DDLTask extends Task<DDLWork> implements 
Serializable {
       if (crtTbl.isExternal()) {
         tbl.setProperty("EXTERNAL", "TRUE");
         tbl.setTableType(TableType.EXTERNAL_TABLE);
+        // partition discovery is on by default
+        
tbl.setProperty(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, 
"true");
       }
 
       tbl.setFields(oldtbl.getCols());
@@ -5109,6 +4857,8 @@ public class DDLTask extends Task<DDLWork> implements 
Serializable {
       if (crtTbl.isExternal()) {
         tbl.setProperty("EXTERNAL", "TRUE");
         tbl.setTableType(TableType.EXTERNAL_TABLE);
+        // partition discovery is on by default
+        
tbl.setProperty(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, 
"true");
       } else {
         tbl.getParameters().remove("EXTERNAL");
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
index 4cc5fa8..7c4efab 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
@@ -356,32 +356,28 @@ public class ExplainTask extends Task<ExplainWork> 
implements Serializable {
     if (jsonOutput) {
       out = null;
     }
-    if (work.getParseContext() != null) {
-      List<LockComponent> lockComponents = 
AcidUtils.makeLockComponents(work.getOutputs(), work.getInputs(), conf);
-      if (null != out) {
-        out.print("LOCK INFORMATION:\n");
-      }
-      List<ExplainLockDesc> locks = new ArrayList<>(lockComponents.size());
-
-      for (LockComponent component : lockComponents) {
-        ExplainLockDesc lockDesc = new ExplainLockDesc(component);
+    List<LockComponent> lockComponents = 
AcidUtils.makeLockComponents(work.getOutputs(), work.getInputs(), conf);
+    if (null != out) {
+      out.print("LOCK INFORMATION:\n");
+    }
+    List<ExplainLockDesc> locks = new ArrayList<>(lockComponents.size());
 
-        if (null != out) {
-          out.print(lockDesc.getFullName());
-          out.print(" -> ");
-          out.print(lockDesc.getLockType());
-          out.print('\n');
-        } else {
-          locks.add(lockDesc);
-        }
+    for (LockComponent component : lockComponents) {
+      ExplainLockDesc lockDesc = new ExplainLockDesc(component);
 
+      if (null != out) {
+        out.print(lockDesc.getFullName());
+        out.print(" -> ");
+        out.print(lockDesc.getLockType());
+        out.print('\n');
+      } else {
+        locks.add(lockDesc);
       }
 
-      if (jsonOutput) {
-        jsonObject.put("LOCK INFORMATION:", locks);
-      }
-    } else {
-      System.err.println("No parse context!");
+    }
+
+    if (jsonOutput) {
+      jsonObject.put("LOCK INFORMATION:", locks);
     }
     return jsonObject;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/ql/src/java/org/apache/hadoop/hive/ql/metadata/CheckResult.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/CheckResult.java 
b/ql/src/java/org/apache/hadoop/hive/ql/metadata/CheckResult.java
deleted file mode 100644
index 0b4240f..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/CheckResult.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.metadata;
-
-import java.util.Set;
-import java.util.TreeSet;
-
-/**
- * Result class used by the HiveMetaStoreChecker.
- */
-public class CheckResult {
-
-  private Set<String> tablesNotOnFs = new TreeSet<String>();
-  private Set<String> tablesNotInMs = new TreeSet<String>();
-  private Set<PartitionResult> partitionsNotOnFs = new 
TreeSet<PartitionResult>();
-  private Set<PartitionResult> partitionsNotInMs = new 
TreeSet<PartitionResult>();
-
-  /**
-   * @return a list of tables not found on the filesystem.
-   */
-  public Set<String> getTablesNotOnFs() {
-    return tablesNotOnFs;
-  }
-
-  /**
-   * @param tablesNotOnFs
-   *          a list of tables not found on the filesystem.
-   */
-  public void setTablesNotOnFs(Set<String> tablesNotOnFs) {
-    this.tablesNotOnFs = tablesNotOnFs;
-  }
-
-  /**
-   * @return a list of tables not found in the metastore.
-   */
-  public Set<String> getTablesNotInMs() {
-    return tablesNotInMs;
-  }
-
-  /**
-   * @param tablesNotInMs
-   *          a list of tables not found in the metastore.
-   */
-  public void setTablesNotInMs(Set<String> tablesNotInMs) {
-    this.tablesNotInMs = tablesNotInMs;
-  }
-
-  /**
-   * @return a list of partitions not found on the fs
-   */
-  public Set<PartitionResult> getPartitionsNotOnFs() {
-    return partitionsNotOnFs;
-  }
-
-  /**
-   * @param partitionsNotOnFs
-   *          a list of partitions not found on the fs
-   */
-  public void setPartitionsNotOnFs(Set<PartitionResult> partitionsNotOnFs) {
-    this.partitionsNotOnFs = partitionsNotOnFs;
-  }
-
-  /**
-   * @return a list of partitions not found in the metastore
-   */
-  public Set<PartitionResult> getPartitionsNotInMs() {
-    return partitionsNotInMs;
-  }
-
-  /**
-   * @param partitionsNotInMs
-   *          a list of partitions not found in the metastore
-   */
-  public void setPartitionsNotInMs(Set<PartitionResult> partitionsNotInMs) {
-    this.partitionsNotInMs = partitionsNotInMs;
-  }
-
-  /**
-   * A basic description of a partition that is missing from either the fs or
-   * the ms.
-   */
-  public static class PartitionResult implements Comparable<PartitionResult> {
-    private String partitionName;
-    private String tableName;
-
-    /**
-     * @return name of partition
-     */
-    public String getPartitionName() {
-      return partitionName;
-    }
-
-    /**
-     * @param partitionName
-     *          name of partition
-     */
-    public void setPartitionName(String partitionName) {
-      this.partitionName = partitionName;
-    }
-
-    /**
-     * @return table name
-     */
-    public String getTableName() {
-      return tableName;
-    }
-
-    /**
-     * @param tableName
-     *          table name
-     */
-    public void setTableName(String tableName) {
-      this.tableName = tableName;
-    }
-
-    @Override
-    public String toString() {
-      return tableName + ":" + partitionName;
-    }
-
-    public int compareTo(PartitionResult o) {
-      int ret = tableName.compareTo(o.tableName);
-      return ret != 0 ? ret : partitionName.compareTo(o.partitionName);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java 
b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
deleted file mode 100644
index 598bb2e..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
+++ /dev/null
@@ -1,567 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.metadata;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-
-import com.google.common.collect.Sets;
-import org.apache.hadoop.hive.common.StringInternUtils;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.ql.metadata.CheckResult.PartitionResult;
-import org.apache.thrift.TException;
-
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-/**
- * Verify that the information in the metastore matches what is on the
- * filesystem. Return a CheckResult object containing lists of missing and any
- * unexpected tables and partitions.
- */
-public class HiveMetaStoreChecker {
-
-  public static final Logger LOG = 
LoggerFactory.getLogger(HiveMetaStoreChecker.class);
-  public static final String CLASS_NAME = HiveMetaStoreChecker.class.getName();
-
-  private final Hive hive;
-  private final HiveConf conf;
-
-  public HiveMetaStoreChecker(Hive hive) {
-    super();
-    this.hive = hive;
-    conf = hive.getConf();
-  }
-
-  /**
-   * Check the metastore for inconsistencies, data missing in either the
-   * metastore or on the dfs.
-   *
-   * @param dbName
-   *          name of the database, if not specified the default will be used.
-   * @param tableName
-   *          Table we want to run the check for. If null we'll check all the
-   *          tables in the database.
-   * @param partitions
-   *          List of partition name value pairs, if null or empty check all
-   *          partitions
-   * @param result
-   *          Fill this with the results of the check
-   * @throws HiveException
-   *           Failed to get required information from the metastore.
-   * @throws IOException
-   *           Most likely filesystem related
-   */
-  public void checkMetastore(String dbName, String tableName,
-      List<? extends Map<String, String>> partitions, CheckResult result)
-      throws HiveException, IOException {
-
-    if (dbName == null || "".equalsIgnoreCase(dbName)) {
-      dbName = Warehouse.DEFAULT_DATABASE_NAME;
-    }
-
-    try {
-      if (tableName == null || "".equals(tableName)) {
-        // no table specified, check all tables and all partitions.
-        List<String> tables = hive.getTablesForDb(dbName, ".*");
-        for (String currentTableName : tables) {
-          checkTable(dbName, currentTableName, null, result);
-        }
-
-        findUnknownTables(dbName, tables, result);
-      } else if (partitions == null || partitions.isEmpty()) {
-        // only one table, let's check all partitions
-        checkTable(dbName, tableName, null, result);
-      } else {
-        // check the specified partitions
-        checkTable(dbName, tableName, partitions, result);
-      }
-      LOG.info("Number of partitionsNotInMs=" + result.getPartitionsNotInMs()
-              + ", partitionsNotOnFs=" + result.getPartitionsNotOnFs()
-              + ", tablesNotInMs=" + result.getTablesNotInMs()
-              + ", tablesNotOnFs=" + result.getTablesNotOnFs());
-    } catch (MetaException e) {
-      throw new HiveException(e);
-    } catch (TException e) {
-      throw new HiveException(e);
-    }
-  }
-
-  /**
-   * Check for table directories that aren't in the metastore.
-   *
-   * @param dbName
-   *          Name of the database
-   * @param tables
-   *          List of table names
-   * @param result
-   *          Add any found tables to this
-   * @throws HiveException
-   *           Failed to get required information from the metastore.
-   * @throws IOException
-   *           Most likely filesystem related
-   * @throws MetaException
-   *           Failed to get required information from the metastore.
-   * @throws NoSuchObjectException
-   *           Failed to get required information from the metastore.
-   * @throws TException
-   *           Thrift communication error.
-   */
-  void findUnknownTables(String dbName, List<String> tables, CheckResult 
result)
-      throws IOException, MetaException, TException, HiveException {
-
-    Set<Path> dbPaths = new HashSet<Path>();
-    Set<String> tableNames = new HashSet<String>(tables);
-
-    for (String tableName : tables) {
-      Table table = hive.getTable(dbName, tableName);
-      // hack, instead figure out a way to get the db paths
-      String isExternal = table.getParameters().get("EXTERNAL");
-      if (isExternal == null || !"TRUE".equalsIgnoreCase(isExternal)) {
-        dbPaths.add(table.getPath().getParent());
-      }
-    }
-
-    for (Path dbPath : dbPaths) {
-      FileSystem fs = dbPath.getFileSystem(conf);
-      FileStatus[] statuses = fs.listStatus(dbPath, 
FileUtils.HIDDEN_FILES_PATH_FILTER);
-      for (FileStatus status : statuses) {
-
-        if (status.isDir() && 
!tableNames.contains(status.getPath().getName())) {
-
-          result.getTablesNotInMs().add(status.getPath().getName());
-        }
-      }
-    }
-  }
-
-  /**
-   * Check the metastore for inconsistencies, data missing in either the
-   * metastore or on the dfs.
-   *
-   * @param dbName
-   *          Name of the database
-   * @param tableName
-   *          Name of the table
-   * @param partitions
-   *          Partitions to check, if null or empty get all the partitions.
-   * @param result
-   *          Result object
-   * @throws HiveException
-   *           Failed to get required information from the metastore.
-   * @throws IOException
-   *           Most likely filesystem related
-   * @throws MetaException
-   *           Failed to get required information from the metastore.
-   */
-  void checkTable(String dbName, String tableName,
-      List<? extends Map<String, String>> partitions, CheckResult result)
-      throws MetaException, IOException, HiveException {
-
-    Table table = null;
-
-    try {
-      table = hive.getTable(dbName, tableName);
-    } catch (HiveException e) {
-      result.getTablesNotInMs().add(tableName);
-      return;
-    }
-
-    PartitionIterable parts;
-    boolean findUnknownPartitions = true;
-
-    if (table.isPartitioned()) {
-      if (partitions == null || partitions.isEmpty()) {
-        String mode = HiveConf.getVar(conf, ConfVars.HIVEMAPREDMODE, (String) 
null);
-        if ("strict".equalsIgnoreCase(mode)) {
-          parts = new PartitionIterable(hive, table, null, conf.getIntVar(
-              HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX));
-        } else {
-          List<Partition> loadedPartitions = new ArrayList<>();
-          PerfLogger perfLogger = SessionState.getPerfLogger();
-          perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.PARTITION_RETRIEVING);
-          loadedPartitions.addAll(hive.getAllPartitionsOf(table));
-          perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PARTITION_RETRIEVING);
-          parts = new PartitionIterable(loadedPartitions);
-        }
-      } else {
-        // we're interested in specific partitions,
-        // don't check for any others
-        findUnknownPartitions = false;
-        List<Partition> loadedPartitions = new ArrayList<>();
-        for (Map<String, String> map : partitions) {
-          Partition part = hive.getPartition(table, map, false);
-          if (part == null) {
-            PartitionResult pr = new PartitionResult();
-            pr.setTableName(tableName);
-            pr.setPartitionName(Warehouse.makePartPath(map));
-            result.getPartitionsNotInMs().add(pr);
-          } else {
-            loadedPartitions.add(part);
-          }
-        }
-        parts = new PartitionIterable(loadedPartitions);
-      }
-    } else {
-      parts = new PartitionIterable(Collections.<Partition>emptyList());
-    }
-
-    checkTable(table, parts, findUnknownPartitions, result);
-  }
-
-  /**
-   * Check the metastore for inconsistencies, data missing in either the
-   * metastore or on the dfs.
-   *
-   * @param table
-   *          Table to check
-   * @param parts
-   *          Partitions to check
-   * @param result
-   *          Result object
-   * @param findUnknownPartitions
-   *          Should we try to find unknown partitions?
-   * @throws IOException
-   *           Could not get information from filesystem
-   * @throws HiveException
-   *           Could not create Partition object
-   */
-  void checkTable(Table table, PartitionIterable parts,
-      boolean findUnknownPartitions, CheckResult result) throws IOException,
-      HiveException {
-
-    Path tablePath = table.getPath();
-    FileSystem fs = tablePath.getFileSystem(conf);
-    if (!fs.exists(tablePath)) {
-      result.getTablesNotOnFs().add(table.getTableName());
-      return;
-    }
-
-    Set<Path> partPaths = new HashSet<Path>();
-
-    // check that the partition folders exist on disk
-    for (Partition partition : parts) {
-      if (partition == null) {
-        // most likely the user specified an invalid partition
-        continue;
-      }
-      Path partPath = partition.getDataLocation();
-      fs = partPath.getFileSystem(conf);
-      if (!fs.exists(partPath)) {
-        PartitionResult pr = new PartitionResult();
-        pr.setPartitionName(partition.getName());
-        pr.setTableName(partition.getTable().getTableName());
-        result.getPartitionsNotOnFs().add(pr);
-      }
-
-      for (int i = 0; i < partition.getSpec().size(); i++) {
-        Path qualifiedPath = partPath.makeQualified(fs);
-        StringInternUtils.internUriStringsInPath(qualifiedPath);
-        partPaths.add(qualifiedPath);
-        partPath = partPath.getParent();
-      }
-    }
-
-    if (findUnknownPartitions) {
-      findUnknownPartitions(table, partPaths, result);
-    }
-  }
-
-  /**
-   * Find partitions on the fs that are unknown to the metastore.
-   *
-   * @param table
-   *          Table where the partitions would be located
-   * @param partPaths
-   *          Paths of the partitions the ms knows about
-   * @param result
-   *          Result object
-   * @throws IOException
-   *           Thrown if we fail at fetching listings from the fs.
-   * @throws HiveException 
-   */
-  void findUnknownPartitions(Table table, Set<Path> partPaths,
-      CheckResult result) throws IOException, HiveException {
-
-    Path tablePath = table.getPath();
-    // now check the table folder and see if we find anything
-    // that isn't in the metastore
-    Set<Path> allPartDirs = new HashSet<Path>();
-    checkPartitionDirs(tablePath, allPartDirs, 
Collections.unmodifiableList(table.getPartColNames()));
-    // don't want the table dir
-    allPartDirs.remove(tablePath);
-
-    // remove the partition paths we know about
-    allPartDirs.removeAll(partPaths);
-
-    Set<String> partColNames = Sets.newHashSet();
-    for(FieldSchema fSchema : table.getPartCols()) {
-      partColNames.add(fSchema.getName());
-    }
-
-    // we should now only have the unexpected folders left
-    for (Path partPath : allPartDirs) {
-      FileSystem fs = partPath.getFileSystem(conf);
-      String partitionName = getPartitionName(fs.makeQualified(tablePath),
-          partPath, partColNames);
-      LOG.debug("PartitionName: " + partitionName);
-
-      if (partitionName != null) {
-        PartitionResult pr = new PartitionResult();
-        pr.setPartitionName(partitionName);
-        pr.setTableName(table.getTableName());
-
-        result.getPartitionsNotInMs().add(pr);
-      }
-    }
-    LOG.debug("Number of partitions not in metastore : " + 
result.getPartitionsNotInMs().size());
-  }
-
-  /**
-   * Get the partition name from the path.
-   *
-   * @param tablePath
-   *          Path of the table.
-   * @param partitionPath
-   *          Path of the partition.
-   * @param partCols
-   *          Set of partition columns from table definition
-   * @return Partition name, for example partitiondate=2008-01-01
-   */
-  static String getPartitionName(Path tablePath, Path partitionPath,
-      Set<String> partCols) {
-    String result = null;
-    Path currPath = partitionPath;
-    LOG.debug("tablePath:" + tablePath + ", partCols: " + partCols);
-
-    while (currPath != null && !tablePath.equals(currPath)) {
-      // format: partition=p_val
-      // Add only when table partition colName matches
-      String[] parts = currPath.getName().split("=");
-      if (parts != null && parts.length > 0) {
-        if (parts.length != 2) {
-          LOG.warn(currPath.getName() + " is not a valid partition name");
-          return result;
-        }
-
-        String partitionName = parts[0];
-        if (partCols.contains(partitionName)) {
-          if (result == null) {
-            result = currPath.getName();
-          } else {
-            result = currPath.getName() + Path.SEPARATOR + result;
-          }
-        }
-      }
-      currPath = currPath.getParent();
-      LOG.debug("currPath=" + currPath);
-    }
-    return result;
-  }
-
-  /**
-   * Assume that depth is 2, i.e., partition columns are a and b
-   * tblPath/a=1  => throw exception
-   * tblPath/a=1/file => throw exception
-   * tblPath/a=1/b=2/file => return a=1/b=2
-   * tblPath/a=1/b=2/c=3 => return a=1/b=2
-   * tblPath/a=1/b=2/c=3/file => return a=1/b=2
-   *
-   * @param basePath
-   *          Start directory
-   * @param allDirs
-   *          This set will contain the leaf paths at the end.
-   * @param list
-   *          Specify how deep the search goes.
-   * @throws IOException
-   *           Thrown if we can't get lists from the fs.
-   * @throws HiveException
-   */
-
-  private void checkPartitionDirs(Path basePath, Set<Path> allDirs, final 
List<String> partColNames) throws IOException, HiveException {
-    // Here we just reuse the THREAD_COUNT configuration for
-    // METASTORE_FS_HANDLER_THREADS_COUNT since this results in better 
performance
-    // The number of missing partitions discovered are later added by 
metastore using a
-    // threadpool of size METASTORE_FS_HANDLER_THREADS_COUNT. If we have 
different sized
-    // pool here the smaller sized pool of the two becomes a bottleneck
-    int poolSize = 
conf.getInt(ConfVars.METASTORE_FS_HANDLER_THREADS_COUNT.varname, 15);
-
-    ExecutorService executor;
-    if (poolSize <= 1) {
-      LOG.debug("Using single-threaded version of MSCK-GetPaths");
-      executor = MoreExecutors.newDirectExecutorService();
-    } else {
-      LOG.debug("Using multi-threaded version of MSCK-GetPaths with number of 
threads " + poolSize);
-      ThreadFactory threadFactory =
-          new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("MSCK-GetPaths-%d").build();
-      executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(poolSize, 
threadFactory);
-    }
-    checkPartitionDirs(executor, basePath, allDirs, 
basePath.getFileSystem(conf), partColNames);
-
-    executor.shutdown();
-  }
-
-  private final class PathDepthInfoCallable implements Callable<Path> {
-    private final List<String> partColNames;
-    private final FileSystem fs;
-    private final ConcurrentLinkedQueue<PathDepthInfo> pendingPaths;
-    private final boolean throwException;
-    private final PathDepthInfo pd;
-
-    private PathDepthInfoCallable(PathDepthInfo pd, List<String> partColNames, 
FileSystem fs,
-        ConcurrentLinkedQueue<PathDepthInfo> basePaths) {
-      this.partColNames = partColNames;
-      this.pd = pd;
-      this.fs = fs;
-      this.pendingPaths = basePaths;
-      this.throwException = "throw"
-      .equals(HiveConf.getVar(conf, 
HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION));
-    }
-
-    @Override
-    public Path call() throws Exception {
-      return processPathDepthInfo(pd);
-    }
-
-    private Path processPathDepthInfo(final PathDepthInfo pd)
-        throws IOException, HiveException, InterruptedException {
-      final Path currentPath = pd.p;
-      final int currentDepth = pd.depth;
-      FileStatus[] fileStatuses = fs.listStatus(currentPath, 
FileUtils.HIDDEN_FILES_PATH_FILTER);
-      // found no files under a sub-directory under table base path; it is 
possible that the table
-      // is empty and hence there are no partition sub-directories created 
under base path
-      if (fileStatuses.length == 0 && currentDepth > 0 && currentDepth < 
partColNames.size()) {
-        // since maxDepth is not yet reached, we are missing partition
-        // columns in currentPath
-        logOrThrowExceptionWithMsg(
-            "MSCK is missing partition columns under " + 
currentPath.toString());
-      } else {
-        // found files under currentPath add them to the queue if it is a 
directory
-        for (FileStatus fileStatus : fileStatuses) {
-          if (!fileStatus.isDirectory() && currentDepth < partColNames.size()) 
{
-            // found a file at depth which is less than number of partition 
keys
-            logOrThrowExceptionWithMsg(
-                "MSCK finds a file rather than a directory when it searches 
for "
-                    + fileStatus.getPath().toString());
-          } else if (fileStatus.isDirectory() && currentDepth < 
partColNames.size()) {
-            // found a sub-directory at a depth less than number of partition 
keys
-            // validate if the partition directory name matches with the 
corresponding
-            // partition colName at currentDepth
-            Path nextPath = fileStatus.getPath();
-            String[] parts = nextPath.getName().split("=");
-            if (parts.length != 2) {
-              logOrThrowExceptionWithMsg("Invalid partition name " + nextPath);
-            } else if 
(!parts[0].equalsIgnoreCase(partColNames.get(currentDepth))) {
-              logOrThrowExceptionWithMsg(
-                  "Unexpected partition key " + parts[0] + " found at " + 
nextPath);
-            } else {
-              // add sub-directory to the work queue if maxDepth is not yet 
reached
-              pendingPaths.add(new PathDepthInfo(nextPath, currentDepth + 1));
-            }
-          }
-        }
-        if (currentDepth == partColNames.size()) {
-          return currentPath;
-        }
-      }
-      return null;
-    }
-
-    private void logOrThrowExceptionWithMsg(String msg) throws HiveException {
-      if(throwException) {
-        throw new HiveException(msg);
-      } else {
-        LOG.warn(msg);
-      }
-    }
-  }
-
-  private static class PathDepthInfo {
-    private final Path p;
-    private final int depth;
-    PathDepthInfo(Path p, int depth) {
-      this.p = p;
-      this.depth = depth;
-    }
-  }
-
-  private void checkPartitionDirs(final ExecutorService executor,
-      final Path basePath, final Set<Path> result,
-      final FileSystem fs, final List<String> partColNames) throws 
HiveException {
-    try {
-      Queue<Future<Path>> futures = new LinkedList<Future<Path>>();
-      ConcurrentLinkedQueue<PathDepthInfo> nextLevel = new 
ConcurrentLinkedQueue<>();
-      nextLevel.add(new PathDepthInfo(basePath, 0));
-      //Uses level parallel implementation of a bfs. Recursive DFS 
implementations
-      //have a issue where the number of threads can run out if the number of
-      //nested sub-directories is more than the pool size.
-      //Using a two queue implementation is simpler than one queue since then 
we will
-      //have to add the complex mechanisms to let the free worker threads know 
when new levels are
-      //discovered using notify()/wait() mechanisms which can potentially lead 
to bugs if
-      //not done right
-      while(!nextLevel.isEmpty()) {
-        ConcurrentLinkedQueue<PathDepthInfo> tempQueue = new 
ConcurrentLinkedQueue<>();
-        //process each level in parallel
-        while(!nextLevel.isEmpty()) {
-          futures.add(
-              executor.submit(new PathDepthInfoCallable(nextLevel.poll(), 
partColNames, fs, tempQueue)));
-        }
-        while(!futures.isEmpty()) {
-          Path p = futures.poll().get();
-          if (p != null) {
-            result.add(p);
-          }
-        }
-        //update the nextlevel with newly discovered sub-directories from the 
above
-        nextLevel = tempQueue;
-      }
-    } catch (InterruptedException | ExecutionException e) {
-      LOG.error(e.getMessage());
-      executor.shutdownNow();
-      throw new HiveException(e.getCause());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index cff32d3..2131bf1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -39,6 +40,7 @@ import org.apache.hadoop.hive.common.BlobStorageUtils;
 import org.apache.hadoop.hive.common.StringInternUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
@@ -1324,7 +1326,7 @@ public final class GenMapRedUtils {
       }
 
       // update the FileSinkOperator to include partition columns
-      usePartitionColumns(fsInputDesc.getTableInfo().getProperties(), 
dpCtx.getDPColNames());
+      usePartitionColumns(fsInputDesc.getTableInfo().getProperties(), 
fsInputDesc.getTable(), dpCtx.getDPColNames());
     } else {
       // non-partitioned table
       fsInputDesc.getTableInfo().getProperties().remove(
@@ -2090,6 +2092,23 @@ public final class GenMapRedUtils {
     }
     return null;
   }
+
+  static void usePartitionColumns(Properties properties, Table table, 
List<String> partColNames) {
+    if 
(properties.containsKey(org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS))
 {
+      usePartitionColumns(properties, partColNames);
+    } else {
+      List<FieldSchema> partCols = table.getPartCols();
+      String partNames = 
partCols.stream().map(FieldSchema::getName).collect(Collectors.joining("/"));
+      String partTypes = 
partCols.stream().map(FieldSchema::getType).collect(Collectors.joining(":"));
+      properties.setProperty(
+        
org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS,
+        partNames);
+      properties.setProperty(
+        
org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES,
+        partTypes);
+    }
+  }
+
   /**
    * Uses only specified partition columns.
    * Provided properties should be pre-populated with partition column names 
and types.

http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index bba7d6c..6e7c78b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -3841,7 +3841,11 @@ public class DDLSemanticAnalyzer extends 
BaseSemanticAnalyzer {
     }
     Table tab = getTable(tableName);
     List<Map<String, String>> specs = getPartitionSpecs(tab, ast);
-    outputs.add(new WriteEntity(tab, WriteEntity.WriteType.DDL_SHARED));
+    if (repair && AcidUtils.isTransactionalTable(tab)) {
+      outputs.add(new WriteEntity(tab, WriteType.DDL_EXCLUSIVE));
+    } else {
+      outputs.add(new WriteEntity(tab, WriteEntity.WriteType.DDL_SHARED));
+    }
     MsckDesc checkDesc = new MsckDesc(tableName, specs, ctx.getResFile(),
         repair, addPartitions, dropPartitions);
     rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(),

http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
index 27f677e..f00148b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
@@ -28,6 +28,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.PartitionManagementTask;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Order;
@@ -837,6 +838,11 @@ public class CreateTableDesc extends DDLDesc implements 
Serializable {
     if (isExternal()) {
       tbl.setProperty("EXTERNAL", "TRUE");
       tbl.setTableType(TableType.EXTERNAL_TABLE);
+      // only add if the user has not explicitly set it (e.g. the user explicitly disabled it, in which case don't flip it)
+      if 
(tbl.getProperty(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY) == 
null) {
+        // partition discovery is on by default if undefined
+        
tbl.setProperty(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, 
"true");
+      }
     }
 
     // If the sorted columns is a superset of bucketed columns, store this 
fact.

http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/ql/src/test/org/apache/hadoop/hive/ql/exec/TestMsckCreatePartitionsInBatches.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestMsckCreatePartitionsInBatches.java
 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestMsckCreatePartitionsInBatches.java
index ce2b186..3e45016 100644
--- 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestMsckCreatePartitionsInBatches.java
+++ 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestMsckCreatePartitionsInBatches.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hive.ql.exec;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.util.ArrayList;
@@ -27,17 +29,23 @@ import java.util.Set;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.CheckResult.PartitionResult;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.Msck;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.utils.RetryUtilities;
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
-import org.apache.hadoop.hive.ql.metadata.CheckResult.PartitionResult;
-import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hive.common.util.RetryUtilities.RetryException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -48,42 +56,61 @@ import org.mockito.Mockito;
 
 public class TestMsckCreatePartitionsInBatches {
   private static HiveConf hiveConf;
-  private static DDLTask ddlTask;
+  private static Msck msck;
+  private final String catName = "hive";
+  private final String dbName = "default";
   private final String tableName = "test_msck_batch";
-  private static Hive db;
+  private static IMetaStoreClient db;
   private List<String> repairOutput;
   private Table table;
 
   @BeforeClass
-  public static void setupClass() throws HiveException {
+  public static void setupClass() throws HiveException, MetaException {
     hiveConf = new HiveConf(TestMsckCreatePartitionsInBatches.class);
     hiveConf.setIntVar(ConfVars.HIVE_MSCK_REPAIR_BATCH_SIZE, 5);
     hiveConf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
         
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
     SessionState.start(hiveConf);
-    db = Hive.get(hiveConf);
-    ddlTask = new DDLTask();
+    try {
+      db = new HiveMetaStoreClient(hiveConf);
+    } catch (MetaException e) {
+      throw new HiveException(e);
+    }
+    msck = new Msck(false, false);
+    msck.init(hiveConf);
   }
 
   @Before
   public void before() throws Exception {
-    createPartitionedTable("default", tableName);
-    table = db.getTable(tableName);
+    createPartitionedTable(catName, dbName, tableName);
+    table = db.getTable(catName, dbName, tableName);
     repairOutput = new ArrayList<String>();
   }
 
   @After
   public void after() throws Exception {
-    cleanUpTableQuietly("default", tableName);
+    cleanUpTableQuietly(catName, dbName, tableName);
   }
 
-  private Table createPartitionedTable(String dbName, String tableName) throws 
Exception {
+  private Table createPartitionedTable(String catName, String dbName, String 
tableName) throws Exception {
     try {
-      db.dropTable(dbName, tableName);
-      db.createTable(tableName, Arrays.asList("key", "value"), // Data columns.
-          Arrays.asList("city"), // Partition columns.
-          TextInputFormat.class, HiveIgnoreKeyTextOutputFormat.class);
-      return db.getTable(dbName, tableName);
+      db.dropTable(catName, dbName, tableName);
+      Table table = new Table();
+      table.setCatName(catName);
+      table.setDbName(dbName);
+      table.setTableName(tableName);
+      FieldSchema col1 = new FieldSchema("key", "string", "");
+      FieldSchema col2 = new FieldSchema("value", "int", "");
+      FieldSchema col3 = new FieldSchema("city", "string", "");
+      StorageDescriptor sd = new StorageDescriptor();
+      sd.setSerdeInfo(new SerDeInfo());
+      sd.setInputFormat(TextInputFormat.class.getCanonicalName());
+      
sd.setOutputFormat(HiveIgnoreKeyTextOutputFormat.class.getCanonicalName());
+      sd.setCols(Arrays.asList(col1, col2));
+      table.setPartitionKeys(Arrays.asList(col3));
+      table.setSd(sd);
+      db.createTable(table);
+      return db.getTable(catName, dbName, tableName);
     } catch (Exception exception) {
       fail("Unable to drop and create table " + 
StatsUtils.getFullyQualifiedTableName(dbName, tableName) + " because "
           + StringUtils.stringifyException(exception));
@@ -91,9 +118,9 @@ public class TestMsckCreatePartitionsInBatches {
     }
   }
 
-  private void cleanUpTableQuietly(String dbName, String tableName) {
+  private void cleanUpTableQuietly(String catName, String dbName, String 
tableName) {
     try {
-      db.dropTable(dbName, tableName, true, true, true);
+      db.dropTable(catName, dbName, tableName);
     } catch (Exception exception) {
       fail("Unexpected exception: " + 
StringUtils.stringifyException(exception));
     }
@@ -119,19 +146,23 @@ public class TestMsckCreatePartitionsInBatches {
   public void testNumberOfCreatePartitionCalls() throws Exception {
     // create 10 dummy partitions
     Set<PartitionResult> partsNotInMs = createPartsNotInMs(10);
-    Hive spyDb = Mockito.spy(db);
+    IMetaStoreClient spyDb = Mockito.spy(db);
     // batch size of 5 and decaying factor of 2
-    ddlTask.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, 
table, 5, 2, 0);
+    msck.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 
5, 2, 0);
     // there should be 2 calls to create partitions with each batch size of 5
-    ArgumentCaptor<AddPartitionDesc> argument = 
ArgumentCaptor.forClass(AddPartitionDesc.class);
-    Mockito.verify(spyDb, 
Mockito.times(2)).createPartitions(argument.capture());
+    ArgumentCaptor<Boolean> ifNotExistsArg = 
ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<Boolean> needResultsArg = 
ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<List<Partition>> argParts = ArgumentCaptor.forClass((Class) 
List.class);
+    Mockito.verify(spyDb, Mockito.times(2)).add_partitions(argParts.capture(), 
ifNotExistsArg.capture(), needResultsArg.capture());
     // confirm the batch sizes were 5, 5 in the two calls to create partitions
-    List<AddPartitionDesc> apds = argument.getAllValues();
+    List<List<Partition>> apds = argParts.getAllValues();
     int retryAttempt = 1;
     Assert.assertEquals(String.format("Unexpected batch size in retry attempt 
%d ", retryAttempt++),
-        5, apds.get(0).getPartitionCount());
+        5, apds.get(0).size());
     Assert.assertEquals(String.format("Unexpected batch size in retry attempt 
%d ", retryAttempt++),
-        5, apds.get(1).getPartitionCount());
+        5, apds.get(1).size());
+    assertTrue(ifNotExistsArg.getValue());
+    assertFalse(needResultsArg.getValue());
   }
 
   /**
@@ -144,19 +175,23 @@ public class TestMsckCreatePartitionsInBatches {
   public void testUnevenNumberOfCreatePartitionCalls() throws Exception {
     // create 9 dummy partitions
     Set<PartitionResult> partsNotInMs = createPartsNotInMs(9);
-    Hive spyDb = Mockito.spy(db);
+    IMetaStoreClient spyDb = Mockito.spy(db);
     // batch size of 5 and decaying factor of 2
-    ddlTask.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, 
table, 5, 2, 0);
+    msck.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 
5, 2, 0);
     // there should be 2 calls to create partitions with batch sizes of 5, 4
-    ArgumentCaptor<AddPartitionDesc> argument = 
ArgumentCaptor.forClass(AddPartitionDesc.class);
-    Mockito.verify(spyDb, 
Mockito.times(2)).createPartitions(argument.capture());
+    ArgumentCaptor<Boolean> ifNotExistsArg = 
ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<Boolean> needResultsArg = 
ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<List<Partition>> argParts = ArgumentCaptor.forClass((Class) 
List.class);
+    Mockito.verify(spyDb, Mockito.times(2)).add_partitions(argParts.capture(), 
ifNotExistsArg.capture(), needResultsArg.capture());
     // confirm the batch sizes were 5, 4 in the two calls to create partitions
-    List<AddPartitionDesc> apds = argument.getAllValues();
+    List<List<Partition>> apds = argParts.getAllValues();
     int retryAttempt = 1;
     Assert.assertEquals(String.format("Unexpected batch size in retry attempt 
%d ", retryAttempt++),
-        5, apds.get(0).getPartitionCount());
+        5, apds.get(0).size());
     Assert.assertEquals(String.format("Unexpected batch size in retry attempt 
%d ", retryAttempt++),
-        4, apds.get(1).getPartitionCount());
+        4, apds.get(1).size());
+    assertTrue(ifNotExistsArg.getValue());
+    assertFalse(needResultsArg.getValue());
   }
 
   /**
@@ -169,14 +204,20 @@ public class TestMsckCreatePartitionsInBatches {
   public void testEqualNumberOfPartitions() throws Exception {
     // create 13 dummy partitions
     Set<PartitionResult> partsNotInMs = createPartsNotInMs(13);
-    Hive spyDb = Mockito.spy(db);
+    IMetaStoreClient spyDb = Mockito.spy(db);
     // batch size of 13 and decaying factor of 2
-    ddlTask.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, 
table, 13, 2, 0);
+    msck.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 
13, 2, 0);
+    // there should be 1 call to create partitions with batch sizes of 13
+    ArgumentCaptor<Boolean> ifNotExistsArg = 
ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<Boolean> needResultsArg = 
ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<List<Partition>> argParts = ArgumentCaptor.forClass((Class) 
List.class);
     // there should be 1 call to create partitions with batch sizes of 13
-    ArgumentCaptor<AddPartitionDesc> argument = 
ArgumentCaptor.forClass(AddPartitionDesc.class);
-    Mockito.verify(spyDb, 
Mockito.times(1)).createPartitions(argument.capture());
+    Mockito.verify(spyDb, Mockito.times(1)).add_partitions(argParts.capture(), 
ifNotExistsArg.capture(),
+      needResultsArg.capture());
     Assert.assertEquals("Unexpected number of batch size", 13,
-        argument.getValue().getPartitionCount());
+        argParts.getValue().size());
+    assertTrue(ifNotExistsArg.getValue());
+    assertFalse(needResultsArg.getValue());
   }
 
   /**
@@ -189,15 +230,22 @@ public class TestMsckCreatePartitionsInBatches {
   public void testSmallNumberOfPartitions() throws Exception {
     // create 10 dummy partitions
     Set<PartitionResult> partsNotInMs = createPartsNotInMs(10);
-    Hive spyDb = Mockito.spy(db);
+    IMetaStoreClient spyDb = Mockito.spy(db);
     // batch size of 20 and decaying factor of 2
-    ddlTask.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, 
table, 20, 2, 0);
+    msck.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 
20, 2, 0);
     // there should be 1 call to create partitions with batch sizes of 10
-    Mockito.verify(spyDb, 
Mockito.times(1)).createPartitions(Mockito.anyObject());
-    ArgumentCaptor<AddPartitionDesc> argument = 
ArgumentCaptor.forClass(AddPartitionDesc.class);
-    Mockito.verify(spyDb).createPartitions(argument.capture());
+    Mockito.verify(spyDb, 
Mockito.times(1)).add_partitions(Mockito.anyObject(), Mockito.anyBoolean(),
+      Mockito.anyBoolean());
+    ArgumentCaptor<Boolean> ifNotExistsArg = 
ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<Boolean> needResultsArg = 
ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<List<Partition>> argParts = ArgumentCaptor.forClass((Class) 
List.class);
+    // there should be 1 call to create partitions with batch sizes of 10
+    Mockito.verify(spyDb, Mockito.times(1)).add_partitions(argParts.capture(), 
ifNotExistsArg.capture(),
+      needResultsArg.capture());
     Assert.assertEquals("Unexpected number of batch size", 10,
-        argument.getValue().getPartitionCount());
+        argParts.getValue().size());
+    assertTrue(ifNotExistsArg.getValue());
+    assertFalse(needResultsArg.getValue());
   }
 
   /**
@@ -210,28 +258,34 @@ public class TestMsckCreatePartitionsInBatches {
   public void testBatchingWhenException() throws Exception {
     // create 13 dummy partitions
     Set<PartitionResult> partsNotInMs = createPartsNotInMs(23);
-    Hive spyDb = Mockito.spy(db);
+    IMetaStoreClient spyDb = Mockito.spy(db);
     // first call to createPartitions should throw exception
     
Mockito.doThrow(HiveException.class).doCallRealMethod().doCallRealMethod().when(spyDb)
-        .createPartitions(Mockito.any(AddPartitionDesc.class));
+      .add_partitions(Mockito.anyObject(), Mockito.anyBoolean(),
+        Mockito.anyBoolean());
 
     // test with a batch size of 30 and decaying factor of 2
-    ddlTask.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, 
table, 30, 2, 0);
+    msck.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 
30, 2, 0);
     // confirm the batch sizes were 23, 15, 8 in the three calls to create 
partitions
-    ArgumentCaptor<AddPartitionDesc> argument = 
ArgumentCaptor.forClass(AddPartitionDesc.class);
+    ArgumentCaptor<Boolean> ifNotExistsArg = 
ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<Boolean> needResultsArg = 
ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<List<Partition>> argParts = ArgumentCaptor.forClass((Class) 
List.class);
     // there should be 3 calls to create partitions with batch sizes of 23, 
15, 8
-    Mockito.verify(spyDb, 
Mockito.times(3)).createPartitions(argument.capture());
-    List<AddPartitionDesc> apds = argument.getAllValues();
+    Mockito.verify(spyDb, Mockito.times(3)).add_partitions(argParts.capture(), 
ifNotExistsArg.capture(),
+      needResultsArg.capture());
+    List<List<Partition>> apds = argParts.getAllValues();
     int retryAttempt = 1;
     Assert.assertEquals(
         String.format("Unexpected batch size in retry attempt %d ", 
retryAttempt++), 23,
-        apds.get(0).getPartitionCount());
+        apds.get(0).size());
     Assert.assertEquals(
         String.format("Unexpected batch size in retry attempt %d ", 
retryAttempt++), 15,
-        apds.get(1).getPartitionCount());
+        apds.get(1).size());
     Assert.assertEquals(
         String.format("Unexpected batch size in retry attempt %d ", 
retryAttempt++), 8,
-        apds.get(2).getPartitionCount());
+        apds.get(2).size());
+    assertTrue(ifNotExistsArg.getValue());
+    assertFalse(needResultsArg.getValue());
   }
 
   /**
@@ -244,38 +298,44 @@ public class TestMsckCreatePartitionsInBatches {
   @Test
   public void testRetriesExhaustedBatchSize() throws Exception {
     Set<PartitionResult> partsNotInMs = createPartsNotInMs(17);
-    Hive spyDb = Mockito.spy(db);
+    IMetaStoreClient spyDb = Mockito.spy(db);
     Mockito.doThrow(HiveException.class).when(spyDb)
-        .createPartitions(Mockito.any(AddPartitionDesc.class));
+      .add_partitions(Mockito.anyObject(), Mockito.anyBoolean(), 
Mockito.anyBoolean());
     // batch size of 5 and decaying factor of 2
     Exception ex = null;
     try {
-      ddlTask.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, 
table, 30, 2, 0);
+      msck.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 
30, 2, 0);
     } catch (Exception retryEx) {
       ex = retryEx;
     }
-    Assert.assertFalse("Exception was expected but was not thrown", ex == 
null);
-    Assert.assertTrue("Unexpected class of exception thrown", ex instanceof 
RetryException);
+    assertFalse("Exception was expected but was not thrown", ex == null);
+    Assert.assertTrue("Unexpected class of exception thrown", ex instanceof 
RetryUtilities.RetryException);
+    // there should be 5 calls to create partitions with batch sizes of 17, 
15, 7, 3, 1
+    ArgumentCaptor<Boolean> ifNotExistsArg = 
ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<Boolean> needResultsArg = 
ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<List<Partition>> argParts = ArgumentCaptor.forClass((Class) 
List.class);
     // there should be 5 calls to create partitions with batch sizes of 17, 
15, 7, 3, 1
-    ArgumentCaptor<AddPartitionDesc> argument = 
ArgumentCaptor.forClass(AddPartitionDesc.class);
-    Mockito.verify(spyDb, 
Mockito.times(5)).createPartitions(argument.capture());
-    List<AddPartitionDesc> apds = argument.getAllValues();
+    Mockito.verify(spyDb, Mockito.times(5)).add_partitions(argParts.capture(), 
ifNotExistsArg.capture(),
+      needResultsArg.capture());
+    List<List<Partition>> apds = argParts.getAllValues();
     int retryAttempt = 1;
     Assert.assertEquals(
         String.format("Unexpected batch size in retry attempt %d ", 
retryAttempt++), 17,
-        apds.get(0).getPartitionCount());
+        apds.get(0).size());
     Assert.assertEquals(
         String.format("Unexpected batch size in retry attempt %d ", 
retryAttempt++), 15,
-        apds.get(1).getPartitionCount());
+        apds.get(1).size());
     Assert.assertEquals(
         String.format("Unexpected batch size in retry attempt %d ", 
retryAttempt++), 7,
-        apds.get(2).getPartitionCount());
+        apds.get(2).size());
     Assert.assertEquals(
         String.format("Unexpected batch size in retry attempt %d ", 
retryAttempt++), 3,
-        apds.get(3).getPartitionCount());
+        apds.get(3).size());
     Assert.assertEquals(
         String.format("Unexpected batch size in retry attempt %d ", 
retryAttempt++), 1,
-        apds.get(4).getPartitionCount());
+        apds.get(4).size());
+    assertTrue(ifNotExistsArg.getValue());
+    assertFalse(needResultsArg.getValue());
   }
 
   /**
@@ -285,28 +345,32 @@ public class TestMsckCreatePartitionsInBatches {
   @Test
   public void testMaxRetriesReached() throws Exception {
     Set<PartitionResult> partsNotInMs = createPartsNotInMs(17);
-    Hive spyDb = Mockito.spy(db);
+    IMetaStoreClient spyDb = Mockito.spy(db);
     Mockito.doThrow(HiveException.class).when(spyDb)
-        .createPartitions(Mockito.any(AddPartitionDesc.class));
+      .add_partitions(Mockito.anyObject(), Mockito.anyBoolean(), 
Mockito.anyBoolean());
     // batch size of 5 and decaying factor of 2
     Exception ex = null;
     try {
-      ddlTask.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, 
table, 30, 2, 2);
+      msck.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 
30, 2, 2);
     } catch (Exception retryEx) {
       ex = retryEx;
     }
-    Assert.assertFalse("Exception was expected but was not thrown", ex == 
null);
-    Assert.assertTrue("Unexpected class of exception thrown", ex instanceof 
RetryException);
-    ArgumentCaptor<AddPartitionDesc> argument = 
ArgumentCaptor.forClass(AddPartitionDesc.class);
-    Mockito.verify(spyDb, 
Mockito.times(2)).createPartitions(argument.capture());
-    List<AddPartitionDesc> apds = argument.getAllValues();
+    assertFalse("Exception was expected but was not thrown", ex == null);
+    Assert.assertTrue("Unexpected class of exception thrown", ex instanceof 
RetryUtilities.RetryException);
+    ArgumentCaptor<Boolean> ifNotExistsArg = 
ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<Boolean> needResultsArg = 
ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<List<Partition>> argParts = ArgumentCaptor.forClass((Class) 
List.class);
+    Mockito.verify(spyDb, Mockito.times(2)).add_partitions(argParts.capture(), 
ifNotExistsArg.capture(), needResultsArg.capture());
+    List<List<Partition>> apds = argParts.getAllValues();
     int retryAttempt = 1;
     Assert.assertEquals(
         String.format("Unexpected batch size in retry attempt %d ", 
retryAttempt++), 17,
-        apds.get(0).getPartitionCount());
+        apds.get(0).size());
     Assert.assertEquals(
         String.format("Unexpected batch size in retry attempt %d ", 
retryAttempt++), 15,
-        apds.get(1).getPartitionCount());
+        apds.get(1).size());
+    assertTrue(ifNotExistsArg.getValue());
+    assertFalse(needResultsArg.getValue());
   }
 
   /**
@@ -317,25 +381,31 @@ public class TestMsckCreatePartitionsInBatches {
   @Test
   public void testOneMaxRetries() throws Exception {
     Set<PartitionResult> partsNotInMs = createPartsNotInMs(17);
-    Hive spyDb = Mockito.spy(db);
+    IMetaStoreClient spyDb = Mockito.spy(db);
     Mockito.doThrow(HiveException.class).when(spyDb)
-        .createPartitions(Mockito.any(AddPartitionDesc.class));
+      .add_partitions(Mockito.anyObject(), Mockito.anyBoolean(), 
Mockito.anyBoolean());
     // batch size of 5 and decaying factor of 2
     Exception ex = null;
     try {
-      ddlTask.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, 
table, 30, 2, 1);
+      msck.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 
30, 2, 1);
     } catch (Exception retryEx) {
       ex = retryEx;
     }
-    Assert.assertFalse("Exception was expected but was not thrown", ex == 
null);
-    Assert.assertTrue("Unexpected class of exception thrown", ex instanceof 
RetryException);
+    assertFalse("Exception was expected but was not thrown", ex == null);
+    Assert.assertTrue("Unexpected class of exception thrown", ex instanceof 
RetryUtilities.RetryException);
+    // there should be only 1 call to create partitions (max retries = 1) with a batch size of 17
+    ArgumentCaptor<Boolean> ifNotExistsArg = 
ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<Boolean> needResultsArg = 
ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<List<Partition>> argParts = ArgumentCaptor.forClass((Class) 
List.class);
     // there should be 5 calls to create partitions with batch sizes of 17, 
15, 7, 3, 1
-    ArgumentCaptor<AddPartitionDesc> argument = 
ArgumentCaptor.forClass(AddPartitionDesc.class);
-    Mockito.verify(spyDb, 
Mockito.times(1)).createPartitions(argument.capture());
-    List<AddPartitionDesc> apds = argument.getAllValues();
+    Mockito.verify(spyDb, Mockito.times(1)).add_partitions(argParts.capture(), 
ifNotExistsArg.capture(),
+      needResultsArg.capture());
+    List<List<Partition>> apds = argParts.getAllValues();
     int retryAttempt = 1;
     Assert.assertEquals(
         String.format("Unexpected batch size in retry attempt %d ", 
retryAttempt++), 17,
-        apds.get(0).getPartitionCount());
+        apds.get(0).size());
+    assertTrue(ifNotExistsArg.getValue());
+    assertFalse(needResultsArg.getValue());
   }
 }

Reply via email to