HIVE-20707: Automatic partition management (Prasanth Jayachandran reviewed by Jason Dere)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/64bea035 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/64bea035 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/64bea035 Branch: refs/heads/master Commit: 64bea0354fba2947e4bc0318728f5419e5d763b9 Parents: 54bba9c Author: Prasanth Jayachandran <[email protected]> Authored: Mon Oct 29 15:07:49 2018 -0700 Committer: Prasanth Jayachandran <[email protected]> Committed: Mon Oct 29 15:07:49 2018 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 12 + .../results/positive/external_table_ppd.q.out | 1 + .../positive/hbase_binary_storage_queries.q.out | 2 + .../src/test/results/positive/hbase_ddl.q.out | 2 + .../test/results/positive/hbase_queries.q.out | 1 + .../src/test/results/positive/hbasestats.q.out | 5 + .../hive/ql/txn/compactor/TestCompactor.java | 1 - .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 290 +-------- .../apache/hadoop/hive/ql/exec/ExplainTask.java | 40 +- .../hadoop/hive/ql/metadata/CheckResult.java | 142 ----- .../hive/ql/metadata/HiveMetaStoreChecker.java | 567 ------------------ .../hive/ql/optimizer/GenMapRedUtils.java | 21 +- .../hive/ql/parse/DDLSemanticAnalyzer.java | 6 +- .../hadoop/hive/ql/plan/CreateTableDesc.java | 6 + .../exec/TestMsckCreatePartitionsInBatches.java | 244 +++++--- .../exec/TestMsckDropPartitionsInBatches.java | 125 ++-- .../ql/metadata/TestHiveMetaStoreChecker.java | 187 +++--- .../queries/clientpositive/msck_repair_acid.q | 34 ++ .../clientpositive/partition_discovery.q | 77 +++ .../results/clientpositive/create_like.q.out | 1 + .../clientpositive/create_like_view.q.out | 1 + .../clientpositive/default_file_format.q.out | 4 + .../druid/druidkafkamini_basic.q.out | 2 + .../druid/druidmini_expressions.q.out | 2 + .../results/clientpositive/druid_topn.q.out | 1 + .../results/clientpositive/explain_locks.q.out | 1 + .../llap/external_table_purge.q.out | 4 + .../results/clientpositive/llap/mm_exim.q.out | 1 + .../llap/strict_managed_tables2.q.out | 2 + .../llap/table_nonprintable.q.out | 2 +- .../clientpositive/llap/whroot_external1.q.out | 6 + .../clientpositive/msck_repair_acid.q.out | 88 +++ .../clientpositive/msck_repair_drop.q.out | 68 +-- .../clientpositive/partition_discovery.q.out | 357 ++++++++++++ .../rename_external_partition_location.q.out | 2 + .../clientpositive/repl_2_exim_basic.q.out | 2 + .../show_create_table_alter.q.out | 5 + .../show_create_table_partitioned.q.out | 1 + .../show_create_table_serde.q.out | 1 + .../clientpositive/spark/stats_noscan_2.q.out | 2 + .../results/clientpositive/stats_noscan_2.q.out | 2 + .../temp_table_display_colstats_tbllvl.q.out | 5 + .../hadoop/hive/metastore/CheckResult.java | 153 +++++ .../apache/hadoop/hive/metastore/Warehouse.java | 2 +- .../hive/metastore/api/MetastoreException.java | 36 ++ .../hive/metastore/conf/MetastoreConf.java | 59 +- .../hive/metastore/utils/MetaStoreUtils.java | 55 +- .../hive/metastore/HiveMetaStoreChecker.java | 571 ++++++++++++++++++ .../org/apache/hadoop/hive/metastore/Msck.java | 530 +++++++++++++++++ .../apache/hadoop/hive/metastore/MsckInfo.java | 125 ++++ .../metastore/MsckPartitionExpressionProxy.java | 64 ++ .../hadoop/hive/metastore/ObjectStore.java | 6 +- .../hive/metastore/PartitionIterable.java | 163 ++++++ .../hive/metastore/PartitionManagementTask.java | 235 ++++++++ .../metastore/utils/MetaStoreServerUtils.java | 167 +++++- .../hive/metastore/utils/RetryUtilities.java | 110 ++++ .../hive/metastore/NonCatCallsWithCatalog.java | 4 +- .../hive/metastore/TestCatalogOldClient.java | 4 +- .../hive/metastore/TestPartitionManagement.java | 581 +++++++++++++++++++ .../hive/metastore/client/TestGetTableMeta.java | 11 +- 60 files changed, 3891 insertions(+), 1308 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index e226a1f..917aaeb 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -4415,17 +4415,29 @@ public class HiveConf extends Configuration { "Merge adjacent joins into a single n-way join"), HIVE_LOG_N_RECORDS("hive.log.every.n.records", 0L, new RangeValidator(0L, null), "If value is greater than 0 logs in fixed intervals of size n rather than exponentially."), + /** + * @deprecated Use MetastoreConf.MSCK_PATH_VALIDATION + */ + @Deprecated HIVE_MSCK_PATH_VALIDATION("hive.msck.path.validation", "throw", new StringSet("throw", "skip", "ignore"), "The approach msck should take with HDFS " + "directories that are partition-like but contain unsupported characters. 'throw' (an " + "exception) is the default; 'skip' will skip the invalid directories and still repair the" + " others; 'ignore' will skip the validation (legacy behavior, causes bugs in many cases)"), + /** + * @deprecated Use MetastoreConf.MSCK_REPAIR_BATCH_SIZE + */ + @Deprecated HIVE_MSCK_REPAIR_BATCH_SIZE( "hive.msck.repair.batch.size", 3000, "Batch size for the msck repair command. If the value is greater than zero,\n " + "it will execute batch wise with the configured batch size. In case of errors while\n" + "adding unknown partitions the batch size is automatically reduced by half in the subsequent\n" + "retry attempt. The default value is 3000 which means it will execute in the batches of 3000."), + /** + * @deprecated Use MetastoreConf.MSCK_REPAIR_BATCH_MAX_RETRIES + */ + @Deprecated HIVE_MSCK_REPAIR_BATCH_MAX_RETRIES("hive.msck.repair.batch.max.retries", 4, "Maximum number of retries for the msck repair command when adding unknown partitions.\n " + "If the value is greater than zero it will retry adding unknown partitions until the maximum\n" http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/hbase-handler/src/test/results/positive/external_table_ppd.q.out ---------------------------------------------------------------------- diff --git a/hbase-handler/src/test/results/positive/external_table_ppd.q.out b/hbase-handler/src/test/results/positive/external_table_ppd.q.out index edcbe7e..22c8b70 100644 --- a/hbase-handler/src/test/results/positive/external_table_ppd.q.out +++ b/hbase-handler/src/test/results/positive/external_table_ppd.q.out @@ -60,6 +60,7 @@ Table Parameters: COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"bigint_col\":\"true\",\"boolean_col\":\"true\",\"double_col\":\"true\",\"float_col\":\"true\",\"int_col\":\"true\",\"key\":\"true\",\"smallint_col\":\"true\",\"tinyint_col\":\"true\"}} EXTERNAL TRUE bucketing_version 2 + discover.partitions true external.table.purge true hbase.table.default.storage.type binary hbase.table.name t_hive http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out ---------------------------------------------------------------------- diff --git a/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out b/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out index 1209c88..bf1a89d 100644 --- a/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out +++ b/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out @@ -60,6 +60,7 @@ Table Parameters: COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"bigint_col\":\"true\",\"boolean_col\":\"true\",\"double_col\":\"true\",\"float_col\":\"true\",\"int_col\":\"true\",\"key\":\"true\",\"smallint_col\":\"true\",\"tinyint_col\":\"true\"}} EXTERNAL TRUE bucketing_version 2 + discover.partitions true external.table.purge true hbase.table.default.storage.type binary hbase.table.name t_hive @@ -242,6 +243,7 @@ Table Parameters: COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"bigint_col\":\"true\",\"boolean_col\":\"true\",\"double_col\":\"true\",\"float_col\":\"true\",\"int_col\":\"true\",\"key\":\"true\",\"smallint_col\":\"true\",\"tinyint_col\":\"true\"}} EXTERNAL TRUE bucketing_version 2 + discover.partitions true hbase.table.name t_hive numFiles 0 numRows 0 http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/hbase-handler/src/test/results/positive/hbase_ddl.q.out ---------------------------------------------------------------------- diff --git a/hbase-handler/src/test/results/positive/hbase_ddl.q.out b/hbase-handler/src/test/results/positive/hbase_ddl.q.out index ccd4148..fc40026 100644 --- a/hbase-handler/src/test/results/positive/hbase_ddl.q.out +++ b/hbase-handler/src/test/results/positive/hbase_ddl.q.out @@ -119,6 +119,7 @@ Table Type: EXTERNAL_TABLE Table Parameters: EXTERNAL TRUE bucketing_version 2 + discover.partitions true external.table.purge true hbase.mapred.output.outputtable kkk hbase.table.name hbase_table_0 @@ -168,6 +169,7 @@ Table Type: EXTERNAL_TABLE Table Parameters: EXTERNAL TRUE bucketing_version 2 + discover.partitions true external.table.purge true hbase.table.name hbase_table_0 #### A masked pattern was here #### http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/hbase-handler/src/test/results/positive/hbase_queries.q.out ---------------------------------------------------------------------- diff --git a/hbase-handler/src/test/results/positive/hbase_queries.q.out b/hbase-handler/src/test/results/positive/hbase_queries.q.out index eeb97f0..aea7e7e 100644 --- a/hbase-handler/src/test/results/positive/hbase_queries.q.out +++ b/hbase-handler/src/test/results/positive/hbase_queries.q.out @@ -986,6 +986,7 @@ WITH SERDEPROPERTIES ( 'hbase.columns.mapping'='cf:string', 'serialization.format'='1') TBLPROPERTIES ( + 'discover.partitions'='true', 'hbase.table.name'='hbase_table_0', #### A masked pattern was here #### PREHOOK: query: DROP TABLE IF EXISTS hbase_table_9 http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/hbase-handler/src/test/results/positive/hbasestats.q.out ---------------------------------------------------------------------- diff --git a/hbase-handler/src/test/results/positive/hbasestats.q.out b/hbase-handler/src/test/results/positive/hbasestats.q.out index 5a4aea9..5143522 100644 --- a/hbase-handler/src/test/results/positive/hbasestats.q.out +++ b/hbase-handler/src/test/results/positive/hbasestats.q.out @@ -42,6 +42,7 @@ Table Parameters: COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"country\":\"true\",\"country_id\":\"true\",\"key\":\"true\",\"state\":\"true\"}} EXTERNAL TRUE bucketing_version 2 + discover.partitions true external.table.purge true numFiles 0 numRows 0 @@ -136,6 +137,7 @@ Table Type: EXTERNAL_TABLE Table Parameters: EXTERNAL TRUE bucketing_version 2 + discover.partitions true external.table.purge true #### A masked pattern was here #### numFiles 0 @@ -203,6 +205,7 @@ Table Type: EXTERNAL_TABLE Table Parameters: EXTERNAL TRUE bucketing_version 2 + discover.partitions true external.table.purge true #### A masked pattern was here #### numFiles 0 @@ -262,6 +265,7 @@ Table Parameters: COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\"} EXTERNAL TRUE bucketing_version 2 + discover.partitions true external.table.purge true #### A masked pattern was here #### numFiles 0 @@ -371,6 +375,7 @@ Table Type: EXTERNAL_TABLE Table Parameters: EXTERNAL TRUE bucketing_version 2 + discover.partitions true external.table.purge true #### A masked pattern was here #### numFiles 0 http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index a9d7468..9648645 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -1679,7 +1679,6 @@ public class TestCompactor { Assert.assertNotEquals("Unexpected default compression size", 2700, OrcConf.BUFFER_SIZE.getDefaultValue()); - // Insert one more row - this should trigger hive.compactor.delta.pct.threshold to be reached for ttp2 executeStatementOnDriver("insert into " + tblName1 + " values (6, 'f')", driver); executeStatementOnDriver("insert into " + tblName2 + " values (6, 'f')", driver); http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 807f159..6790a06 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -74,7 +74,10 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.DefaultHiveMetaHook; import org.apache.hadoop.hive.metastore.HiveMetaHook; import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils; +import org.apache.hadoop.hive.metastore.Msck; +import org.apache.hadoop.hive.metastore.MsckInfo; import org.apache.hadoop.hive.metastore.PartitionDropOptions; +import org.apache.hadoop.hive.metastore.PartitionManagementTask; import org.apache.hadoop.hive.metastore.StatObjectConverter; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.Warehouse; @@ -149,13 +152,13 @@ import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject; import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData; import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.metadata.CheckConstraint; -import org.apache.hadoop.hive.ql.metadata.CheckResult; +import org.apache.hadoop.hive.metastore.CheckResult; import org.apache.hadoop.hive.ql.metadata.DefaultConstraint; import org.apache.hadoop.hive.ql.metadata.ForeignKeyInfo; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveMaterializedViewsRegistry; -import org.apache.hadoop.hive.ql.metadata.HiveMetaStoreChecker; +import org.apache.hadoop.hive.metastore.HiveMetaStoreChecker; import org.apache.hadoop.hive.ql.metadata.InvalidTableException; import org.apache.hadoop.hive.ql.metadata.NotNullConstraint; import org.apache.hadoop.hive.ql.metadata.Partition; @@ -2125,279 +2128,22 @@ public class DDLTask extends Task<DDLWork> implements Serializable { * @return Returns 0 when execution succeeds and above 0 if it fails. */ private int msck(Hive db, MsckDesc msckDesc) { - CheckResult result = new CheckResult(); - List<String> repairOutput = new ArrayList<String>(); + Msck msck; try { - HiveMetaStoreChecker checker = new HiveMetaStoreChecker(db); + msck = new Msck( false, false); + msck.init(db.getConf()); String[] names = Utilities.getDbTableName(msckDesc.getTableName()); - - // checkMetastore call will fill in result with partitions that are present in filesystem - // and missing in metastore - accessed through getPartitionsNotInMs - // And partitions that are not present in filesystem and metadata exists in metastore - - // accessed through getPartitionNotOnFS - checker.checkMetastore(names[0], names[1], msckDesc.getPartSpecs(), result); - Set<CheckResult.PartitionResult> partsNotInMs = result.getPartitionsNotInMs(); - Set<CheckResult.PartitionResult> partsNotInFs = result.getPartitionsNotOnFs(); - - if (msckDesc.isRepairPartitions()) { - // Repair metadata in HMS - - Table table = db.getTable(msckDesc.getTableName()); - int maxRetries = conf.getIntVar(ConfVars.HIVE_MSCK_REPAIR_BATCH_MAX_RETRIES); - int decayingFactor = 2; - - if (msckDesc.isAddPartitions() && !partsNotInMs.isEmpty()) { - // MSCK called to add missing paritions into metastore and there are - // missing partitions. - - int batchSize = conf.getIntVar(ConfVars.HIVE_MSCK_REPAIR_BATCH_SIZE); - if (batchSize == 0) { - //batching is not enabled. Try to add all the partitions in one call - batchSize = partsNotInMs.size(); - } - - AbstractList<String> vals = null; - String settingStr = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION); - boolean doValidate = !("ignore".equals(settingStr)); - boolean doSkip = doValidate && "skip".equals(settingStr); - // The default setting is "throw"; assume doValidate && !doSkip means throw. - if (doValidate) { - // Validate that we can add partition without escaping. Escaping was originally intended - // to avoid creating invalid HDFS paths; however, if we escape the HDFS path (that we - // deem invalid but HDFS actually supports - it is possible to create HDFS paths with - // unprintable characters like ASCII 7), metastore will create another directory instead - // of the one we are trying to "repair" here. - Iterator<CheckResult.PartitionResult> iter = partsNotInMs.iterator(); - while (iter.hasNext()) { - CheckResult.PartitionResult part = iter.next(); - try { - vals = Warehouse.makeValsFromName(part.getPartitionName(), vals); - } catch (MetaException ex) { - throw new HiveException(ex); - } - for (String val : vals) { - String escapedPath = FileUtils.escapePathName(val); - assert escapedPath != null; - if (escapedPath.equals(val)) { - continue; - } - String errorMsg = "Repair: Cannot add partition " + msckDesc.getTableName() + ':' + - part.getPartitionName() + " due to invalid characters in the name"; - if (doSkip) { - repairOutput.add(errorMsg); - iter.remove(); - } else { - throw new HiveException(errorMsg); - } - } - } - } - try { - createPartitionsInBatches(db, repairOutput, partsNotInMs, table, batchSize, - decayingFactor, maxRetries); - } catch (Exception e) { - throw new HiveException(e); - } - } - - if (msckDesc.isDropPartitions() && !partsNotInFs.isEmpty()) { - // MSCK called to drop stale paritions from metastore and there are - // stale partitions. - - int batchSize = conf.getIntVar(ConfVars.HIVE_MSCK_REPAIR_BATCH_SIZE); - if (batchSize == 0) { - //batching is not enabled. Try to drop all the partitions in one call - batchSize = partsNotInFs.size(); - } - - try { - dropPartitionsInBatches(db, repairOutput, partsNotInFs, table, batchSize, - decayingFactor, maxRetries); - } catch (Exception e) { - throw new HiveException(e); - } - } - } - } catch (HiveException e) { - LOG.warn("Failed to run metacheck: ", e); + MsckInfo msckInfo = new MsckInfo(SessionState.get().getCurrentCatalog(), names[0], + names[1], msckDesc.getPartSpecs(), msckDesc.getResFile(), + msckDesc.isRepairPartitions(), msckDesc.isAddPartitions(), msckDesc.isDropPartitions(), -1); + return msck.repair(msckInfo); + } catch (MetaException e) { + LOG.error("Unable to create msck instance.", e); return 1; - } catch (IOException e) { - LOG.warn("Failed to run metacheck: ", e); + } catch (SemanticException e) { + LOG.error("Msck failed.", e); return 1; - } finally { - BufferedWriter resultOut = null; - try { - Path resFile = new Path(msckDesc.getResFile()); - FileSystem fs = resFile.getFileSystem(conf); - resultOut = new BufferedWriter(new OutputStreamWriter(fs - .create(resFile))); - - boolean firstWritten = false; - firstWritten |= writeMsckResult(result.getTablesNotInMs(), - "Tables not in metastore:", resultOut, firstWritten); - firstWritten |= writeMsckResult(result.getTablesNotOnFs(), - "Tables missing on filesystem:", resultOut, firstWritten); - firstWritten |= writeMsckResult(result.getPartitionsNotInMs(), - "Partitions not in metastore:", resultOut, firstWritten); - firstWritten |= writeMsckResult(result.getPartitionsNotOnFs(), - "Partitions missing from filesystem:", resultOut, firstWritten); - for (String rout : repairOutput) { - if (firstWritten) { - resultOut.write(terminator); - } else { - firstWritten = true; - } - resultOut.write(rout); - } - } catch (IOException e) { - LOG.warn("Failed to save metacheck output: ", e); - return 1; - } finally { - if (resultOut != null) { - try { - resultOut.close(); - } catch (IOException e) { - LOG.warn("Failed to close output file: ", e); - return 1; - } - } - } - } - - return 0; - } - - @VisibleForTesting - void createPartitionsInBatches(Hive db, List<String> repairOutput, - Set<CheckResult.PartitionResult> partsNotInMs, Table table, int batchSize, int decayingFactor, int maxRetries) - throws Exception { - String addMsgFormat = "Repair: Added partition to metastore " - + table.getTableName() + ":%s"; - Set<CheckResult.PartitionResult> batchWork = new HashSet<>(partsNotInMs); - new RetryUtilities.ExponentiallyDecayingBatchWork<Void>(batchSize, decayingFactor, maxRetries) { - @Override - public Void execute(int size) throws Exception { - while (!batchWork.isEmpty()) { - //get the current batch size - int currentBatchSize = size; - AddPartitionDesc apd = - new AddPartitionDesc(table.getDbName(), table.getTableName(), true); - //store the partitions temporarily until processed - List<CheckResult.PartitionResult> lastBatch = new ArrayList<>(currentBatchSize); - List<String> addMsgs = new ArrayList<>(currentBatchSize); - //add the number of partitions given by the current batchsize - for (CheckResult.PartitionResult part : batchWork) { - if (currentBatchSize == 0) { - break; - } - apd.addPartition(Warehouse.makeSpecFromName(part.getPartitionName()), null); - lastBatch.add(part); - addMsgs.add(String.format(addMsgFormat, part.getPartitionName())); - currentBatchSize--; - } - db.createPartitions(apd); - // if last batch is successful remove it from partsNotInMs - batchWork.removeAll(lastBatch); - repairOutput.addAll(addMsgs); - } - return null; - } - }.run(); - } - - // Drops partitions in batches. partNotInFs is split into batches based on batchSize - // and dropped. The dropping will be through RetryUtilities which will retry when there is a - // failure after reducing the batchSize by decayingFactor. Retrying will cease when maxRetries - // limit is reached or batchSize reduces to 0, whichever comes earlier. - @VisibleForTesting - void dropPartitionsInBatches(Hive db, List<String> repairOutput, - Set<CheckResult.PartitionResult> partsNotInFs, Table table, int batchSize, int decayingFactor, - int maxRetries) throws Exception { - String dropMsgFormat = - "Repair: Dropped partition from metastore " + table.getFullyQualifiedName() + ":%s"; - // Copy of partitions that will be split into batches - Set<CheckResult.PartitionResult> batchWork = new TreeSet<>(partsNotInFs); - - new RetryUtilities.ExponentiallyDecayingBatchWork<Void>(batchSize, decayingFactor, maxRetries) { - @Override - public Void execute(int size) throws Exception { - while (!batchWork.isEmpty()) { - int currentBatchSize = size; - - // to store the partitions that are currently being processed - List<CheckResult.PartitionResult> lastBatch = new ArrayList<>(currentBatchSize); - - // drop messages for the dropped partitions - List<String> dropMsgs = new ArrayList<>(currentBatchSize); - - // Partitions to be dropped - List<String> dropParts = new ArrayList<>(currentBatchSize); - - for (CheckResult.PartitionResult part : batchWork) { - // This batch is full: break out of for loop to execute - if (currentBatchSize == 0) { - break; - } - - dropParts.add(part.getPartitionName()); - - // Add the part to lastBatch to track the parition being dropped - lastBatch.add(part); - - // Update messages - dropMsgs.add(String.format(dropMsgFormat, part.getPartitionName())); - - // Decrement batch size. When this gets to 0, the batch will be executed - currentBatchSize--; - } - - // this call is deleting partitions that are already missing from filesystem - // so 3rd parameter (deleteData) is set to false - // msck is doing a clean up of hms. if for some reason the partition is already - // deleted, then it is good. So, the last parameter ifexists is set to true - db.dropPartitions(table, dropParts, false, true); - - // if last batch is successful remove it from partsNotInFs - batchWork.removeAll(lastBatch); - repairOutput.addAll(dropMsgs); - } - return null; - } - }.run(); - } - - /** - * Write the result of msck to a writer. - * - * @param result - * The result we're going to write - * @param msg - * Message to write. - * @param out - * Writer to write to - * @param wrote - * if any previous call wrote data - * @return true if something was written - * @throws IOException - * In case the writing fails - */ - private boolean writeMsckResult(Set<? extends Object> result, String msg, - Writer out, boolean wrote) throws IOException { - - if (!result.isEmpty()) { - if (wrote) { - out.write(terminator); - } - - out.write(msg); - for (Object entry : result) { - out.write(separator); - out.write(entry.toString()); - } - return true; } - - return false; } /** @@ -5011,6 +4757,8 @@ public class DDLTask extends Task<DDLWork> implements Serializable { if (crtTbl.isExternal()) { tbl.setProperty("EXTERNAL", "TRUE"); tbl.setTableType(TableType.EXTERNAL_TABLE); + // partition discovery is on by default + tbl.setProperty(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "true"); } tbl.setFields(oldtbl.getCols()); @@ -5109,6 +4857,8 @@ public class DDLTask extends Task<DDLWork> implements Serializable { if (crtTbl.isExternal()) { tbl.setProperty("EXTERNAL", "TRUE"); tbl.setTableType(TableType.EXTERNAL_TABLE); + // partition discovery is on by default + tbl.setProperty(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "true"); } else { tbl.getParameters().remove("EXTERNAL"); } http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java index 4cc5fa8..7c4efab 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java @@ -356,32 +356,28 @@ public class ExplainTask extends Task<ExplainWork> implements Serializable { if (jsonOutput) { out = null; } - if (work.getParseContext() != null) { - List<LockComponent> lockComponents = AcidUtils.makeLockComponents(work.getOutputs(), work.getInputs(), conf); - if (null != out) { - out.print("LOCK INFORMATION:\n"); - } - List<ExplainLockDesc> locks = new ArrayList<>(lockComponents.size()); - - for (LockComponent component : lockComponents) { - ExplainLockDesc lockDesc = new ExplainLockDesc(component); + List<LockComponent> lockComponents = AcidUtils.makeLockComponents(work.getOutputs(), work.getInputs(), conf); + if (null != out) { + out.print("LOCK INFORMATION:\n"); + } + List<ExplainLockDesc> locks = new ArrayList<>(lockComponents.size()); - if (null != out) { - out.print(lockDesc.getFullName()); - out.print(" -> "); - out.print(lockDesc.getLockType()); - out.print('\n'); - } else { - locks.add(lockDesc); - } + for (LockComponent component : lockComponents) { + ExplainLockDesc lockDesc = new ExplainLockDesc(component); + if (null != out) { + out.print(lockDesc.getFullName()); + out.print(" -> "); + out.print(lockDesc.getLockType()); + out.print('\n'); + } else { + locks.add(lockDesc); } - if (jsonOutput) { - jsonObject.put("LOCK INFORMATION:", locks); - } - } else { - System.err.println("No parse context!"); + } + + if (jsonOutput) { + jsonObject.put("LOCK INFORMATION:", locks); } return jsonObject; } http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/ql/src/java/org/apache/hadoop/hive/ql/metadata/CheckResult.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/CheckResult.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/CheckResult.java deleted file mode 100644 index 0b4240f..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/CheckResult.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.metadata; - -import java.util.Set; -import java.util.TreeSet; - -/** - * Result class used by the HiveMetaStoreChecker. - */ -public class CheckResult { - - private Set<String> tablesNotOnFs = new TreeSet<String>(); - private Set<String> tablesNotInMs = new TreeSet<String>(); - private Set<PartitionResult> partitionsNotOnFs = new TreeSet<PartitionResult>(); - private Set<PartitionResult> partitionsNotInMs = new TreeSet<PartitionResult>(); - - /** - * @return a list of tables not found on the filesystem. - */ - public Set<String> getTablesNotOnFs() { - return tablesNotOnFs; - } - - /** - * @param tablesNotOnFs - * a list of tables not found on the filesystem. - */ - public void setTablesNotOnFs(Set<String> tablesNotOnFs) { - this.tablesNotOnFs = tablesNotOnFs; - } - - /** - * @return a list of tables not found in the metastore. - */ - public Set<String> getTablesNotInMs() { - return tablesNotInMs; - } - - /** - * @param tablesNotInMs - * a list of tables not found in the metastore. - */ - public void setTablesNotInMs(Set<String> tablesNotInMs) { - this.tablesNotInMs = tablesNotInMs; - } - - /** - * @return a list of partitions not found on the fs - */ - public Set<PartitionResult> getPartitionsNotOnFs() { - return partitionsNotOnFs; - } - - /** - * @param partitionsNotOnFs - * a list of partitions not found on the fs - */ - public void setPartitionsNotOnFs(Set<PartitionResult> partitionsNotOnFs) { - this.partitionsNotOnFs = partitionsNotOnFs; - } - - /** - * @return a list of partitions not found in the metastore - */ - public Set<PartitionResult> getPartitionsNotInMs() { - return partitionsNotInMs; - } - - /** - * @param partitionsNotInMs - * a list of partitions not found in the metastore - */ - public void setPartitionsNotInMs(Set<PartitionResult> partitionsNotInMs) { - this.partitionsNotInMs = partitionsNotInMs; - } - - /** - * A basic description of a partition that is missing from either the fs or - * the ms. - */ - public static class PartitionResult implements Comparable<PartitionResult> { - private String partitionName; - private String tableName; - - /** - * @return name of partition - */ - public String getPartitionName() { - return partitionName; - } - - /** - * @param partitionName - * name of partition - */ - public void setPartitionName(String partitionName) { - this.partitionName = partitionName; - } - - /** - * @return table name - */ - public String getTableName() { - return tableName; - } - - /** - * @param tableName - * table name - */ - public void setTableName(String tableName) { - this.tableName = tableName; - } - - @Override - public String toString() { - return tableName + ":" + partitionName; - } - - public int compareTo(PartitionResult o) { - int ret = tableName.compareTo(o.tableName); - return ret != 0 ? ret : partitionName.compareTo(o.partitionName); - } - } - -} http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java deleted file mode 100644 index 598bb2e..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java +++ /dev/null @@ -1,567 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.metadata; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; - -import com.google.common.collect.Sets; -import org.apache.hadoop.hive.common.StringInternUtils; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.ql.log.PerfLogger; -import org.apache.hadoop.hive.ql.session.SessionState; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.FileUtils; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.metastore.Warehouse; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; -import org.apache.hadoop.hive.ql.metadata.CheckResult.PartitionResult; -import org.apache.thrift.TException; - -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -/** - * Verify that the information in the metastore matches what is on the - * filesystem. Return a CheckResult object containing lists of missing and any - * unexpected tables and partitions. - */ -public class HiveMetaStoreChecker { - - public static final Logger LOG = LoggerFactory.getLogger(HiveMetaStoreChecker.class); - public static final String CLASS_NAME = HiveMetaStoreChecker.class.getName(); - - private final Hive hive; - private final HiveConf conf; - - public HiveMetaStoreChecker(Hive hive) { - super(); - this.hive = hive; - conf = hive.getConf(); - } - - /** - * Check the metastore for inconsistencies, data missing in either the - * metastore or on the dfs. - * - * @param dbName - * name of the database, if not specified the default will be used. - * @param tableName - * Table we want to run the check for. If null we'll check all the - * tables in the database. - * @param partitions - * List of partition name value pairs, if null or empty check all - * partitions - * @param result - * Fill this with the results of the check - * @throws HiveException - * Failed to get required information from the metastore. - * @throws IOException - * Most likely filesystem related - */ - public void checkMetastore(String dbName, String tableName, - List<? extends Map<String, String>> partitions, CheckResult result) - throws HiveException, IOException { - - if (dbName == null || "".equalsIgnoreCase(dbName)) { - dbName = Warehouse.DEFAULT_DATABASE_NAME; - } - - try { - if (tableName == null || "".equals(tableName)) { - // no table specified, check all tables and all partitions. - List<String> tables = hive.getTablesForDb(dbName, ".*"); - for (String currentTableName : tables) { - checkTable(dbName, currentTableName, null, result); - } - - findUnknownTables(dbName, tables, result); - } else if (partitions == null || partitions.isEmpty()) { - // only one table, let's check all partitions - checkTable(dbName, tableName, null, result); - } else { - // check the specified partitions - checkTable(dbName, tableName, partitions, result); - } - LOG.info("Number of partitionsNotInMs=" + result.getPartitionsNotInMs() - + ", partitionsNotOnFs=" + result.getPartitionsNotOnFs() - + ", tablesNotInMs=" + result.getTablesNotInMs() - + ", tablesNotOnFs=" + result.getTablesNotOnFs()); - } catch (MetaException e) { - throw new HiveException(e); - } catch (TException e) { - throw new HiveException(e); - } - } - - /** - * Check for table directories that aren't in the metastore. - * - * @param dbName - * Name of the database - * @param tables - * List of table names - * @param result - * Add any found tables to this - * @throws HiveException - * Failed to get required information from the metastore. - * @throws IOException - * Most likely filesystem related - * @throws MetaException - * Failed to get required information from the metastore. - * @throws NoSuchObjectException - * Failed to get required information from the metastore. - * @throws TException - * Thrift communication error. - */ - void findUnknownTables(String dbName, List<String> tables, CheckResult result) - throws IOException, MetaException, TException, HiveException { - - Set<Path> dbPaths = new HashSet<Path>(); - Set<String> tableNames = new HashSet<String>(tables); - - for (String tableName : tables) { - Table table = hive.getTable(dbName, tableName); - // hack, instead figure out a way to get the db paths - String isExternal = table.getParameters().get("EXTERNAL"); - if (isExternal == null || !"TRUE".equalsIgnoreCase(isExternal)) { - dbPaths.add(table.getPath().getParent()); - } - } - - for (Path dbPath : dbPaths) { - FileSystem fs = dbPath.getFileSystem(conf); - FileStatus[] statuses = fs.listStatus(dbPath, FileUtils.HIDDEN_FILES_PATH_FILTER); - for (FileStatus status : statuses) { - - if (status.isDir() && !tableNames.contains(status.getPath().getName())) { - - result.getTablesNotInMs().add(status.getPath().getName()); - } - } - } - } - - /** - * Check the metastore for inconsistencies, data missing in either the - * metastore or on the dfs. - * - * @param dbName - * Name of the database - * @param tableName - * Name of the table - * @param partitions - * Partitions to check, if null or empty get all the partitions. - * @param result - * Result object - * @throws HiveException - * Failed to get required information from the metastore. - * @throws IOException - * Most likely filesystem related - * @throws MetaException - * Failed to get required information from the metastore. - */ - void checkTable(String dbName, String tableName, - List<? extends Map<String, String>> partitions, CheckResult result) - throws MetaException, IOException, HiveException { - - Table table = null; - - try { - table = hive.getTable(dbName, tableName); - } catch (HiveException e) { - result.getTablesNotInMs().add(tableName); - return; - } - - PartitionIterable parts; - boolean findUnknownPartitions = true; - - if (table.isPartitioned()) { - if (partitions == null || partitions.isEmpty()) { - String mode = HiveConf.getVar(conf, ConfVars.HIVEMAPREDMODE, (String) null); - if ("strict".equalsIgnoreCase(mode)) { - parts = new PartitionIterable(hive, table, null, conf.getIntVar( - HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX)); - } else { - List<Partition> loadedPartitions = new ArrayList<>(); - PerfLogger perfLogger = SessionState.getPerfLogger(); - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.PARTITION_RETRIEVING); - loadedPartitions.addAll(hive.getAllPartitionsOf(table)); - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PARTITION_RETRIEVING); - parts = new PartitionIterable(loadedPartitions); - } - } else { - // we're interested in specific partitions, - // don't check for any others - findUnknownPartitions = false; - List<Partition> loadedPartitions = new ArrayList<>(); - for (Map<String, String> map : partitions) { - Partition part = hive.getPartition(table, map, false); - if (part == null) { - PartitionResult pr = new PartitionResult(); - pr.setTableName(tableName); - pr.setPartitionName(Warehouse.makePartPath(map)); - result.getPartitionsNotInMs().add(pr); - } else { - loadedPartitions.add(part); - } - } - parts = new PartitionIterable(loadedPartitions); - } - } else { - parts = new PartitionIterable(Collections.<Partition>emptyList()); - } - - checkTable(table, parts, findUnknownPartitions, result); - } - - /** - * Check the metastore for inconsistencies, data missing in either the - * metastore or on the dfs. - * - * @param table - * Table to check - * @param parts - * Partitions to check - * @param result - * Result object - * @param findUnknownPartitions - * Should we try to find unknown partitions? - * @throws IOException - * Could not get information from filesystem - * @throws HiveException - * Could not create Partition object - */ - void checkTable(Table table, PartitionIterable parts, - boolean findUnknownPartitions, CheckResult result) throws IOException, - HiveException { - - Path tablePath = table.getPath(); - FileSystem fs = tablePath.getFileSystem(conf); - if (!fs.exists(tablePath)) { - result.getTablesNotOnFs().add(table.getTableName()); - return; - } - - Set<Path> partPaths = new HashSet<Path>(); - - // check that the partition folders exist on disk - for (Partition partition : parts) { - if (partition == null) { - // most likely the user specified an invalid partition - continue; - } - Path partPath = partition.getDataLocation(); - fs = partPath.getFileSystem(conf); - if (!fs.exists(partPath)) { - PartitionResult pr = new PartitionResult(); - pr.setPartitionName(partition.getName()); - pr.setTableName(partition.getTable().getTableName()); - result.getPartitionsNotOnFs().add(pr); - } - - for (int i = 0; i < partition.getSpec().size(); i++) { - Path qualifiedPath = partPath.makeQualified(fs); - StringInternUtils.internUriStringsInPath(qualifiedPath); - partPaths.add(qualifiedPath); - partPath = partPath.getParent(); - } - } - - if (findUnknownPartitions) { - findUnknownPartitions(table, partPaths, result); - } - } - - /** - * Find partitions on the fs that are unknown to the metastore. - * - * @param table - * Table where the partitions would be located - * @param partPaths - * Paths of the partitions the ms knows about - * @param result - * Result object - * @throws IOException - * Thrown if we fail at fetching listings from the fs. - * @throws HiveException - */ - void findUnknownPartitions(Table table, Set<Path> partPaths, - CheckResult result) throws IOException, HiveException { - - Path tablePath = table.getPath(); - // now check the table folder and see if we find anything - // that isn't in the metastore - Set<Path> allPartDirs = new HashSet<Path>(); - checkPartitionDirs(tablePath, allPartDirs, Collections.unmodifiableList(table.getPartColNames())); - // don't want the table dir - allPartDirs.remove(tablePath); - - // remove the partition paths we know about - allPartDirs.removeAll(partPaths); - - Set<String> partColNames = Sets.newHashSet(); - for(FieldSchema fSchema : table.getPartCols()) { - partColNames.add(fSchema.getName()); - } - - // we should now only have the unexpected folders left - for (Path partPath : allPartDirs) { - FileSystem fs = partPath.getFileSystem(conf); - String partitionName = getPartitionName(fs.makeQualified(tablePath), - partPath, partColNames); - LOG.debug("PartitionName: " + partitionName); - - if (partitionName != null) { - PartitionResult pr = new PartitionResult(); - pr.setPartitionName(partitionName); - pr.setTableName(table.getTableName()); - - result.getPartitionsNotInMs().add(pr); - } - } - LOG.debug("Number of partitions not in metastore : " + result.getPartitionsNotInMs().size()); - } - - /** - * Get the partition name from the path. - * - * @param tablePath - * Path of the table. - * @param partitionPath - * Path of the partition. - * @param partCols - * Set of partition columns from table definition - * @return Partition name, for example partitiondate=2008-01-01 - */ - static String getPartitionName(Path tablePath, Path partitionPath, - Set<String> partCols) { - String result = null; - Path currPath = partitionPath; - LOG.debug("tablePath:" + tablePath + ", partCols: " + partCols); - - while (currPath != null && !tablePath.equals(currPath)) { - // format: partition=p_val - // Add only when table partition colName matches - String[] parts = currPath.getName().split("="); - if (parts != null && parts.length > 0) { - if (parts.length != 2) { - LOG.warn(currPath.getName() + " is not a valid partition name"); - return result; - } - - String partitionName = parts[0]; - if (partCols.contains(partitionName)) { - if (result == null) { - result = currPath.getName(); - } else { - result = currPath.getName() + Path.SEPARATOR + result; - } - } - } - currPath = currPath.getParent(); - LOG.debug("currPath=" + currPath); - } - return result; - } - - /** - * Assume that depth is 2, i.e., partition columns are a and b - * tblPath/a=1 => throw exception - * tblPath/a=1/file => throw exception - * tblPath/a=1/b=2/file => return a=1/b=2 - * tblPath/a=1/b=2/c=3 => return a=1/b=2 - * tblPath/a=1/b=2/c=3/file => return a=1/b=2 - * - * @param basePath - * Start directory - * @param allDirs - * This set will contain the leaf paths at the end. - * @param list - * Specify how deep the search goes. - * @throws IOException - * Thrown if we can't get lists from the fs. - * @throws HiveException - */ - - private void checkPartitionDirs(Path basePath, Set<Path> allDirs, final List<String> partColNames) throws IOException, HiveException { - // Here we just reuse the THREAD_COUNT configuration for - // METASTORE_FS_HANDLER_THREADS_COUNT since this results in better performance - // The number of missing partitions discovered are later added by metastore using a - // threadpool of size METASTORE_FS_HANDLER_THREADS_COUNT. If we have different sized - // pool here the smaller sized pool of the two becomes a bottleneck - int poolSize = conf.getInt(ConfVars.METASTORE_FS_HANDLER_THREADS_COUNT.varname, 15); - - ExecutorService executor; - if (poolSize <= 1) { - LOG.debug("Using single-threaded version of MSCK-GetPaths"); - executor = MoreExecutors.newDirectExecutorService(); - } else { - LOG.debug("Using multi-threaded version of MSCK-GetPaths with number of threads " + poolSize); - ThreadFactory threadFactory = - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MSCK-GetPaths-%d").build(); - executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(poolSize, threadFactory); - } - checkPartitionDirs(executor, basePath, allDirs, basePath.getFileSystem(conf), partColNames); - - executor.shutdown(); - } - - private final class PathDepthInfoCallable implements Callable<Path> { - private final List<String> partColNames; - private final FileSystem fs; - private final ConcurrentLinkedQueue<PathDepthInfo> pendingPaths; - private final boolean throwException; - private final PathDepthInfo pd; - - private PathDepthInfoCallable(PathDepthInfo pd, List<String> partColNames, FileSystem fs, - ConcurrentLinkedQueue<PathDepthInfo> basePaths) { - this.partColNames = partColNames; - this.pd = pd; - this.fs = fs; - this.pendingPaths = basePaths; - this.throwException = "throw" - .equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION)); - } - - @Override - public Path call() throws Exception { - return processPathDepthInfo(pd); - } - - private Path processPathDepthInfo(final PathDepthInfo pd) - throws IOException, HiveException, InterruptedException { - final Path currentPath = pd.p; - final int currentDepth = pd.depth; - FileStatus[] fileStatuses = fs.listStatus(currentPath, FileUtils.HIDDEN_FILES_PATH_FILTER); - // found no files under a sub-directory under table base path; it is possible that the table - // is empty and hence there are no partition sub-directories created under base path - if (fileStatuses.length == 0 && currentDepth > 0 && currentDepth < partColNames.size()) { - // since maxDepth is not yet reached, we are missing partition - // columns in currentPath - logOrThrowExceptionWithMsg( - "MSCK is missing partition columns under " + currentPath.toString()); - } else { - // found files under currentPath add them to the queue if it is a directory - for (FileStatus fileStatus : fileStatuses) { - if (!fileStatus.isDirectory() && currentDepth < partColNames.size()) { - // found a file at depth which is less than number of partition keys - logOrThrowExceptionWithMsg( - "MSCK finds a file rather than a directory when it searches for " - + fileStatus.getPath().toString()); - } else if (fileStatus.isDirectory() && currentDepth < partColNames.size()) { - // found a sub-directory at a depth less than number of partition keys - // validate if the partition directory name matches with the corresponding - // partition colName at currentDepth - Path nextPath = fileStatus.getPath(); - String[] parts = nextPath.getName().split("="); - if (parts.length != 2) { - logOrThrowExceptionWithMsg("Invalid partition name " + nextPath); - } else if (!parts[0].equalsIgnoreCase(partColNames.get(currentDepth))) { - logOrThrowExceptionWithMsg( - "Unexpected partition key " + parts[0] + " found at " + nextPath); - } else { - // add sub-directory to the work queue if maxDepth is not yet reached - pendingPaths.add(new PathDepthInfo(nextPath, currentDepth + 1)); - } - } - } - if (currentDepth == partColNames.size()) { - return currentPath; - } - } - return null; - } - - private void logOrThrowExceptionWithMsg(String msg) throws HiveException { - if(throwException) { - throw new HiveException(msg); - } else { - LOG.warn(msg); - } - } - } - - private static class PathDepthInfo { - private final Path p; - private final int depth; - PathDepthInfo(Path p, int depth) { - this.p = p; - this.depth = depth; - } - } - - private void checkPartitionDirs(final ExecutorService executor, - final Path basePath, final Set<Path> result, - final FileSystem fs, final List<String> partColNames) throws HiveException { - try { - Queue<Future<Path>> futures = new LinkedList<Future<Path>>(); - ConcurrentLinkedQueue<PathDepthInfo> nextLevel = new ConcurrentLinkedQueue<>(); - nextLevel.add(new PathDepthInfo(basePath, 0)); - //Uses level parallel implementation of a bfs. Recursive DFS implementations - //have a issue where the number of threads can run out if the number of - //nested sub-directories is more than the pool size. - //Using a two queue implementation is simpler than one queue since then we will - //have to add the complex mechanisms to let the free worker threads know when new levels are - //discovered using notify()/wait() mechanisms which can potentially lead to bugs if - //not done right - while(!nextLevel.isEmpty()) { - ConcurrentLinkedQueue<PathDepthInfo> tempQueue = new ConcurrentLinkedQueue<>(); - //process each level in parallel - while(!nextLevel.isEmpty()) { - futures.add( - executor.submit(new PathDepthInfoCallable(nextLevel.poll(), partColNames, fs, tempQueue))); - } - while(!futures.isEmpty()) { - Path p = futures.poll().get(); - if (p != null) { - result.add(p); - } - } - //update the nextlevel with newly discovered sub-directories from the above - nextLevel = tempQueue; - } - } catch (InterruptedException | ExecutionException e) { - LOG.error(e.getMessage()); - executor.shutdownNow(); - throw new HiveException(e.getCause()); - } - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index cff32d3..2131bf1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -39,6 +40,7 @@ import org.apache.hadoop.hive.common.BlobStorageUtils; import org.apache.hadoop.hive.common.StringInternUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.exec.ColumnInfo; @@ -1324,7 +1326,7 @@ public final class GenMapRedUtils { } // update the FileSinkOperator to include partition columns - usePartitionColumns(fsInputDesc.getTableInfo().getProperties(), dpCtx.getDPColNames()); + usePartitionColumns(fsInputDesc.getTableInfo().getProperties(), fsInputDesc.getTable(), dpCtx.getDPColNames()); } else { // non-partitioned table fsInputDesc.getTableInfo().getProperties().remove( @@ -2090,6 +2092,23 @@ public final class GenMapRedUtils { } return null; } + + static void usePartitionColumns(Properties properties, Table table, List<String> partColNames) { + if (properties.containsKey(org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS)) { + usePartitionColumns(properties, partColNames); + } else { + List<FieldSchema> partCols = table.getPartCols(); + String partNames = partCols.stream().map(FieldSchema::getName).collect(Collectors.joining("/")); + String partTypes = partCols.stream().map(FieldSchema::getType).collect(Collectors.joining(":")); + properties.setProperty( + org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, + partNames); + properties.setProperty( + org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES, + partTypes); + } + } + /** * Uses only specified partition columns. * Provided properties should be pre-populated with partition column names and types. http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index bba7d6c..6e7c78b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@ -3841,7 +3841,11 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { } Table tab = getTable(tableName); List<Map<String, String>> specs = getPartitionSpecs(tab, ast); - outputs.add(new WriteEntity(tab, WriteEntity.WriteType.DDL_SHARED)); + if (repair && AcidUtils.isTransactionalTable(tab)) { + outputs.add(new WriteEntity(tab, WriteType.DDL_EXCLUSIVE)); + } else { + outputs.add(new WriteEntity(tab, WriteEntity.WriteType.DDL_SHARED)); + } MsckDesc checkDesc = new MsckDesc(tableName, specs, ctx.getResFile(), repair, addPartitions, dropPartitions); rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java index 27f677e..f00148b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java @@ -28,6 +28,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.PartitionManagementTask; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Order; @@ -837,6 +838,11 @@ public class CreateTableDesc extends DDLDesc implements Serializable { if (isExternal()) { tbl.setProperty("EXTERNAL", "TRUE"); tbl.setTableType(TableType.EXTERNAL_TABLE); + // only add if user have not explicit set it (user explicitly disabled for example in which case don't flip it) + if (tbl.getProperty(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY) == null) { + // partition discovery is on by default if undefined + tbl.setProperty(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "true"); + } } // If the sorted columns is a superset of bucketed columns, store this fact. http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/ql/src/test/org/apache/hadoop/hive/ql/exec/TestMsckCreatePartitionsInBatches.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestMsckCreatePartitionsInBatches.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestMsckCreatePartitionsInBatches.java index ce2b186..3e45016 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestMsckCreatePartitionsInBatches.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestMsckCreatePartitionsInBatches.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hive.ql.exec; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.util.ArrayList; @@ -27,17 +29,23 @@ import java.util.Set; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.CheckResult.PartitionResult; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.Msck; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.utils.RetryUtilities; import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat; -import org.apache.hadoop.hive.ql.metadata.CheckResult.PartitionResult; -import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.plan.AddPartitionDesc; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.stats.StatsUtils; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.util.StringUtils; -import org.apache.hive.common.util.RetryUtilities.RetryException; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -48,42 +56,61 @@ import org.mockito.Mockito; public class TestMsckCreatePartitionsInBatches { private static HiveConf hiveConf; - private static DDLTask ddlTask; + private static Msck msck; + private final String catName = "hive"; + private final String dbName = "default"; private final String tableName = "test_msck_batch"; - private static Hive db; + private static IMetaStoreClient db; private List<String> repairOutput; private Table table; @BeforeClass - public static void setupClass() throws HiveException { + public static void setupClass() throws HiveException, MetaException { hiveConf = new HiveConf(TestMsckCreatePartitionsInBatches.class); hiveConf.setIntVar(ConfVars.HIVE_MSCK_REPAIR_BATCH_SIZE, 5); hiveConf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); SessionState.start(hiveConf); - db = Hive.get(hiveConf); - ddlTask = new DDLTask(); + try { + db = new HiveMetaStoreClient(hiveConf); + } catch (MetaException e) { + throw new HiveException(e); + } + msck = new Msck( false, false); + msck.init(hiveConf); } @Before public void before() throws Exception { - createPartitionedTable("default", tableName); - table = db.getTable(tableName); + createPartitionedTable(catName, dbName, tableName); + table = db.getTable(catName, dbName, tableName); repairOutput = new ArrayList<String>(); } @After public void after() throws Exception { - cleanUpTableQuietly("default", tableName); + cleanUpTableQuietly(catName, dbName, tableName); } - private Table createPartitionedTable(String dbName, String tableName) throws Exception { + private Table createPartitionedTable(String catName, String dbName, String tableName) throws Exception { try { - db.dropTable(dbName, tableName); - db.createTable(tableName, Arrays.asList("key", "value"), // Data columns. - Arrays.asList("city"), // Partition columns. - TextInputFormat.class, HiveIgnoreKeyTextOutputFormat.class); - return db.getTable(dbName, tableName); + db.dropTable(catName, dbName, tableName); + Table table = new Table(); + table.setCatName(catName); + table.setDbName(dbName); + table.setTableName(tableName); + FieldSchema col1 = new FieldSchema("key", "string", ""); + FieldSchema col2 = new FieldSchema("value", "int", ""); + FieldSchema col3 = new FieldSchema("city", "string", ""); + StorageDescriptor sd = new StorageDescriptor(); + sd.setSerdeInfo(new SerDeInfo()); + sd.setInputFormat(TextInputFormat.class.getCanonicalName()); + sd.setOutputFormat(HiveIgnoreKeyTextOutputFormat.class.getCanonicalName()); + sd.setCols(Arrays.asList(col1, col2)); + table.setPartitionKeys(Arrays.asList(col3)); + table.setSd(sd); + db.createTable(table); + return db.getTable(catName, dbName, tableName); } catch (Exception exception) { fail("Unable to drop and create table " + StatsUtils.getFullyQualifiedTableName(dbName, tableName) + " because " + StringUtils.stringifyException(exception)); @@ -91,9 +118,9 @@ public class TestMsckCreatePartitionsInBatches { } } - private void cleanUpTableQuietly(String dbName, String tableName) { + private void cleanUpTableQuietly(String catName, String dbName, String tableName) { try { - db.dropTable(dbName, tableName, true, true, true); + db.dropTable(catName, dbName, tableName); } catch (Exception exception) { fail("Unexpected exception: " + StringUtils.stringifyException(exception)); } @@ -119,19 +146,23 @@ public class TestMsckCreatePartitionsInBatches { public void testNumberOfCreatePartitionCalls() throws Exception { // create 10 dummy partitions Set<PartitionResult> partsNotInMs = createPartsNotInMs(10); - Hive spyDb = Mockito.spy(db); + IMetaStoreClient spyDb = Mockito.spy(db); // batch size of 5 and decaying factor of 2 - ddlTask.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 5, 2, 0); + msck.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 5, 2, 0); // there should be 2 calls to create partitions with each batch size of 5 - ArgumentCaptor<AddPartitionDesc> argument = ArgumentCaptor.forClass(AddPartitionDesc.class); - Mockito.verify(spyDb, Mockito.times(2)).createPartitions(argument.capture()); + ArgumentCaptor<Boolean> ifNotExistsArg = ArgumentCaptor.forClass(Boolean.class); + ArgumentCaptor<Boolean> needResultsArg = ArgumentCaptor.forClass(Boolean.class); + ArgumentCaptor<List<Partition>> argParts = ArgumentCaptor.forClass((Class) List.class); + Mockito.verify(spyDb, Mockito.times(2)).add_partitions(argParts.capture(), ifNotExistsArg.capture(), needResultsArg.capture()); // confirm the batch sizes were 5, 5 in the two calls to create partitions - List<AddPartitionDesc> apds = argument.getAllValues(); + List<List<Partition>> apds = argParts.getAllValues(); int retryAttempt = 1; Assert.assertEquals(String.format("Unexpected batch size in retry attempt %d ", retryAttempt++), - 5, apds.get(0).getPartitionCount()); + 5, apds.get(0).size()); Assert.assertEquals(String.format("Unexpected batch size in retry attempt %d ", retryAttempt++), - 5, apds.get(1).getPartitionCount()); + 5, apds.get(1).size()); + assertTrue(ifNotExistsArg.getValue()); + assertFalse(needResultsArg.getValue()); } /** @@ -144,19 +175,23 @@ public class TestMsckCreatePartitionsInBatches { public void testUnevenNumberOfCreatePartitionCalls() throws Exception { // create 9 dummy partitions Set<PartitionResult> partsNotInMs = createPartsNotInMs(9); - Hive spyDb = Mockito.spy(db); + IMetaStoreClient spyDb = Mockito.spy(db); // batch size of 5 and decaying factor of 2 - ddlTask.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 5, 2, 0); + msck.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 5, 2, 0); // there should be 2 calls to create partitions with batch sizes of 5, 4 - ArgumentCaptor<AddPartitionDesc> argument = ArgumentCaptor.forClass(AddPartitionDesc.class); - Mockito.verify(spyDb, Mockito.times(2)).createPartitions(argument.capture()); + ArgumentCaptor<Boolean> ifNotExistsArg = ArgumentCaptor.forClass(Boolean.class); + ArgumentCaptor<Boolean> needResultsArg = ArgumentCaptor.forClass(Boolean.class); + ArgumentCaptor<List<Partition>> argParts = ArgumentCaptor.forClass((Class) List.class); + Mockito.verify(spyDb, Mockito.times(2)).add_partitions(argParts.capture(), ifNotExistsArg.capture(), needResultsArg.capture()); // confirm the batch sizes were 5, 4 in the two calls to create partitions - List<AddPartitionDesc> apds = argument.getAllValues(); + List<List<Partition>> apds = argParts.getAllValues(); int retryAttempt = 1; Assert.assertEquals(String.format("Unexpected batch size in retry attempt %d ", retryAttempt++), - 5, apds.get(0).getPartitionCount()); + 5, apds.get(0).size()); Assert.assertEquals(String.format("Unexpected batch size in retry attempt %d ", retryAttempt++), - 4, apds.get(1).getPartitionCount()); + 4, apds.get(1).size()); + assertTrue(ifNotExistsArg.getValue()); + assertFalse(needResultsArg.getValue()); } /** @@ -169,14 +204,20 @@ public class TestMsckCreatePartitionsInBatches { public void testEqualNumberOfPartitions() throws Exception { // create 13 dummy partitions Set<PartitionResult> partsNotInMs = createPartsNotInMs(13); - Hive spyDb = Mockito.spy(db); + IMetaStoreClient spyDb = Mockito.spy(db); // batch size of 13 and decaying factor of 2 - ddlTask.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 13, 2, 0); + msck.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 13, 2, 0); + // there should be 1 call to create partitions with batch sizes of 13 + ArgumentCaptor<Boolean> ifNotExistsArg = ArgumentCaptor.forClass(Boolean.class); + ArgumentCaptor<Boolean> needResultsArg = ArgumentCaptor.forClass(Boolean.class); + ArgumentCaptor<List<Partition>> argParts = ArgumentCaptor.forClass((Class) List.class); // there should be 1 call to create partitions with batch sizes of 13 - ArgumentCaptor<AddPartitionDesc> argument = ArgumentCaptor.forClass(AddPartitionDesc.class); - Mockito.verify(spyDb, Mockito.times(1)).createPartitions(argument.capture()); + Mockito.verify(spyDb, Mockito.times(1)).add_partitions(argParts.capture(), ifNotExistsArg.capture(), + needResultsArg.capture()); Assert.assertEquals("Unexpected number of batch size", 13, - argument.getValue().getPartitionCount()); + argParts.getValue().size()); + assertTrue(ifNotExistsArg.getValue()); + assertFalse(needResultsArg.getValue()); } /** @@ -189,15 +230,22 @@ public class TestMsckCreatePartitionsInBatches { public void testSmallNumberOfPartitions() throws Exception { // create 10 dummy partitions Set<PartitionResult> partsNotInMs = createPartsNotInMs(10); - Hive spyDb = Mockito.spy(db); + IMetaStoreClient spyDb = Mockito.spy(db); // batch size of 20 and decaying factor of 2 - ddlTask.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 20, 2, 0); + msck.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 20, 2, 0); // there should be 1 call to create partitions with batch sizes of 10 - Mockito.verify(spyDb, Mockito.times(1)).createPartitions(Mockito.anyObject()); - ArgumentCaptor<AddPartitionDesc> argument = ArgumentCaptor.forClass(AddPartitionDesc.class); - Mockito.verify(spyDb).createPartitions(argument.capture()); + Mockito.verify(spyDb, Mockito.times(1)).add_partitions(Mockito.anyObject(), Mockito.anyBoolean(), + Mockito.anyBoolean()); + ArgumentCaptor<Boolean> ifNotExistsArg = ArgumentCaptor.forClass(Boolean.class); + ArgumentCaptor<Boolean> needResultsArg = ArgumentCaptor.forClass(Boolean.class); + ArgumentCaptor<List<Partition>> argParts = ArgumentCaptor.forClass((Class) List.class); + // there should be 1 call to create partitions with batch sizes of 10 + Mockito.verify(spyDb, Mockito.times(1)).add_partitions(argParts.capture(), ifNotExistsArg.capture(), + needResultsArg.capture()); Assert.assertEquals("Unexpected number of batch size", 10, - argument.getValue().getPartitionCount()); + argParts.getValue().size()); + assertTrue(ifNotExistsArg.getValue()); + assertFalse(needResultsArg.getValue()); } /** @@ -210,28 +258,34 @@ public class TestMsckCreatePartitionsInBatches { public void testBatchingWhenException() throws Exception { // create 13 dummy partitions Set<PartitionResult> partsNotInMs = createPartsNotInMs(23); - Hive spyDb = Mockito.spy(db); + IMetaStoreClient spyDb = Mockito.spy(db); // first call to createPartitions should throw exception Mockito.doThrow(HiveException.class).doCallRealMethod().doCallRealMethod().when(spyDb) - .createPartitions(Mockito.any(AddPartitionDesc.class)); + .add_partitions(Mockito.anyObject(), Mockito.anyBoolean(), + Mockito.anyBoolean()); // test with a batch size of 30 and decaying factor of 2 - ddlTask.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 30, 2, 0); + msck.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 30, 2, 0); // confirm the batch sizes were 23, 15, 8 in the three calls to create partitions - ArgumentCaptor<AddPartitionDesc> argument = ArgumentCaptor.forClass(AddPartitionDesc.class); + ArgumentCaptor<Boolean> ifNotExistsArg = ArgumentCaptor.forClass(Boolean.class); + ArgumentCaptor<Boolean> needResultsArg = ArgumentCaptor.forClass(Boolean.class); + ArgumentCaptor<List<Partition>> argParts = ArgumentCaptor.forClass((Class) List.class); // there should be 3 calls to create partitions with batch sizes of 23, 15, 8 - Mockito.verify(spyDb, Mockito.times(3)).createPartitions(argument.capture()); - List<AddPartitionDesc> apds = argument.getAllValues(); + Mockito.verify(spyDb, Mockito.times(3)).add_partitions(argParts.capture(), ifNotExistsArg.capture(), + needResultsArg.capture()); + List<List<Partition>> apds = argParts.getAllValues(); int retryAttempt = 1; Assert.assertEquals( String.format("Unexpected batch size in retry attempt %d ", retryAttempt++), 23, - apds.get(0).getPartitionCount()); + apds.get(0).size()); Assert.assertEquals( String.format("Unexpected batch size in retry attempt %d ", retryAttempt++), 15, - apds.get(1).getPartitionCount()); + apds.get(1).size()); Assert.assertEquals( String.format("Unexpected batch size in retry attempt %d ", retryAttempt++), 8, - apds.get(2).getPartitionCount()); + apds.get(2).size()); + assertTrue(ifNotExistsArg.getValue()); + assertFalse(needResultsArg.getValue()); } /** @@ -244,38 +298,44 @@ public class TestMsckCreatePartitionsInBatches { @Test public void testRetriesExhaustedBatchSize() throws Exception { Set<PartitionResult> partsNotInMs = createPartsNotInMs(17); - Hive spyDb = Mockito.spy(db); + IMetaStoreClient spyDb = Mockito.spy(db); Mockito.doThrow(HiveException.class).when(spyDb) - .createPartitions(Mockito.any(AddPartitionDesc.class)); + .add_partitions(Mockito.anyObject(), Mockito.anyBoolean(), Mockito.anyBoolean()); // batch size of 5 and decaying factor of 2 Exception ex = null; try { - ddlTask.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 30, 2, 0); + msck.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 30, 2, 0); } catch (Exception retryEx) { ex = retryEx; } - Assert.assertFalse("Exception was expected but was not thrown", ex == null); - Assert.assertTrue("Unexpected class of exception thrown", ex instanceof RetryException); + assertFalse("Exception was expected but was not thrown", ex == null); + Assert.assertTrue("Unexpected class of exception thrown", ex instanceof RetryUtilities.RetryException); + // there should be 5 calls to create partitions with batch sizes of 17, 15, 7, 3, 1 + ArgumentCaptor<Boolean> ifNotExistsArg = ArgumentCaptor.forClass(Boolean.class); + ArgumentCaptor<Boolean> needResultsArg = ArgumentCaptor.forClass(Boolean.class); + ArgumentCaptor<List<Partition>> argParts = ArgumentCaptor.forClass((Class) List.class); // there should be 5 calls to create partitions with batch sizes of 17, 15, 7, 3, 1 - ArgumentCaptor<AddPartitionDesc> argument = ArgumentCaptor.forClass(AddPartitionDesc.class); - Mockito.verify(spyDb, Mockito.times(5)).createPartitions(argument.capture()); - List<AddPartitionDesc> apds = argument.getAllValues(); + Mockito.verify(spyDb, Mockito.times(5)).add_partitions(argParts.capture(), ifNotExistsArg.capture(), + needResultsArg.capture()); + List<List<Partition>> apds = argParts.getAllValues(); int retryAttempt = 1; Assert.assertEquals( String.format("Unexpected batch size in retry attempt %d ", retryAttempt++), 17, - apds.get(0).getPartitionCount()); + apds.get(0).size()); Assert.assertEquals( String.format("Unexpected batch size in retry attempt %d ", retryAttempt++), 15, - apds.get(1).getPartitionCount()); + apds.get(1).size()); Assert.assertEquals( String.format("Unexpected batch size in retry attempt %d ", retryAttempt++), 7, - apds.get(2).getPartitionCount()); + apds.get(2).size()); Assert.assertEquals( String.format("Unexpected batch size in retry attempt %d ", retryAttempt++), 3, - apds.get(3).getPartitionCount()); + apds.get(3).size()); Assert.assertEquals( String.format("Unexpected batch size in retry attempt %d ", retryAttempt++), 1, - apds.get(4).getPartitionCount()); + apds.get(4).size()); + assertTrue(ifNotExistsArg.getValue()); + assertFalse(needResultsArg.getValue()); } /** @@ -285,28 +345,32 @@ public class TestMsckCreatePartitionsInBatches { @Test public void testMaxRetriesReached() throws Exception { Set<PartitionResult> partsNotInMs = createPartsNotInMs(17); - Hive spyDb = Mockito.spy(db); + IMetaStoreClient spyDb = Mockito.spy(db); Mockito.doThrow(HiveException.class).when(spyDb) - .createPartitions(Mockito.any(AddPartitionDesc.class)); + .add_partitions(Mockito.anyObject(), Mockito.anyBoolean(), Mockito.anyBoolean()); // batch size of 5 and decaying factor of 2 Exception ex = null; try { - ddlTask.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 30, 2, 2); + msck.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 30, 2, 2); } catch (Exception retryEx) { ex = retryEx; } - Assert.assertFalse("Exception was expected but was not thrown", ex == null); - Assert.assertTrue("Unexpected class of exception thrown", ex instanceof RetryException); - ArgumentCaptor<AddPartitionDesc> argument = ArgumentCaptor.forClass(AddPartitionDesc.class); - Mockito.verify(spyDb, Mockito.times(2)).createPartitions(argument.capture()); - List<AddPartitionDesc> apds = argument.getAllValues(); + assertFalse("Exception was expected but was not thrown", ex == null); + Assert.assertTrue("Unexpected class of exception thrown", ex instanceof RetryUtilities.RetryException); + ArgumentCaptor<Boolean> ifNotExistsArg = ArgumentCaptor.forClass(Boolean.class); + ArgumentCaptor<Boolean> needResultsArg = ArgumentCaptor.forClass(Boolean.class); + ArgumentCaptor<List<Partition>> argParts = ArgumentCaptor.forClass((Class) List.class); + Mockito.verify(spyDb, Mockito.times(2)).add_partitions(argParts.capture(), ifNotExistsArg.capture(), needResultsArg.capture()); + List<List<Partition>> apds = argParts.getAllValues(); int retryAttempt = 1; Assert.assertEquals( String.format("Unexpected batch size in retry attempt %d ", retryAttempt++), 17, - apds.get(0).getPartitionCount()); + apds.get(0).size()); Assert.assertEquals( String.format("Unexpected batch size in retry attempt %d ", retryAttempt++), 15, - apds.get(1).getPartitionCount()); + apds.get(1).size()); + assertTrue(ifNotExistsArg.getValue()); + assertFalse(needResultsArg.getValue()); } /** @@ -317,25 +381,31 @@ public class TestMsckCreatePartitionsInBatches { @Test public void testOneMaxRetries() throws Exception { Set<PartitionResult> partsNotInMs = createPartsNotInMs(17); - Hive spyDb = Mockito.spy(db); + IMetaStoreClient spyDb = Mockito.spy(db); Mockito.doThrow(HiveException.class).when(spyDb) - .createPartitions(Mockito.any(AddPartitionDesc.class)); + .add_partitions(Mockito.anyObject(), Mockito.anyBoolean(), Mockito.anyBoolean()); // batch size of 5 and decaying factor of 2 Exception ex = null; try { - ddlTask.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 30, 2, 1); + msck.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 30, 2, 1); } catch (Exception retryEx) { ex = retryEx; } - Assert.assertFalse("Exception was expected but was not thrown", ex == null); - Assert.assertTrue("Unexpected class of exception thrown", ex instanceof RetryException); + assertFalse("Exception was expected but was not thrown", ex == null); + Assert.assertTrue("Unexpected class of exception thrown", ex instanceof RetryUtilities.RetryException); + // there should be 5 calls to create partitions with batch sizes of 17, 15, 7, 3, 1 + ArgumentCaptor<Boolean> ifNotExistsArg = ArgumentCaptor.forClass(Boolean.class); + ArgumentCaptor<Boolean> needResultsArg = ArgumentCaptor.forClass(Boolean.class); + ArgumentCaptor<List<Partition>> argParts = ArgumentCaptor.forClass((Class) List.class); // there should be 5 calls to create partitions with batch sizes of 17, 15, 7, 3, 1 - ArgumentCaptor<AddPartitionDesc> argument = ArgumentCaptor.forClass(AddPartitionDesc.class); - Mockito.verify(spyDb, Mockito.times(1)).createPartitions(argument.capture()); - List<AddPartitionDesc> apds = argument.getAllValues(); + Mockito.verify(spyDb, Mockito.times(1)).add_partitions(argParts.capture(), ifNotExistsArg.capture(), + needResultsArg.capture()); + List<List<Partition>> apds = argParts.getAllValues(); int retryAttempt = 1; Assert.assertEquals( String.format("Unexpected batch size in retry attempt %d ", retryAttempt++), 17, - apds.get(0).getPartitionCount()); + apds.get(0).size()); + assertTrue(ifNotExistsArg.getValue()); + assertFalse(needResultsArg.getValue()); } }
