http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/ql/src/test/results/clientpositive/rename_external_partition_location.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/rename_external_partition_location.q.out b/ql/src/test/results/clientpositive/rename_external_partition_location.q.out index 02cd814..d854887 100644 --- a/ql/src/test/results/clientpositive/rename_external_partition_location.q.out +++ b/ql/src/test/results/clientpositive/rename_external_partition_location.q.out @@ -103,6 +103,7 @@ Table Parameters: COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\"} EXTERNAL TRUE bucketing_version 2 + discover.partitions true numFiles 1 numPartitions 1 numRows 10 @@ -266,6 +267,7 @@ Table Parameters: COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\"} EXTERNAL TRUE bucketing_version 2 + discover.partitions true numFiles 1 numPartitions 1 numRows 10
http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/ql/src/test/results/clientpositive/repl_2_exim_basic.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/repl_2_exim_basic.q.out b/ql/src/test/results/clientpositive/repl_2_exim_basic.q.out index b2bcd51..40b6ad7 100644 --- a/ql/src/test/results/clientpositive/repl_2_exim_basic.q.out +++ b/ql/src/test/results/clientpositive/repl_2_exim_basic.q.out @@ -345,6 +345,7 @@ LOCATION #### A masked pattern was here #### TBLPROPERTIES ( 'bucketing_version'='2', + 'discover.partitions'='true', #### A masked pattern was here #### PREHOOK: query: select * from ext_t_imported PREHOOK: type: QUERY @@ -426,6 +427,7 @@ LOCATION TBLPROPERTIES ( 'EXTERNAL'='FALSE', 'bucketing_version'='2', + 'discover.partitions'='true', 'repl.last.id'='0', #### A masked pattern was here #### PREHOOK: query: select * from ext_t_r_imported http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/ql/src/test/results/clientpositive/show_create_table_alter.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/show_create_table_alter.q.out b/ql/src/test/results/clientpositive/show_create_table_alter.q.out index 2c75c36..9d93ee9 100644 --- a/ql/src/test/results/clientpositive/show_create_table_alter.q.out +++ b/ql/src/test/results/clientpositive/show_create_table_alter.q.out @@ -32,6 +32,7 @@ LOCATION #### A masked pattern was here #### TBLPROPERTIES ( 'bucketing_version'='2', + 'discover.partitions'='true', #### A masked pattern was here #### PREHOOK: query: ALTER TABLE tmp_showcrt1_n1 SET TBLPROPERTIES ('comment'='temporary table', 'EXTERNAL'='FALSE') PREHOOK: type: ALTERTABLE_PROPERTIES @@ -67,6 +68,7 @@ LOCATION TBLPROPERTIES ( 'EXTERNAL'='FALSE', 'bucketing_version'='2', + 'discover.partitions'='true', #### A masked pattern was here #### PREHOOK: query: ALTER TABLE tmp_showcrt1_n1 SET TBLPROPERTIES 
('comment'='changed comment', 'EXTERNAL'='TRUE') PREHOOK: type: ALTERTABLE_PROPERTIES @@ -101,6 +103,7 @@ LOCATION #### A masked pattern was here #### TBLPROPERTIES ( 'bucketing_version'='2', + 'discover.partitions'='true', #### A masked pattern was here #### PREHOOK: query: ALTER TABLE tmp_showcrt1_n1 SET TBLPROPERTIES ('SORTBUCKETCOLSPREFIX'='FALSE') PREHOOK: type: ALTERTABLE_PROPERTIES @@ -135,6 +138,7 @@ LOCATION #### A masked pattern was here #### TBLPROPERTIES ( 'bucketing_version'='2', + 'discover.partitions'='true', #### A masked pattern was here #### PREHOOK: query: ALTER TABLE tmp_showcrt1_n1 SET TBLPROPERTIES ('storage_handler'='org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler') PREHOOK: type: ALTERTABLE_PROPERTIES @@ -169,6 +173,7 @@ LOCATION #### A masked pattern was here #### TBLPROPERTIES ( 'bucketing_version'='2', + 'discover.partitions'='true', #### A masked pattern was here #### PREHOOK: query: DROP TABLE tmp_showcrt1_n1 PREHOOK: type: DROPTABLE http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/ql/src/test/results/clientpositive/show_create_table_partitioned.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/show_create_table_partitioned.q.out b/ql/src/test/results/clientpositive/show_create_table_partitioned.q.out index e554a18..8a56bfc 100644 --- a/ql/src/test/results/clientpositive/show_create_table_partitioned.q.out +++ b/ql/src/test/results/clientpositive/show_create_table_partitioned.q.out @@ -32,6 +32,7 @@ LOCATION #### A masked pattern was here #### TBLPROPERTIES ( 'bucketing_version'='2', + 'discover.partitions'='true', #### A masked pattern was here #### PREHOOK: query: DROP TABLE tmp_showcrt1_n2 PREHOOK: type: DROPTABLE http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/ql/src/test/results/clientpositive/show_create_table_serde.q.out ---------------------------------------------------------------------- diff --git 
a/ql/src/test/results/clientpositive/show_create_table_serde.q.out b/ql/src/test/results/clientpositive/show_create_table_serde.q.out index 8b95c9b..a66c09a 100644 --- a/ql/src/test/results/clientpositive/show_create_table_serde.q.out +++ b/ql/src/test/results/clientpositive/show_create_table_serde.q.out @@ -174,6 +174,7 @@ LOCATION #### A masked pattern was here #### TBLPROPERTIES ( 'bucketing_version'='2', + 'discover.partitions'='true', #### A masked pattern was here #### PREHOOK: query: DROP TABLE tmp_showcrt1_n0 PREHOOK: type: DROPTABLE http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/ql/src/test/results/clientpositive/spark/stats_noscan_2.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/spark/stats_noscan_2.q.out b/ql/src/test/results/clientpositive/spark/stats_noscan_2.q.out index 2d713a8..74f8b5a 100644 --- a/ql/src/test/results/clientpositive/spark/stats_noscan_2.q.out +++ b/ql/src/test/results/clientpositive/spark/stats_noscan_2.q.out @@ -49,6 +49,7 @@ Table Parameters: COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\"} EXTERNAL TRUE bucketing_version 2 + discover.partitions true numFiles 1 totalSize 11 #### A masked pattern was here #### @@ -90,6 +91,7 @@ Table Parameters: COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\"} EXTERNAL TRUE bucketing_version 2 + discover.partitions true numFiles 1 numRows 6 rawDataSize 6 http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/ql/src/test/results/clientpositive/stats_noscan_2.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/stats_noscan_2.q.out b/ql/src/test/results/clientpositive/stats_noscan_2.q.out index 182820f..6625219 100644 --- a/ql/src/test/results/clientpositive/stats_noscan_2.q.out +++ b/ql/src/test/results/clientpositive/stats_noscan_2.q.out @@ -49,6 +49,7 @@ Table Parameters: COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\"} EXTERNAL TRUE 
bucketing_version 2 + discover.partitions true numFiles 1 totalSize 11 #### A masked pattern was here #### @@ -90,6 +91,7 @@ Table Parameters: COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\"} EXTERNAL TRUE bucketing_version 2 + discover.partitions true numFiles 1 numRows 6 rawDataSize 6 http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/ql/src/test/results/clientpositive/temp_table_display_colstats_tbllvl.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/temp_table_display_colstats_tbllvl.q.out b/ql/src/test/results/clientpositive/temp_table_display_colstats_tbllvl.q.out index 2a442b4..065cd98 100644 --- a/ql/src/test/results/clientpositive/temp_table_display_colstats_tbllvl.q.out +++ b/ql/src/test/results/clientpositive/temp_table_display_colstats_tbllvl.q.out @@ -61,6 +61,7 @@ Table Parameters: COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"adrevenue\":\"true\",\"avgtimeonsite\":\"true\",\"ccode\":\"true\",\"desturl\":\"true\",\"lcode\":\"true\",\"skeyword\":\"true\",\"sourceip\":\"true\",\"useragent\":\"true\",\"visitdate\":\"true\"}} EXTERNAL TRUE bucketing_version 2 + discover.partitions true numFiles 0 numRows 0 rawDataSize 0 @@ -111,6 +112,7 @@ Table Type: EXTERNAL_TABLE Table Parameters: EXTERNAL TRUE bucketing_version 2 + discover.partitions true numFiles 1 numRows 0 rawDataSize 0 @@ -267,6 +269,7 @@ STAGE PLANS: columns sourceip,desturl,visitdate,adrevenue,useragent,ccode,lcode,skeyword,avgtimeonsite columns.comments columns.types string:string:string:float:string:string:string:string:int + discover.partitions true field.delim | #### A masked pattern was here #### name default.uservisits_web_text_none @@ -289,6 +292,7 @@ STAGE PLANS: columns sourceip,desturl,visitdate,adrevenue,useragent,ccode,lcode,skeyword,avgtimeonsite columns.comments columns.types string:string:string:float:string:string:string:string:int + discover.partitions true field.delim | 
#### A masked pattern was here #### name default.uservisits_web_text_none @@ -381,6 +385,7 @@ Table Parameters: COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"adRevenue\":\"true\",\"avgTimeOnSite\":\"true\",\"sourceIP\":\"true\"}} EXTERNAL TRUE bucketing_version 2 + discover.partitions true numFiles 1 numRows 55 rawDataSize 7005 http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/CheckResult.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/CheckResult.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/CheckResult.java new file mode 100644 index 0000000..5287f47 --- /dev/null +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/CheckResult.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore; + +import java.util.Set; +import java.util.TreeSet; + +/** + * Result class used by the HiveMetaStoreChecker. 
+ */ +public class CheckResult { + + // tree sets to preserve ordering in qfile tests + private Set<String> tablesNotOnFs = new TreeSet<String>(); + private Set<String> tablesNotInMs = new TreeSet<String>(); + private Set<PartitionResult> partitionsNotOnFs = new TreeSet<PartitionResult>(); + private Set<PartitionResult> partitionsNotInMs = new TreeSet<PartitionResult>(); + private Set<PartitionResult> expiredPartitions = new TreeSet<>(); + + /** + * @return a list of tables not found on the filesystem. + */ + public Set<String> getTablesNotOnFs() { + return tablesNotOnFs; + } + + /** + * @param tablesNotOnFs + * a list of tables not found on the filesystem. + */ + public void setTablesNotOnFs(Set<String> tablesNotOnFs) { + this.tablesNotOnFs = tablesNotOnFs; + } + + /** + * @return a list of tables not found in the metastore. + */ + public Set<String> getTablesNotInMs() { + return tablesNotInMs; + } + + /** + * @param tablesNotInMs + * a list of tables not found in the metastore. + */ + public void setTablesNotInMs(Set<String> tablesNotInMs) { + this.tablesNotInMs = tablesNotInMs; + } + + /** + * @return a list of partitions not found on the fs + */ + public Set<PartitionResult> getPartitionsNotOnFs() { + return partitionsNotOnFs; + } + + /** + * @param partitionsNotOnFs + * a list of partitions not found on the fs + */ + public void setPartitionsNotOnFs(Set<PartitionResult> partitionsNotOnFs) { + this.partitionsNotOnFs = partitionsNotOnFs; + } + + /** + * @return a list of partitions not found in the metastore + */ + public Set<PartitionResult> getPartitionsNotInMs() { + return partitionsNotInMs; + } + + /** + * @param partitionsNotInMs + * a list of partitions not found in the metastore + */ + public void setPartitionsNotInMs(Set<PartitionResult> partitionsNotInMs) { + this.partitionsNotInMs = partitionsNotInMs; + } + + public Set<PartitionResult> getExpiredPartitions() { + return expiredPartitions; + } + + public void setExpiredPartitions( + final 
Set<PartitionResult> expiredPartitions) { + this.expiredPartitions = expiredPartitions; + } + + /** + * A basic description of a partition that is missing from either the fs or + * the ms. + */ + public static class PartitionResult implements Comparable<PartitionResult> { + private String partitionName; + private String tableName; + + /** + * @return name of partition + */ + public String getPartitionName() { + return partitionName; + } + + /** + * @param partitionName + * name of partition + */ + public void setPartitionName(String partitionName) { + this.partitionName = partitionName; + } + + /** + * @return table name + */ + public String getTableName() { + return tableName; + } + + /** + * @param tableName + * table name + */ + public void setTableName(String tableName) { + this.tableName = tableName; + } + + @Override + public String toString() { + return tableName + ":" + partitionName; + } + + public int compareTo(PartitionResult o) { + int ret = tableName.compareTo(o.tableName); + return ret != 0 ? 
ret : partitionName.compareTo(o.partitionName); + } + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java index 294dfb7..ecd5996 100755 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java @@ -409,7 +409,7 @@ public class Warehouse { } } - private static String escapePathName(String path) { + public static String escapePathName(String path) { return FileUtils.escapePathName(path); } http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/api/MetastoreException.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/api/MetastoreException.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/api/MetastoreException.java new file mode 100644 index 0000000..ab89389 --- /dev/null +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/api/MetastoreException.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.api; + +public class MetastoreException extends Exception { + public MetastoreException() { + super(); + } + + public MetastoreException(String message) { + super(message); + } + + public MetastoreException(Throwable cause) { + super(cause); + } + + public MetastoreException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index 1d64cce..f3a78bf 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -74,6 +74,8 @@ public class MetastoreConf { @VisibleForTesting static final String RUNTIME_STATS_CLEANER_TASK_CLASS = "org.apache.hadoop.hive.metastore.RuntimeStatsCleanerTask"; + static final String PARTITION_MANAGEMENT_TASK_CLASS = + 
"org.apache.hadoop.hive.metastore.PartitionManagementTask"; @VisibleForTesting static final String EVENT_CLEANER_TASK_CLASS = "org.apache.hadoop.hive.metastore.events.EventCleanerTask"; @@ -651,6 +653,58 @@ public class MetastoreConf { METRICS_REPORTERS("metastore.metrics.reporters", "metastore.metrics.reporters", "json,jmx", new StringSetValidator("json", "jmx", "console", "hadoop"), "A comma separated list of metrics reporters to start"), + MSCK_PATH_VALIDATION("msck.path.validation", "hive.msck.path.validation", "throw", + new StringSetValidator("throw", "skip", "ignore"), "The approach msck should take with HDFS " + + "directories that are partition-like but contain unsupported characters. 'throw' (an " + + "exception) is the default; 'skip' will skip the invalid directories and still repair the" + + " others; 'ignore' will skip the validation (legacy behavior, causes bugs in many cases)"), + MSCK_REPAIR_BATCH_SIZE("msck.repair.batch.size", + "hive.msck.repair.batch.size", 3000, + "Batch size for the msck repair command. If the value is greater than zero,\n " + + "it will execute batch wise with the configured batch size. In case of errors while\n" + + "adding unknown partitions the batch size is automatically reduced by half in the subsequent\n" + + "retry attempt. 
The default value is 3000 which means it will execute in the batches of 3000."), + MSCK_REPAIR_BATCH_MAX_RETRIES("msck.repair.batch.max.retries", "hive.msck.repair.batch.max.retries", 4, + "Maximum number of retries for the msck repair command when adding unknown partitions.\n " + + "If the value is greater than zero it will retry adding unknown partitions until the maximum\n" + + "number of attempts is reached or batch size is reduced to 0, whichever is earlier.\n" + + "In each retry attempt it will reduce the batch size by a factor of 2 until it reaches zero.\n" + + "If the value is set to zero it will retry until the batch size becomes zero as described above."), + MSCK_REPAIR_ENABLE_PARTITION_RETENTION("msck.repair.enable.partition.retention", + "msck.repair.enable.partition.retention", false, + "If 'partition.retention.period' table property is set, this flag determines whether MSCK REPAIR\n" + + "command should handle partition retention. If enabled, and if a specific partition's age exceeded\n" + + "retention period the partition will be dropped along with data"), + + + // Partition management task params + PARTITION_MANAGEMENT_TASK_FREQUENCY("metastore.partition.management.task.frequency", + "metastore.partition.management.task.frequency", + 300, TimeUnit.SECONDS, "Frequency at which timer task runs to do automatic partition management for tables\n" + + "with table property 'discover.partitions'='true'. Partition management include 2 pieces. One is partition\n" + + "discovery and other is partition retention period. When 'discover.partitions'='true' is set, partition\n" + + "management will look for partitions in table location and add partitions objects for it in metastore.\n" + + "Similarly if partition object exists in metastore and partition location does not exist, partition object\n" + + "will be dropped. The second piece in partition management is retention period. 
When 'discover.partition'\n" + + "is set to true and if 'partition.retention.period' table property is defined, partitions that are older\n" + + "than the specified retention period will be automatically dropped from metastore along with the data."), + PARTITION_MANAGEMENT_TABLE_TYPES("metastore.partition.management.table.types", + "metastore.partition.management.table.types", "MANAGED_TABLE,EXTERNAL_TABLE", + "Comma separated list of table types to use for partition management"), + PARTITION_MANAGEMENT_TASK_THREAD_POOL_SIZE("metastore.partition.management.task.thread.pool.size", + "metastore.partition.management.task.thread.pool.size", 5, + "Partition management uses thread pool on to which tasks are submitted for discovering and retaining the\n" + + "partitions. This determines the size of the thread pool."), + PARTITION_MANAGEMENT_CATALOG_NAME("metastore.partition.management.catalog.name", + "metastore.partition.management.catalog.name", "hive", + "Automatic partition management will look for tables under the specified catalog name"), + PARTITION_MANAGEMENT_DATABASE_PATTERN("metastore.partition.management.database.pattern", + "metastore.partition.management.database.pattern", "*", + "Automatic partition management will look for tables using the specified database pattern"), + PARTITION_MANAGEMENT_TABLE_PATTERN("metastore.partition.management.table.pattern", + "metastore.partition.management.table.pattern", "*", + "Automatic partition management will look for tables using the specified table pattern"), + MULTITHREADED("javax.jdo.option.Multithreaded", "javax.jdo.option.Multithreaded", true, "Set this to true if multiple threads access metastore through JDO concurrently."), MAX_OPEN_TXNS("metastore.max.open.txns", "hive.max.open.txns", 100000, @@ -799,7 +853,7 @@ public class MetastoreConf { TASK_THREADS_ALWAYS("metastore.task.threads.always", "metastore.task.threads.always", EVENT_CLEANER_TASK_CLASS + "," + RUNTIME_STATS_CLEANER_TASK_CLASS + "," + 
"org.apache.hadoop.hive.metastore.repl.DumpDirCleanerTask" + "," + - "org.apache.hadoop.hive.metastore.HiveProtoEventsCleanerTask", + "org.apache.hadoop.hive.metastore.HiveProtoEventsCleanerTask", "Comma separated list of tasks that will be started in separate threads. These will " + "always be started, regardless of whether the metastore is running in embedded mode " + "or in server mode. They must implement " + METASTORE_TASK_THREAD_CLASS), @@ -808,7 +862,8 @@ public class MetastoreConf { ACID_OPEN_TXNS_COUNTER_SERVICE_CLASS + "," + ACID_COMPACTION_HISTORY_SERVICE_CLASS + "," + ACID_WRITE_SET_SERVICE_CLASS + "," + - MATERIALZIATIONS_REBUILD_LOCK_CLEANER_TASK_CLASS, + MATERIALZIATIONS_REBUILD_LOCK_CLEANER_TASK_CLASS + "," + + PARTITION_MANAGEMENT_TASK_CLASS, "Command separated list of tasks that will be started in separate threads. These will be" + " started only when the metastore is running as a separate service. They must " + "implement " + METASTORE_TASK_THREAD_CLASS), http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java index 8fb1fa7..1d89e12 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java @@ -17,37 +17,6 @@ */ package org.apache.hadoop.hive.metastore.utils; -import org.apache.commons.beanutils.PropertyUtils; -import org.apache.hadoop.hive.metastore.api.PartitionSpec; -import org.apache.hadoop.hive.metastore.api.PartitionSpecWithSharedSD; -import 
org.apache.hadoop.hive.metastore.api.PartitionWithoutSD; -import org.apache.hadoop.hive.metastore.api.WMPoolSchedulingPolicy; - -import com.google.common.base.Joiner; - -import org.apache.hadoop.conf.Configuration; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.StatsSetupConst; -import org.apache.hadoop.hive.metastore.ColumnType; -import org.apache.hadoop.hive.metastore.TableType; -import org.apache.hadoop.hive.metastore.Warehouse; -import org.apache.hadoop.hive.metastore.api.EnvironmentContext; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge; -import org.apache.hadoop.security.SaslRpcServer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; - import java.beans.PropertyDescriptor; import java.io.File; import java.net.URL; @@ -69,6 +38,30 @@ import java.util.stream.Collectors; import static java.util.regex.Pattern.compile; +import javax.annotation.Nullable; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.ColumnType; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.MetaException; +import 
org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.WMPoolSchedulingPolicy; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge; +import org.apache.hadoop.security.SaslRpcServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Joiner; + public class MetaStoreUtils { /** A fixed date format to be used for hive partition column values. */ public static final ThreadLocal<DateFormat> PARTITION_DATE_FORMAT = http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java new file mode 100644 index 0000000..2df45f6 --- /dev/null +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java @@ -0,0 +1,571 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore; + +import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getAllPartitionsOf; +import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getDataLocation; +import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPartColNames; +import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPartCols; +import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPartition; +import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPartitionName; +import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPartitionSpec; +import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPath; +import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.isPartitioned; + +import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; 
+import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.MetastoreException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.utils.FileUtils; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Interner; +import com.google.common.collect.Interners; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * Verify that the information in the metastore matches what is on the + * filesystem. Return a CheckResult object containing lists of missing and any + * unexpected tables and partitions. + */ +public class HiveMetaStoreChecker { + + public static final Logger LOG = LoggerFactory.getLogger(HiveMetaStoreChecker.class); + + private final IMetaStoreClient msc; + private final Configuration conf; + private final long partitionExpirySeconds; + private final Interner<Path> pathInterner = Interners.newStrongInterner(); + + public HiveMetaStoreChecker(IMetaStoreClient msc, Configuration conf) { + this(msc, conf, -1); + } + + public HiveMetaStoreChecker(IMetaStoreClient msc, Configuration conf, long partitionExpirySeconds) { + super(); + this.msc = msc; + this.conf = conf; + this.partitionExpirySeconds = partitionExpirySeconds; + } + + public IMetaStoreClient getMsc() { + return msc; + } + + /** + * Check the metastore for inconsistencies, data missing in either the + * metastore or on the dfs. + * + * @param catName + * name of the catalog, if not specified default catalog will be used. 
+ * @param dbName + * name of the database, if not specified the default will be used. + * @param tableName + * Table we want to run the check for. If null we'll check all the + * tables in the database. + * @param partitions + * List of partition name value pairs, if null or empty check all + * partitions + * @param result + * Fill this with the results of the check + * @throws MetastoreException + * Failed to get required information from the metastore. + * @throws IOException + * Most likely filesystem related + */ + public void checkMetastore(String catName, String dbName, String tableName, + List<? extends Map<String, String>> partitions, CheckResult result) + throws MetastoreException, IOException { + + if (dbName == null || "".equalsIgnoreCase(dbName)) { + dbName = Warehouse.DEFAULT_DATABASE_NAME; + } + + try { + if (tableName == null || "".equals(tableName)) { + // no table specified, check all tables and all partitions. + List<String> tables = getMsc().getTables(catName, dbName, ".*"); + for (String currentTableName : tables) { + checkTable(catName, dbName, currentTableName, null, result); + } + + findUnknownTables(catName, dbName, tables, result); + } else if (partitions == null || partitions.isEmpty()) { + // only one table, let's check all partitions + checkTable(catName, dbName, tableName, null, result); + } else { + // check the specified partitions + checkTable(catName, dbName, tableName, partitions, result); + } + LOG.info("Number of partitionsNotInMs=" + result.getPartitionsNotInMs() + + ", partitionsNotOnFs=" + result.getPartitionsNotOnFs() + + ", tablesNotInMs=" + result.getTablesNotInMs() + + ", tablesNotOnFs=" + result.getTablesNotOnFs() + + ", expiredPartitions=" + result.getExpiredPartitions()); + } catch (TException e) { + throw new MetastoreException(e); + } + } + + /** + * Check for table directories that aren't in the metastore. + * + * @param catName + * name of the catalog, if not specified default catalog will be used. 
+ * @param dbName + * Name of the database + * @param tables + * List of table names + * @param result + * Add any found tables to this + * @throws IOException + * Most likely filesystem related + * @throws MetaException + * Failed to get required information from the metastore. + * @throws NoSuchObjectException + * Failed to get required information from the metastore. + * @throws TException + * Thrift communication error. + */ + void findUnknownTables(String catName, String dbName, List<String> tables, CheckResult result) + throws IOException, MetaException, TException { + + Set<Path> dbPaths = new HashSet<Path>(); + Set<String> tableNames = new HashSet<String>(tables); + + for (String tableName : tables) { + Table table = getMsc().getTable(catName, dbName, tableName); + // hack, instead figure out a way to get the db paths + String isExternal = table.getParameters().get("EXTERNAL"); + if (!"TRUE".equalsIgnoreCase(isExternal)) { + Path tablePath = getPath(table); + if (tablePath != null) { + dbPaths.add(tablePath.getParent()); + } + } + } + + for (Path dbPath : dbPaths) { + FileSystem fs = dbPath.getFileSystem(conf); + FileStatus[] statuses = fs.listStatus(dbPath, FileUtils.HIDDEN_FILES_PATH_FILTER); + for (FileStatus status : statuses) { + + if (status.isDir() && !tableNames.contains(status.getPath().getName())) { + + result.getTablesNotInMs().add(status.getPath().getName()); + } + } + } + } + + /** + * Check the metastore for inconsistencies, data missing in either the + * metastore or on the dfs. + * + * @param catName + * name of the catalog, if not specified default catalog will be used. + * @param dbName + * Name of the database + * @param tableName + * Name of the table + * @param partitions + * Partitions to check, if null or empty get all the partitions. + * @param result + * Result object + * @throws MetastoreException + * Failed to get required information from the metastore. 
+ * @throws IOException + * Most likely filesystem related + * @throws MetaException + * Failed to get required information from the metastore. + */ + void checkTable(String catName, String dbName, String tableName, + List<? extends Map<String, String>> partitions, CheckResult result) + throws MetaException, IOException, MetastoreException { + + Table table; + + try { + table = getMsc().getTable(catName, dbName, tableName); + } catch (TException e) { + result.getTablesNotInMs().add(tableName); + return; + } + + PartitionIterable parts; + boolean findUnknownPartitions = true; + + if (isPartitioned(table)) { + if (partitions == null || partitions.isEmpty()) { + int batchSize = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX); + if (batchSize > 0) { + parts = new PartitionIterable(getMsc(), table, batchSize); + } else { + List<Partition> loadedPartitions = getAllPartitionsOf(getMsc(), table); + parts = new PartitionIterable(loadedPartitions); + } + } else { + // we're interested in specific partitions, + // don't check for any others + findUnknownPartitions = false; + List<Partition> loadedPartitions = new ArrayList<>(); + for (Map<String, String> map : partitions) { + Partition part = getPartition(getMsc(), table, map); + if (part == null) { + CheckResult.PartitionResult pr = new CheckResult.PartitionResult(); + pr.setTableName(tableName); + pr.setPartitionName(Warehouse.makePartPath(map)); + result.getPartitionsNotInMs().add(pr); + } else { + loadedPartitions.add(part); + } + } + parts = new PartitionIterable(loadedPartitions); + } + } else { + parts = new PartitionIterable(Collections.<Partition>emptyList()); + } + + checkTable(table, parts, findUnknownPartitions, result); + } + + /** + * Check the metastore for inconsistencies, data missing in either the + * metastore or on the dfs. 
+ * + * @param table + * Table to check + * @param parts + * Partitions to check + * @param result + * Result object + * @param findUnknownPartitions + * Should we try to find unknown partitions? + * @throws IOException + * Could not get information from filesystem + * @throws MetastoreException + * Could not create Partition object + */ + void checkTable(Table table, PartitionIterable parts, + boolean findUnknownPartitions, CheckResult result) throws IOException, + MetastoreException { + + Path tablePath = getPath(table); + if (tablePath == null) { + return; + } + FileSystem fs = tablePath.getFileSystem(conf); + if (!fs.exists(tablePath)) { + result.getTablesNotOnFs().add(table.getTableName()); + return; + } + + Set<Path> partPaths = new HashSet<Path>(); + + // check that the partition folders exist on disk + for (Partition partition : parts) { + if (partition == null) { + // most likely the user specified an invalid partition + continue; + } + Path partPath = getDataLocation(table, partition); + if (partPath == null) { + continue; + } + fs = partPath.getFileSystem(conf); + if (!fs.exists(partPath)) { + CheckResult.PartitionResult pr = new CheckResult.PartitionResult(); + pr.setPartitionName(getPartitionName(table, partition)); + pr.setTableName(partition.getTableName()); + result.getPartitionsNotOnFs().add(pr); + } + + if (partitionExpirySeconds > 0) { + long currentEpochSecs = Instant.now().getEpochSecond(); + long createdTime = partition.getCreateTime(); + long partitionAgeSeconds = currentEpochSecs - createdTime; + if (partitionAgeSeconds > partitionExpirySeconds) { + CheckResult.PartitionResult pr = new CheckResult.PartitionResult(); + pr.setPartitionName(getPartitionName(table, partition)); + pr.setTableName(partition.getTableName()); + result.getExpiredPartitions().add(pr); + if (LOG.isDebugEnabled()) { + LOG.debug("{}.{}.{}.{} expired. 
createdAt: {} current: {} age: {}s expiry: {}s", partition.getCatName(), + partition.getDbName(), partition.getTableName(), pr.getPartitionName(), createdTime, currentEpochSecs, + partitionAgeSeconds, partitionExpirySeconds); + } + } + } + + for (int i = 0; i < getPartitionSpec(table, partition).size(); i++) { + Path qualifiedPath = partPath.makeQualified(fs); + pathInterner.intern(qualifiedPath); + partPaths.add(qualifiedPath); + partPath = partPath.getParent(); + } + } + + if (findUnknownPartitions) { + findUnknownPartitions(table, partPaths, result); + } + } + + /** + * Find partitions on the fs that are unknown to the metastore. + * + * @param table + * Table where the partitions would be located + * @param partPaths + * Paths of the partitions the ms knows about + * @param result + * Result object + * @throws IOException + * Thrown if we fail at fetching listings from the fs. + * @throws MetastoreException + */ + void findUnknownPartitions(Table table, Set<Path> partPaths, + CheckResult result) throws IOException, MetastoreException { + + Path tablePath = getPath(table); + if (tablePath == null) { + return; + } + // now check the table folder and see if we find anything + // that isn't in the metastore + Set<Path> allPartDirs = new HashSet<Path>(); + checkPartitionDirs(tablePath, allPartDirs, Collections.unmodifiableList(getPartColNames(table))); + // don't want the table dir + allPartDirs.remove(tablePath); + + // remove the partition paths we know about + allPartDirs.removeAll(partPaths); + + Set<String> partColNames = Sets.newHashSet(); + for(FieldSchema fSchema : getPartCols(table)) { + partColNames.add(fSchema.getName()); + } + + // we should now only have the unexpected folders left + for (Path partPath : allPartDirs) { + FileSystem fs = partPath.getFileSystem(conf); + String partitionName = getPartitionName(fs.makeQualified(tablePath), + partPath, partColNames); + LOG.debug("PartitionName: " + partitionName); + + if (partitionName != null) { + 
CheckResult.PartitionResult pr = new CheckResult.PartitionResult(); + pr.setPartitionName(partitionName); + pr.setTableName(table.getTableName()); + + result.getPartitionsNotInMs().add(pr); + } + } + LOG.debug("Number of partitions not in metastore : " + result.getPartitionsNotInMs().size()); + } + + /** + * Assume that depth is 2, i.e., partition columns are a and b + * tblPath/a=1 => throw exception + * tblPath/a=1/file => throw exception + * tblPath/a=1/b=2/file => return a=1/b=2 + * tblPath/a=1/b=2/c=3 => return a=1/b=2 + * tblPath/a=1/b=2/c=3/file => return a=1/b=2 + * + * @param basePath + * Start directory + * @param allDirs + * This set will contain the leaf paths at the end. + * @param partColNames + * Partition column names + * @throws IOException + * Thrown if we can't get lists from the fs. + * @throws MetastoreException + */ + + private void checkPartitionDirs(Path basePath, Set<Path> allDirs, final List<String> partColNames) throws IOException, MetastoreException { + // Here we just reuse the THREAD_COUNT configuration for + // METASTORE_FS_HANDLER_THREADS_COUNT since this results in better performance + // The number of missing partitions discovered are later added by metastore using a + // threadpool of size METASTORE_FS_HANDLER_THREADS_COUNT. 
If we have different sized + // pool here the smaller sized pool of the two becomes a bottleneck + int poolSize = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.FS_HANDLER_THREADS_COUNT); + + ExecutorService executor; + if (poolSize <= 1) { + LOG.debug("Using single-threaded version of MSCK-GetPaths"); + executor = MoreExecutors.newDirectExecutorService(); + } else { + LOG.debug("Using multi-threaded version of MSCK-GetPaths with number of threads " + poolSize); + ThreadFactory threadFactory = + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MSCK-GetPaths-%d").build(); + executor = Executors.newFixedThreadPool(poolSize, threadFactory); + } + checkPartitionDirs(executor, basePath, allDirs, basePath.getFileSystem(conf), partColNames); + + executor.shutdown(); + } + + private final class PathDepthInfoCallable implements Callable<Path> { + private final List<String> partColNames; + private final FileSystem fs; + private final ConcurrentLinkedQueue<PathDepthInfo> pendingPaths; + private final boolean throwException; + private final PathDepthInfo pd; + + private PathDepthInfoCallable(PathDepthInfo pd, List<String> partColNames, FileSystem fs, + ConcurrentLinkedQueue<PathDepthInfo> basePaths) { + this.partColNames = partColNames; + this.pd = pd; + this.fs = fs; + this.pendingPaths = basePaths; + this.throwException = "throw".equals(MetastoreConf.getVar(conf, MetastoreConf.ConfVars.MSCK_PATH_VALIDATION)); + } + + @Override + public Path call() throws Exception { + return processPathDepthInfo(pd); + } + + private Path processPathDepthInfo(final PathDepthInfo pd) + throws IOException, MetastoreException { + final Path currentPath = pd.p; + final int currentDepth = pd.depth; + FileStatus[] fileStatuses = fs.listStatus(currentPath, FileUtils.HIDDEN_FILES_PATH_FILTER); + // found no files under a sub-directory under table base path; it is possible that the table + // is empty and hence there are no partition sub-directories created under base path + if 
(fileStatuses.length == 0 && currentDepth > 0 && currentDepth < partColNames.size()) { + // since maxDepth is not yet reached, we are missing partition + // columns in currentPath + logOrThrowExceptionWithMsg( + "MSCK is missing partition columns under " + currentPath.toString()); + } else { + // found files under currentPath add them to the queue if it is a directory + for (FileStatus fileStatus : fileStatuses) { + if (!fileStatus.isDirectory() && currentDepth < partColNames.size()) { + // found a file at depth which is less than number of partition keys + logOrThrowExceptionWithMsg( + "MSCK finds a file rather than a directory when it searches for " + + fileStatus.getPath().toString()); + } else if (fileStatus.isDirectory() && currentDepth < partColNames.size()) { + // found a sub-directory at a depth less than number of partition keys + // validate if the partition directory name matches with the corresponding + // partition colName at currentDepth + Path nextPath = fileStatus.getPath(); + String[] parts = nextPath.getName().split("="); + if (parts.length != 2) { + logOrThrowExceptionWithMsg("Invalid partition name " + nextPath); + } else if (!parts[0].equalsIgnoreCase(partColNames.get(currentDepth))) { + logOrThrowExceptionWithMsg( + "Unexpected partition key " + parts[0] + " found at " + nextPath); + } else { + // add sub-directory to the work queue if maxDepth is not yet reached + pendingPaths.add(new PathDepthInfo(nextPath, currentDepth + 1)); + } + } + } + if (currentDepth == partColNames.size()) { + return currentPath; + } + } + return null; + } + + private void logOrThrowExceptionWithMsg(String msg) throws MetastoreException { + if(throwException) { + throw new MetastoreException(msg); + } else { + LOG.warn(msg); + } + } + } + + private static class PathDepthInfo { + private final Path p; + private final int depth; + PathDepthInfo(Path p, int depth) { + this.p = p; + this.depth = depth; + } + } + + private void checkPartitionDirs(final ExecutorService 
executor, + final Path basePath, final Set<Path> result, + final FileSystem fs, final List<String> partColNames) throws MetastoreException { + try { + Queue<Future<Path>> futures = new LinkedList<Future<Path>>(); + ConcurrentLinkedQueue<PathDepthInfo> nextLevel = new ConcurrentLinkedQueue<>(); + nextLevel.add(new PathDepthInfo(basePath, 0)); + //Uses level parallel implementation of a bfs. Recursive DFS implementations + //have a issue where the number of threads can run out if the number of + //nested sub-directories is more than the pool size. + //Using a two queue implementation is simpler than one queue since then we will + //have to add the complex mechanisms to let the free worker threads know when new levels are + //discovered using notify()/wait() mechanisms which can potentially lead to bugs if + //not done right + while(!nextLevel.isEmpty()) { + ConcurrentLinkedQueue<PathDepthInfo> tempQueue = new ConcurrentLinkedQueue<>(); + //process each level in parallel + while(!nextLevel.isEmpty()) { + futures.add( + executor.submit(new PathDepthInfoCallable(nextLevel.poll(), partColNames, fs, tempQueue))); + } + while(!futures.isEmpty()) { + Path p = futures.poll().get(); + if (p != null) { + result.add(p); + } + } + //update the nextlevel with newly discovered sub-directories from the above + nextLevel = tempQueue; + } + } catch (InterruptedException | ExecutionException e) { + LOG.error(e.getMessage()); + executor.shutdownNow(); + throw new MetastoreException(e.getCause()); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Msck.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Msck.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Msck.java new file mode 100644 index 0000000..b7ae1d8 --- /dev/null 
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Msck.java @@ -0,0 +1,530 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.nio.charset.StandardCharsets; +import java.util.AbstractList; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.DataOperationType; +import org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.LockResponse; +import org.apache.hadoop.hive.metastore.api.LockState; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.MetastoreException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import 
org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.utils.FileUtils; +import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils; +import org.apache.hadoop.hive.metastore.utils.ObjectPair; +import org.apache.hadoop.hive.metastore.utils.RetryUtilities; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; + +/** + * Msck repairs table metadata specifically related to partition information to be in-sync with directories in table + * location. + */ +public class Msck { + public static final Logger LOG = LoggerFactory.getLogger(Msck.class); + public static final int separator = 9; // tabCode + private static final int terminator = 10; // newLineCode + private boolean acquireLock; + private boolean deleteData; + + private Configuration conf; + private IMetaStoreClient msc; + + public Msck(boolean acquireLock, boolean deleteData) { + this.acquireLock = acquireLock; + this.deleteData = deleteData; + } + + public Configuration getConf() { + return conf; + } + + public void setConf(final Configuration conf) { + this.conf = conf; + } + + public void init(Configuration conf) throws MetaException { + if (msc == null) { + // the only reason we are using new conf here is to override EXPRESSION_PROXY_CLASS + Configuration metastoreConf = MetastoreConf.newMetastoreConf(new Configuration(conf)); + metastoreConf.set(MetastoreConf.ConfVars.EXPRESSION_PROXY_CLASS.getVarname(), + MsckPartitionExpressionProxy.class.getCanonicalName()); + setConf(metastoreConf); + this.msc = new HiveMetaStoreClient(metastoreConf); + } + } + + /** + * MetastoreCheck, see if the data in the metastore matches what is on the + * dfs. Current version checks for tables and partitions that are either + * missing on disk on in the metastore. 
+ * + * @param msckInfo Information about the tables and partitions we want to check for. + * @return Returns 0 when execution succeeds and above 0 if it fails. + */ + public int repair(MsckInfo msckInfo) { + CheckResult result = new CheckResult(); + List<String> repairOutput = new ArrayList<>(); + String qualifiedTableName = null; + boolean success = false; + long txnId = -1; + int ret = 0; + try { + Table table = getMsc().getTable(msckInfo.getCatalogName(), msckInfo.getDbName(), msckInfo.getTableName()); + if (getConf().getBoolean(MetastoreConf.ConfVars.MSCK_REPAIR_ENABLE_PARTITION_RETENTION.getHiveName(), false)) { + msckInfo.setPartitionExpirySeconds(PartitionManagementTask.getRetentionPeriodInSeconds(table)); + LOG.info("Retention period ({}s) for partition is enabled for MSCK REPAIR..", msckInfo.getPartitionExpirySeconds()); + } + HiveMetaStoreChecker checker = new HiveMetaStoreChecker(getMsc(), getConf(), msckInfo.getPartitionExpirySeconds()); + // checkMetastore call will fill in result with partitions that are present in filesystem + // and missing in metastore - accessed through getPartitionsNotInMs + // And partitions that are not present in filesystem and metadata exists in metastore - + // accessed through getPartitionNotOnFS + checker.checkMetastore(msckInfo.getCatalogName(), msckInfo.getDbName(), msckInfo.getTableName(), + msckInfo.getPartSpecs(), result); + Set<CheckResult.PartitionResult> partsNotInMs = result.getPartitionsNotInMs(); + Set<CheckResult.PartitionResult> partsNotInFs = result.getPartitionsNotOnFs(); + Set<CheckResult.PartitionResult> expiredPartitions = result.getExpiredPartitions(); + int totalPartsToFix = partsNotInMs.size() + partsNotInFs.size() + expiredPartitions.size(); + // if nothing changed to partitions and if we are not repairing (add or drop) don't acquire for lock unnecessarily + boolean lockRequired = totalPartsToFix > 0 && + msckInfo.isRepairPartitions() && + (msckInfo.isAddPartitions() || msckInfo.isDropPartitions()); 
+ LOG.info("#partsNotInMs: {} #partsNotInFs: {} #expiredPartitions: {} lockRequired: {} (R: {} A: {} D: {})", + partsNotInMs.size(), partsNotInFs.size(), expiredPartitions.size(), lockRequired, + msckInfo.isRepairPartitions(), msckInfo.isAddPartitions(), msckInfo.isDropPartitions()); + + if (msckInfo.isRepairPartitions()) { + // Repair metadata in HMS + qualifiedTableName = Warehouse.getCatalogQualifiedTableName(table); + long lockId; + if (acquireLock && lockRequired && table.getParameters() != null && + MetaStoreServerUtils.isTransactionalTable(table.getParameters())) { + // Running MSCK from beeline/cli will make DDL task acquire X lock when repair is enabled, since we are directly + // invoking msck.repair() without SQL statement, we need to do the same and acquire X lock (repair is default) + LockRequest lockRequest = createLockRequest(msckInfo.getDbName(), msckInfo.getTableName()); + txnId = lockRequest.getTxnid(); + try { + LockResponse res = getMsc().lock(lockRequest); + if (res.getState() != LockState.ACQUIRED) { + throw new MetastoreException("Unable to acquire lock(X) on " + qualifiedTableName); + } + lockId = res.getLockid(); + } catch (TException e) { + throw new MetastoreException("Unable to acquire lock(X) on " + qualifiedTableName, e); + } + LOG.info("Acquired lock(X) on {}. LockId: {}", qualifiedTableName, lockId); + } + int maxRetries = MetastoreConf.getIntVar(getConf(), MetastoreConf.ConfVars.MSCK_REPAIR_BATCH_MAX_RETRIES); + int decayingFactor = 2; + + if (msckInfo.isAddPartitions() && !partsNotInMs.isEmpty()) { + // MSCK called to add missing paritions into metastore and there are + // missing partitions. + + int batchSize = MetastoreConf.getIntVar(getConf(), MetastoreConf.ConfVars.MSCK_REPAIR_BATCH_SIZE); + if (batchSize == 0) { + //batching is not enabled. 
Try to add all the partitions in one call + batchSize = partsNotInMs.size(); + } + + AbstractList<String> vals = null; + String settingStr = MetastoreConf.getVar(getConf(), MetastoreConf.ConfVars.MSCK_PATH_VALIDATION); + boolean doValidate = !("ignore".equals(settingStr)); + boolean doSkip = doValidate && "skip".equals(settingStr); + // The default setting is "throw"; assume doValidate && !doSkip means throw. + if (doValidate) { + // Validate that we can add partition without escaping. Escaping was originally intended + // to avoid creating invalid HDFS paths; however, if we escape the HDFS path (that we + // deem invalid but HDFS actually supports - it is possible to create HDFS paths with + // unprintable characters like ASCII 7), metastore will create another directory instead + // of the one we are trying to "repair" here. + Iterator<CheckResult.PartitionResult> iter = partsNotInMs.iterator(); + while (iter.hasNext()) { + CheckResult.PartitionResult part = iter.next(); + try { + vals = Warehouse.makeValsFromName(part.getPartitionName(), vals); + } catch (MetaException ex) { + throw new MetastoreException(ex); + } + for (String val : vals) { + String escapedPath = FileUtils.escapePathName(val); + assert escapedPath != null; + if (escapedPath.equals(val)) { + continue; + } + String errorMsg = "Repair: Cannot add partition " + msckInfo.getTableName() + ':' + + part.getPartitionName() + " due to invalid characters in the name"; + if (doSkip) { + repairOutput.add(errorMsg); + iter.remove(); + } else { + throw new MetastoreException(errorMsg); + } + } + } + } + try { + createPartitionsInBatches(getMsc(), repairOutput, partsNotInMs, table, batchSize, + decayingFactor, maxRetries); + } catch (Exception e) { + throw new MetastoreException(e); + } + } + + if (msckInfo.isDropPartitions() && (!partsNotInFs.isEmpty() || !expiredPartitions.isEmpty())) { + // MSCK called to drop stale paritions from metastore and there are + // stale partitions. 
+ + int batchSize = MetastoreConf.getIntVar(getConf(), MetastoreConf.ConfVars.MSCK_REPAIR_BATCH_SIZE); + if (batchSize == 0) { + //batching is not enabled. Try to drop all the partitions in one call + batchSize = partsNotInFs.size() + expiredPartitions.size(); + } + + try { + dropPartitionsInBatches(getMsc(), repairOutput, partsNotInFs, expiredPartitions, table, batchSize, + decayingFactor, maxRetries); + } catch (Exception e) { + throw new MetastoreException(e); + } + } + } + success = true; + } catch (Exception e) { + LOG.warn("Failed to run metacheck: ", e); + success = false; + ret = 1; + } finally { + if (msckInfo.getResFile() != null) { + BufferedWriter resultOut = null; + try { + Path resFile = new Path(msckInfo.getResFile()); + FileSystem fs = resFile.getFileSystem(getConf()); + resultOut = new BufferedWriter(new OutputStreamWriter(fs.create(resFile))); + + boolean firstWritten = false; + firstWritten |= writeMsckResult(result.getTablesNotInMs(), + "Tables not in metastore:", resultOut, firstWritten); + firstWritten |= writeMsckResult(result.getTablesNotOnFs(), + "Tables missing on filesystem:", resultOut, firstWritten); + firstWritten |= writeMsckResult(result.getPartitionsNotInMs(), + "Partitions not in metastore:", resultOut, firstWritten); + firstWritten |= writeMsckResult(result.getPartitionsNotOnFs(), + "Partitions missing from filesystem:", resultOut, firstWritten); + firstWritten |= writeMsckResult(result.getExpiredPartitions(), + "Expired partitions (retention period: " + msckInfo.getPartitionExpirySeconds() + "s) :", resultOut, firstWritten); + // sorting to stabilize qfile output (msck_repair_drop.q) + Collections.sort(repairOutput); + for (String rout : repairOutput) { + if (firstWritten) { + resultOut.write(terminator); + } else { + firstWritten = true; + } + resultOut.write(rout); + } + } catch (IOException e) { + LOG.warn("Failed to save metacheck output: ", e); + ret = 1; + } finally { + if (resultOut != null) { + try { + resultOut.close(); 
+ } catch (IOException e) { + LOG.warn("Failed to close output file: ", e); + ret = 1; + } + } + } + } + + LOG.info("Tables not in metastore: {}", result.getTablesNotInMs()); + LOG.info("Tables missing on filesystem: {}", result.getTablesNotOnFs()); + LOG.info("Partitions not in metastore: {}", result.getPartitionsNotInMs()); + LOG.info("Partitions missing from filesystem: {}", result.getPartitionsNotOnFs()); + LOG.info("Expired partitions: {}", result.getExpiredPartitions()); + if (acquireLock && txnId > 0) { + if (success) { + try { + LOG.info("txnId: {} succeeded. Committing..", txnId); + getMsc().commitTxn(txnId); + } catch (Exception e) { + LOG.warn("Error while committing txnId: {} for table: {}", txnId, qualifiedTableName, e); + ret = 1; + } + } else { + try { + LOG.info("txnId: {} failed. Aborting..", txnId); + getMsc().abortTxns(Lists.newArrayList(txnId)); + } catch (Exception e) { + LOG.warn("Error while aborting txnId: {} for table: {}", txnId, qualifiedTableName, e); + ret = 1; + } + } + } + if (getMsc() != null) { + getMsc().close(); + msc = null; + } + } + + return ret; + } + + private LockRequest createLockRequest(final String dbName, final String tableName) throws TException { + UserGroupInformation loggedInUser = null; + String username; + try { + loggedInUser = UserGroupInformation.getLoginUser(); + } catch (IOException e) { + LOG.warn("Unable to get logged in user via UGI. 
err: {}", e.getMessage()); + } + if (loggedInUser == null) { + username = System.getProperty("user.name"); + } else { + username = loggedInUser.getShortUserName(); + } + long txnId = getMsc().openTxn(username); + String agentInfo = Thread.currentThread().getName(); + LockRequestBuilder requestBuilder = new LockRequestBuilder(agentInfo); + requestBuilder.setUser(username); + requestBuilder.setTransactionId(txnId); + + LockComponentBuilder lockCompBuilder = new LockComponentBuilder() + .setDbName(dbName) + .setTableName(tableName) + .setIsTransactional(true) + .setExclusive() + // WriteType is DDL_EXCLUSIVE for MSCK REPAIR so we need NO_TXN. Refer AcidUtils.makeLockComponents + .setOperationType(DataOperationType.NO_TXN); + requestBuilder.addLockComponent(lockCompBuilder.build()); + + LOG.info("Created lock(X) request with info - user: {} txnId: {} agentInfo: {} dbName: {} tableName: {}", + username, txnId, agentInfo, dbName, tableName); + return requestBuilder.build(); + } + + public IMetaStoreClient getMsc() { + return msc; + } + + /** + * Adds the partitions in {@code partsNotInMs} to the metastore in batches. Every batch is sent with a single + * {@code add_partitions} call; batch sizing and retrying are delegated to + * {@link RetryUtilities.ExponentiallyDecayingBatchWork}. If the table has no location nothing can be added, so + * the remaining partitions are skipped with a warning. + * + * @param metastoreClient client used to add the partitions + * @param repairOutput receives one "Repair: Added partition to metastore" message per partition of each batch + * that was added successfully + * @param partsNotInMs partitions to add; the set is copied and not modified + * @param table table the partitions belong to; partition locations are resolved against its path + * @param batchSize initial batch size handed to the retry helper + * @param decayingFactor decaying factor handed to the retry helper + * @param maxRetries maximum number of retries handed to the retry helper + * @throws Exception if the retry helper gives up or a metastore call fails + */ + @VisibleForTesting + public void createPartitionsInBatches(final IMetaStoreClient metastoreClient, List<String> repairOutput, + Set<CheckResult.PartitionResult> partsNotInMs, Table table, int batchSize, int decayingFactor, int maxRetries) + throws Exception { + String addMsgFormat = "Repair: Added partition to metastore " + + table.getTableName() + ":%s"; + Set<CheckResult.PartitionResult> batchWork = new HashSet<>(partsNotInMs); + new RetryUtilities.ExponentiallyDecayingBatchWork<Void>(batchSize, decayingFactor, maxRetries) { + @Override + public Void execute(int size) throws MetastoreException { + try { + while (!batchWork.isEmpty()) { + // Every partition location is derived from the table path, so resolve it once per batch. + // Without it no partition can be built, batchWork would never shrink and this loop would + // never end, so give up on the remaining partitions instead. + Path tablePath = MetaStoreServerUtils.getPath(table); + if (tablePath == null) { + LOG.warn("Table {} has no location; skipping addition of {} partition(s) to metastore", + table.getTableName(), batchWork.size()); + return null; + } + List<Partition> partsToAdd = new ArrayList<>(); + //get the current batch size + int currentBatchSize = size; + //store the partitions temporarily until processed + List<CheckResult.PartitionResult> lastBatch = new ArrayList<>(currentBatchSize); + List<String> addMsgs = new 
ArrayList<>(currentBatchSize); + //add the number of partitions given by the current batchsize + for (CheckResult.PartitionResult part : batchWork) { + if (currentBatchSize == 0) { + break; + } + Map<String, String> partSpec = Warehouse.makeSpecFromName(part.getPartitionName()); + Path location = new Path(tablePath, Warehouse.makePartPath(partSpec)); + Partition partition = MetaStoreServerUtils.createMetaPartitionObject(table, partSpec, location); + partition.setWriteId(table.getWriteId()); + partsToAdd.add(partition); + lastBatch.add(part); + addMsgs.add(String.format(addMsgFormat, part.getPartitionName())); + currentBatchSize--; + } + metastoreClient.add_partitions(partsToAdd, true, false); + // if last batch is successful remove it from batchWork (the working copy of partsNotInMs) + batchWork.removeAll(lastBatch); + repairOutput.addAll(addMsgs); + } + return null; + } catch (TException e) { + throw new MetastoreException(e); + } + } + }.run(); + } + + private static String makePartExpr(Map<String, String> spec) + throws MetaException { + StringBuilder suffixBuf = new StringBuilder(); + int i = 0; + for (Map.Entry<String, String> e : spec.entrySet()) { + if (e.getValue() == null || e.getValue().length() == 0) { + throw new MetaException("Partition spec is incorrect. " + spec); + } + if (i > 0) { + suffixBuf.append(" AND "); + } + suffixBuf.append(Warehouse.escapePathName(e.getKey())); + suffixBuf.append('='); + suffixBuf.append("'").append(Warehouse.escapePathName(e.getValue())).append("'"); + i++; + } + return suffixBuf.toString(); + } + + // Drops partitions in batches. partNotInFs is split into batches based on batchSize + // and dropped. The dropping will be through RetryUtilities which will retry when there is a + // failure after reducing the batchSize by decayingFactor. Retrying will cease when maxRetries + // limit is reached or batchSize reduces to 0, whichever comes earlier. 
+ @VisibleForTesting + public void dropPartitionsInBatches(final IMetaStoreClient metastoreClient, List<String> repairOutput, + Set<CheckResult.PartitionResult> partsNotInFs, Set<CheckResult.PartitionResult> expiredPartitions, + Table table, int batchSize, int decayingFactor, int maxRetries) throws Exception { + String dropMsgFormat = + "Repair: Dropped partition from metastore " + Warehouse.getCatalogQualifiedTableName(table) + ":%s"; + // Copy of partitions that will be split into batches + Set<CheckResult.PartitionResult> batchWork = new HashSet<>(partsNotInFs); + if (expiredPartitions != null && !expiredPartitions.isEmpty()) { + batchWork.addAll(expiredPartitions); + } + PartitionDropOptions dropOptions = new PartitionDropOptions().deleteData(deleteData).ifExists(true); + new RetryUtilities.ExponentiallyDecayingBatchWork<Void>(batchSize, decayingFactor, maxRetries) { + @Override + public Void execute(int size) throws MetastoreException { + try { + while (!batchWork.isEmpty()) { + int currentBatchSize = size; + + // to store the partitions that are currently being processed + List<CheckResult.PartitionResult> lastBatch = new ArrayList<>(currentBatchSize); + + // drop messages for the dropped partitions + List<String> dropMsgs = new ArrayList<>(currentBatchSize); + + // Partitions to be dropped + List<String> dropParts = new ArrayList<>(currentBatchSize); + + for (CheckResult.PartitionResult part : batchWork) { + // This batch is full: break out of for loop to execute + if (currentBatchSize == 0) { + break; + } + + dropParts.add(part.getPartitionName()); + + // Add the part to lastBatch to track the partition being dropped + lastBatch.add(part); + + // Update messages + dropMsgs.add(String.format(dropMsgFormat, part.getPartitionName())); + + // Decrement batch size. 
When this gets to 0, the batch will be executed + currentBatchSize--; + } + + // The batch holds partitions missing from the filesystem plus any expired partitions. + // Whether partition data is deleted comes from the deleteData field via dropOptions. + // msck is doing a clean up of hms. if for some reason the partition is already + // deleted, then it is good. So, dropOptions is built with ifExists(true) + List<ObjectPair<Integer, byte[]>> partExprs = getPartitionExpr(dropParts); + metastoreClient.dropPartitions(table.getCatName(), table.getDbName(), table.getTableName(), partExprs, dropOptions); + + // if last batch is successful remove it from batchWork (the working copy built above) + batchWork.removeAll(lastBatch); + repairOutput.addAll(dropMsgs); + } + return null; + } catch (TException e) { + throw new MetastoreException(e); + } + } + + // Builds one (index, UTF-8 bytes of the makePartExpr filter) pair per partition name. + private List<ObjectPair<Integer, byte[]>> getPartitionExpr(final List<String> parts) throws MetaException { + List<ObjectPair<Integer, byte[]>> expr = new ArrayList<>(parts.size()); + for (int i = 0; i < parts.size(); i++) { + String partName = parts.get(i); + Map<String, String> partSpec = Warehouse.makeSpecFromName(partName); + String partExpr = makePartExpr(partSpec); + if (LOG.isDebugEnabled()) { + LOG.debug("Generated partExpr: {} for partName: {}", partExpr, partName); + } + expr.add(new ObjectPair<>(i, partExpr.getBytes(StandardCharsets.UTF_8))); + } + return expr; + } + }.run(); + } + + /** + * Write the result of msck to a writer. + * + * @param result The result we're going to write + * @param msg Message to write. 
+ * @param out Writer to write to + * @param wrote if any previous call wrote data + * @return true if something was written + * @throws IOException In case the writing fails + */ + private boolean writeMsckResult(Set<?> result, String msg, + Writer out, boolean wrote) throws IOException { + // Nothing to report for an empty set: leave the writer untouched. + if (result.isEmpty()) { + return false; + } + + // Output from an earlier call is closed off with the terminator first. + if (wrote) { + out.write(terminator); + } + + // The message, followed by every entry prefixed with the separator. + out.write(msg); + for (Object item : result) { + out.write(separator); + out.write(item.toString()); + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckInfo.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckInfo.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckInfo.java new file mode 100644 index 0000000..81bcb56 --- /dev/null +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckInfo.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.metastore; + +import java.util.ArrayList; +import java.util.LinkedHashMap; + +/** + * Metadata related to Msck. + * <p> + * Plain mutable holder: every field is assigned by the constructor and can be read and replaced through the + * matching getter/setter. No validation or copying is done; the partSpecs list is stored and returned by reference. + */ +public class MsckInfo { + + private String catalogName; + private String dbName; + private String tableName; + private ArrayList<LinkedHashMap<String, String>> partSpecs; + private String resFile; + private boolean repairPartitions; + private boolean addPartitions; + private boolean dropPartitions; + private long partitionExpirySeconds; + + /** + * Creates a fully populated instance; all arguments are stored exactly as given. + * + * @param catalogName catalog name + * @param dbName database name + * @param tableName table name + * @param partSpecs partition specs (presumably one column-to-value map per partition; confirm with callers) + * @param resFile result file (presumably where the msck output is written; confirm with callers) + * @param repairPartitions repair-partitions flag + * @param addPartitions add-partitions flag + * @param dropPartitions drop-partitions flag + * @param partitionExpirySeconds partition expiry in seconds + */ + public MsckInfo(final String catalogName, final String dbName, final String tableName, + final ArrayList<LinkedHashMap<String, String>> partSpecs, final String resFile, final boolean repairPartitions, + final boolean addPartitions, + final boolean dropPartitions, + final long partitionExpirySeconds) { + this.catalogName = catalogName; + this.dbName = dbName; + this.tableName = tableName; + this.partSpecs = partSpecs; + this.resFile = resFile; + this.repairPartitions = repairPartitions; + this.addPartitions = addPartitions; + this.dropPartitions = dropPartitions; + this.partitionExpirySeconds = partitionExpirySeconds; + } + + public String getCatalogName() { + return catalogName; + } + + public void setCatalogName(final String catalogName) { + this.catalogName = catalogName; + } + + public String getDbName() { + return dbName; + } + + public void setDbName(final String dbName) { + this.dbName = dbName; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(final String tableName) { + this.tableName = tableName; + } + + public ArrayList<LinkedHashMap<String, String>> getPartSpecs() { + return partSpecs; + } + + public void setPartSpecs(final ArrayList<LinkedHashMap<String, String>> partSpecs) { + this.partSpecs = partSpecs; + } + + public String getResFile() { + return resFile; + } + + public void setResFile(final String resFile) { + this.resFile = resFile; + } + + public boolean isRepairPartitions() { + return repairPartitions; + } + + 
public void setRepairPartitions(final boolean repairPartitions) { + this.repairPartitions = repairPartitions; + } + + public boolean isAddPartitions() { + return addPartitions; + } + + public void setAddPartitions(final boolean addPartitions) { + this.addPartitions = addPartitions; + } + + public boolean isDropPartitions() { + return dropPartitions; + } + + public void setDropPartitions(final boolean dropPartitions) { + this.dropPartitions = dropPartitions; + } + + public long getPartitionExpirySeconds() { + return partitionExpirySeconds; + } + + public void setPartitionExpirySeconds(final long partitionExpirySeconds) { + this.partitionExpirySeconds = partitionExpirySeconds; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckPartitionExpressionProxy.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckPartitionExpressionProxy.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckPartitionExpressionProxy.java new file mode 100644 index 0000000..d842825 --- /dev/null +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckPartitionExpressionProxy.java @@ -0,0 +1,64 @@ +package org.apache.hadoop.hive.metastore; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.nio.charset.StandardCharsets; +import java.util.List; + +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.FileMetadataExprType; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; + +// This is added as part of moving MSCK code from ql to standalone-metastore. There is a metastore API to drop +// partitions by name but we cannot use it because msck typically will contain partition value (year=2014). We almost +// never drop partition by name (year). So we need to construct expression filters. The current +// PartitionExpressionProxy implementations (PartitionExpressionForMetastore and HCatClientHMSImpl.ExpressionBuilder) +// all depend on ql code to build ExprNodeDesc for the partition expressions. They also depend on kryo for serializing +// the expression objects to byte[]. For MSCK drop partition, we don't need a complex expression generator. For now, +// all we do is split the partition spec (year=2014/month=24) into filter expression year='2014' and month='24' and +// rely on metastore database to deal with type conversions. Ideally, PartitionExpressionProxy default implementation +// should use SearchArgument (storage-api) to construct the filter expression and not depend on ql, but the use case +// for msck is pretty simple and this specific implementation should suffice. 
+public class MsckPartitionExpressionProxy implements PartitionExpressionProxy { + /** + * Returns {@code exprBytes} decoded as a UTF-8 string; {@code defaultPartitionName} is not used. + */ + @Override + public String convertExprToFilter(final byte[] exprBytes, final String defaultPartitionName) throws MetaException { + return new String(exprBytes, StandardCharsets.UTF_8); + } + + /** + * Always returns {@code false}; none of the arguments are read or modified. + */ + @Override + public boolean filterPartitionsByExpr(List<FieldSchema> partColumns, byte[] expr, String + defaultPartitionName, List<String> partitionNames) throws MetaException { + return false; + } + + /** + * Not supported by this proxy. + * + * @throws UnsupportedOperationException always + */ + @Override + public FileMetadataExprType getMetadataType(String inputFormat) { + throw new UnsupportedOperationException(); + } + + /** + * Not supported by this proxy. + * + * @throws UnsupportedOperationException always + */ + @Override + public FileFormatProxy getFileFormatProxy(FileMetadataExprType type) { + throw new UnsupportedOperationException(); + } + + /** + * Not supported by this proxy. + * + * @throws UnsupportedOperationException always + */ + @Override + public SearchArgument createSarg(byte[] expr) { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 9c15804..0755483 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -686,10 +686,9 @@ public class ObjectStore implements RawStore, Configurable { debugLog("rolling back transaction: no open transactions: " + openTrasactionCalls); return; } - debugLog("Rollback transaction, isActive: " + currentTransaction.isActive()); + debugLog("Rollback transaction, isActive: " + isActiveTransaction()); try { - if (currentTransaction.isActive() - && transactionStatus != TXN_STATUS.ROLLBACK) { + if (isActiveTransaction() 
&& transactionStatus != TXN_STATUS.ROLLBACK) { currentTransaction.rollback(); } } finally { @@ -1711,6 +1710,7 @@ public class ObjectStore implements RawStore, Configurable { for (MTable table : tables) { TableMeta metaData = new TableMeta( table.getDatabase().getName(), table.getTableName(), table.getTableType()); + metaData.setCatName(catName); metaData.setComments(table.getParameters().get("comment")); metas.add(metaData); }