Repository: hive
Updated Branches:
  refs/heads/master ef2db2122 -> fe81a3760
HIVE-12897 : Improve dynamic partition loading (Ashutosh Chauhan via Prasanth J)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fe81a376
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fe81a376
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fe81a376

Branch: refs/heads/master
Commit: fe81a3760a87f3d8e9aa32dd51cfbb948e4f793a
Parents: ef2db21
Author: Ashutosh Chauhan <[email protected]>
Authored: Thu Jan 21 14:07:19 2016 -0800
Committer: Ashutosh Chauhan <[email protected]>
Committed: Wed Jan 27 08:58:34 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   | 17 +++++++++--------
 .../hadoop/hive/metastore/MetaStoreUtils.java   |  2 +-
 .../hadoop/hive/metastore/ObjectStore.java      |  7 +++++--
 .../hadoop/hive/ql/exec/FileSinkOperator.java   |  8 ++++++++
 .../apache/hadoop/hive/ql/metadata/Hive.java    | 14 ++++----------
 .../index/RewriteParseContextGenerator.java     |  4 ++--
 .../parse/ExplainSQRewriteSemanticAnalyzer.java |  2 --
 .../hive/ql/parse/ExplainSemanticAnalyzer.java  |  2 +-
 .../hive/ql/plan/DynamicPartitionCtx.java       | 20 +++++++++++++++++++-
 .../dynamic_partitions_with_whitelist.q.out     |  4 ++--
 10 files changed, 51 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/fe81a376/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 97fe7bc..74a8749 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -210,7 +210,8 @@ public class HiveConf extends Configuration {
   public static final HiveConf.ConfVars[] metaConfVars = {
       HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL,
       HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL_DDL,
-      HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT
+      HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT,
+      HiveConf.ConfVars.METASTORE_PARTITION_NAME_WHITELIST_PATTERN
   };
 
   static {
@@ -1574,32 +1575,32 @@ public class HiveConf extends Configuration {
     HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD("hive.compactor.abortedtxn.threshold", 1000,
         "Number of aborted transactions involving a given table or partition that will trigger\n" +
         "a major compaction."),
-
+
     COMPACTOR_INITIATOR_FAILED_THRESHOLD("hive.compactor.initiator.failed.compacts.threshold", 2,
         new RangeValidator(1, 20), "Number of consecutive compaction failures (per table/partition) " +
         "after which automatic compactions will not be scheduled any more. Note that this must be less " +
         "than hive.compactor.history.retention.failed."),
-
+
     HIVE_COMPACTOR_CLEANER_RUN_INTERVAL("hive.compactor.cleaner.run.interval", "5000ms",
         new TimeValidator(TimeUnit.MILLISECONDS), "Time between runs of the cleaner thread"),
 
     COMPACTOR_JOB_QUEUE("hive.compactor.job.queue", "", "Used to specify name of Hadoop queue to which\n" +
         "Compaction jobs will be submitted. Set to empty string to let Hadoop choose the queue."),
-
+
     COMPACTOR_HISTORY_RETENTION_SUCCEEDED("hive.compactor.history.retention.succeeded", 3,
         new RangeValidator(0, 100), "Determines how many successful compaction records will be " +
         "retained in compaction history for a given table/partition."),
-
+
     COMPACTOR_HISTORY_RETENTION_FAILED("hive.compactor.history.retention.failed", 3,
         new RangeValidator(0, 100), "Determines how many failed compaction records will be " +
         "retained in compaction history for a given table/partition."),
-
+
     COMPACTOR_HISTORY_RETENTION_ATTEMPTED("hive.compactor.history.retention.attempted", 2,
         new RangeValidator(0, 100), "Determines how many attempted compaction records will be " +
         "retained in compaction history for a given table/partition."),
-
+
     COMPACTOR_HISTORY_REAPER_INTERVAL("hive.compactor.history.reaper.interval", "2m",
         new TimeValidator(TimeUnit.MILLISECONDS), "Determines how often compaction history reaper runs"),
-
+
     HIVE_TIMEDOUT_TXN_REAPER_START("hive.timedout.txn.reaper.start", "100s",
         new TimeValidator(TimeUnit.MILLISECONDS), "Time delay of 1st reaper run after metastore start"),
     HIVE_TIMEDOUT_TXN_REAPER_INTERVAL("hive.timedout.txn.reaper.interval", "180s",
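The first HiveConf hunk above adds METASTORE_PARTITION_NAME_WHITELIST_PATTERN to metaConfVars, the list of metastore settings that a client may override for its own session via setMetaConf (keys outside this list are rejected). A minimal sketch of such a per-session override, assuming a reachable metastore and the IMetaStoreClient.setMetaConf API; the thrift URI and the example pattern are illustrative only:

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

public class MetaConfOverrideSketch {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    // Hypothetical metastore location; adjust for a real deployment.
    conf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:9083");
    HiveMetaStoreClient client = new HiveMetaStoreClient(conf);
    // Only this client's session sees the stricter whitelist; other
    // sessions keep the server-side default.
    client.setMetaConf("hive.metastore.partition.name.whitelist.pattern",
        "[A-Za-z0-9._\\-]*");
    System.out.println(
        client.getMetaConf("hive.metastore.partition.name.whitelist.pattern"));
    client.close();
  }
}

This per-session scoping is what lets DynamicPartitionCtx (further down) read the effective pattern for the current query via getMetaConf.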

http://git-wip-us.apache.org/repos/asf/hive/blob/fe81a376/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
index eee7f1b..c8859f3 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
@@ -1618,7 +1618,7 @@ public class MetaStoreUtils {
 
   }
 
-  private static String getPartitionValWithInvalidCharacter(List<String> partVals,
+  public static String getPartitionValWithInvalidCharacter(List<String> partVals,
       Pattern partitionValidationPattern) {
     if (partitionValidationPattern == null) {
       return null;

http://git-wip-us.apache.org/repos/asf/hive/blob/fe81a376/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index e044c73..b808728 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -290,7 +290,7 @@ public class ObjectStore implements RawStore, Configurable {
     String partitionValidationRegex =
         hiveConf.get(HiveConf.ConfVars.METASTORE_PARTITION_NAME_WHITELIST_PATTERN.name());
-    if (partitionValidationRegex != null && partitionValidationRegex.equals("")) {
+    if (partitionValidationRegex != null && !partitionValidationRegex.isEmpty()) {
       partitionValidationPattern = Pattern.compile(partitionValidationRegex);
     } else {
       partitionValidationPattern = null;
@@ -759,7 +759,7 @@ public class ObjectStore implements RawStore, Configurable {
     String queryStr = "select name from org.apache.hadoop.hive.metastore.model.MDatabase";
     Query query = null;
-
+
     openTransaction();
     try {
       query = pm.newQuery(queryStr);
@@ -1054,14 +1054,17 @@ public class ObjectStore implements RawStore, Configurable {
     return tbls;
   }
 
+  @Override
   public int getDatabaseCount() throws MetaException {
     return getObjectCount("name", MDatabase.class.getName());
   }
 
+  @Override
   public int getPartitionCount() throws MetaException {
     return getObjectCount("partitionName", MPartition.class.getName());
   }
 
+  @Override
   public int getTableCount() throws MetaException {
     return getObjectCount("tableName", MTable.class.getName());
   }
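Besides widening the visibility of getPartitionValWithInvalidCharacter and adding the missing @Override annotations, the ObjectStore hunk fixes an inverted empty-check: the old condition compiled the validation pattern only when the configured regex was the empty string, so a real whitelist was silently ignored. A self-contained sketch contrasting the two predicates (class and method names here are illustrative):

import java.util.regex.Pattern;

public class WhitelistPredicateDemo {

  static Pattern compile(String regex, boolean oldBehavior) {
    boolean shouldCompile = oldBehavior
        ? (regex != null && regex.equals(""))   // before this commit
        : (regex != null && !regex.isEmpty());  // after this commit
    return shouldCompile ? Pattern.compile(regex) : null;
  }

  public static void main(String[] args) {
    String configured = "[^9]*"; // the pattern used in the whitelist test below
    System.out.println("old: " + compile(configured, true));   // null -> never enforced
    System.out.println("new: " + compile(configured, false));  // [^9]* -> enforced
  }
}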

http://git-wip-us.apache.org/repos/asf/hive/blob/fe81a376/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 3289cfc..14121b6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -696,6 +697,13 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       }
     }
 
+    String invalidPartitionVal;
+    if((invalidPartitionVal = MetaStoreUtils.getPartitionValWithInvalidCharacter(dpVals, dpCtx.getWhiteListPattern()))!=null) {
+      throw new HiveFatalException("Partition value '" + invalidPartitionVal +
+          "' contains a character not matched by whitelist pattern '" +
+          dpCtx.getWhiteListPattern().toString() + "'. " + "(configure with " +
+          HiveConf.ConfVars.METASTORE_PARTITION_NAME_WHITELIST_PATTERN.varname + ")");
+    }
     fpaths = getDynOutPaths(dpVals, lbDirName);
 
     // use SubStructObjectInspector to serialize the non-partitioning columns in the input row

http://git-wip-us.apache.org/repos/asf/hive/blob/fe81a376/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index efb50b2..50681c1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1427,7 +1427,7 @@ public class Hive {
    * @param isSrcLocal
    *          If the source directory is LOCAL
    * @param isAcid true if this is an ACID operation
-   * @throws JSONException 
+   * @throws JSONException
    */
   public Partition loadPartition(Path loadPath, Table tbl,
       Map<String, String> partSpec, boolean replace,
@@ -1622,7 +1622,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @param txnId txnId, can be 0 unless isAcid == true
    * @return partition map details (PartitionSpec and Partition)
    * @throws HiveException
-   * @throws JSONException 
+   * @throws JSONException
    */
   public Map<Map<String, String>, Partition> loadDynamicPartitions(Path loadPath,
       String tableName, Map<String, String> partSpec, boolean replace,
@@ -1635,16 +1635,10 @@ private void constructOneLBLocationMap(FileStatus fSta,
         LinkedHashMap<Map<String, String>, Partition>();
 
     FileSystem fs = loadPath.getFileSystem(conf);
-    FileStatus[] leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP+1, fs);
+    FileStatus[] leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP, fs);
     // Check for empty partitions
     for (FileStatus s : leafStatus) {
-      try {
-        validatePartitionNameCharacters(
-            Warehouse.getPartValuesFromPartName(s.getPath().getParent().toString()));
-      } catch (MetaException e) {
-        throw new HiveException(e);
-      }
-      validPartitions.add(s.getPath().getParent());
+      validPartitions.add(s.getPath());
     }
 
     int partsToLoad = validPartitions.size();

http://git-wip-us.apache.org/repos/asf/hive/blob/fe81a376/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteParseContextGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteParseContextGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteParseContextGenerator.java
index 48105de..64f9734 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteParseContextGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteParseContextGenerator.java
@@ -102,9 +102,9 @@ public final class RewriteParseContextGenerator {
       ASTNode ast, Context ctx) throws SemanticException {
     QB qb = new QB(null, null, false);
     ASTNode child = ast;
-    ParseContext subPCtx = ((SemanticAnalyzer) sem).getParseContext();
+    ParseContext subPCtx = sem.getParseContext();
     subPCtx.setContext(ctx);
-    ((SemanticAnalyzer) sem).initParseCtx(subPCtx);
+    sem.initParseCtx(subPCtx);
 
     LOG.info("Starting Sub-query Semantic Analysis");
     sem.doPhase1(child, qb, sem.initPhase1Ctx(), null);

http://git-wip-us.apache.org/repos/asf/hive/blob/fe81a376/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java
index 2c2339a..6f0f3a6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java
@@ -17,13 +17,11 @@
  */
 package org.apache.hadoop.hive.ql.parse;
 
-import java.io.Serializable;
 import java.util.List;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.exec.ExplainSQRewriteTask;
-import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.plan.ExplainSQRewriteWork;

http://git-wip-us.apache.org/repos/asf/hive/blob/fe81a376/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
index c1e9ec1..e1e3eb2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
@@ -79,7 +79,7 @@ public class ExplainSemanticAnalyzer extends BaseSemanticAnalyzer {
     if (tasks == null) {
       tasks = Collections.emptyList();
     }
-
+
     FetchTask fetchTask = sem.getFetchTask();
     if (fetchTask != null) {
       // Initialize fetch work such that operator tree will be constructed.
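Taken together, the FileSinkOperator and Hive.java hunks move partition-value validation from load time to write time: loadDynamicPartitions no longer re-derives and validates partition names from the produced directory tree (it now lists directories at exactly the dynamic-partition depth), while FileSinkOperator rejects a bad value before any file is written. A standalone sketch of the write-time check, mirroring (not reusing) MetaStoreUtils.getPartitionValWithInvalidCharacter:

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class WriteTimeWhitelistCheck {

  // Returns the first dynamic-partition value the whitelist pattern does not
  // fully match, or null when everything passes (or no pattern is set).
  static String firstInvalid(List<String> dpVals, Pattern whiteList) {
    if (whiteList == null) {
      return null;
    }
    for (String val : dpVals) {
      if (!whiteList.matcher(val).matches()) {
        return val;
      }
    }
    return null;
  }

  public static void main(String[] args) {
    Pattern p = Pattern.compile("[^9]*");
    System.out.println(firstInvalid(Arrays.asList("2008-04-08", "11"), p)); // null
    System.out.println(firstInvalid(Arrays.asList("val_129"), p));          // val_129
  }
}

Failing inside the file sink is also why the negative test result further down now reports the error from the MapReduce task rather than from MoveTask.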

http://git-wip-us.apache.org/repos/asf/hive/blob/fe81a376/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
index 95d5635..e6ec3ce 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
@@ -21,10 +21,15 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Pattern;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
 
 public class DynamicPartitionCtx implements Serializable {
 
@@ -44,12 +49,13 @@ public class DynamicPartitionCtx implements Serializable {
   private List<String> dpNames; // dp column names
   private String defaultPartName; // default partition name in case of null or empty value
   private int maxPartsPerNode; // maximum dynamic partitions created per mapper/reducer
+  private Pattern whiteListPattern;
 
   public DynamicPartitionCtx() {
   }
 
   public DynamicPartitionCtx(Table tbl, Map<String, String> partSpec, String defaultPartName,
-      int maxParts) {
+      int maxParts) throws SemanticException {
     this.partSpec = partSpec;
     this.spNames = new ArrayList<String>();
     this.dpNames = new ArrayList<String>();
@@ -71,6 +77,13 @@ public class DynamicPartitionCtx implements Serializable {
     } else {
       this.spPath = null;
     }
+    String confVal;
+    try {
+      confVal = Hive.get().getMetaConf(ConfVars.METASTORE_PARTITION_NAME_WHITELIST_PATTERN.varname);
+    } catch (HiveException e) {
+      throw new SemanticException(e);
+    }
+    this.whiteListPattern = confVal == null || confVal.isEmpty() ? null : Pattern.compile(confVal);
   }
 
   public DynamicPartitionCtx(DynamicPartitionCtx dp) {
@@ -84,6 +97,11 @@ public class DynamicPartitionCtx implements Serializable {
     this.dpNames = dp.dpNames;
     this.defaultPartName = dp.defaultPartName;
     this.maxPartsPerNode = dp.maxPartsPerNode;
+    this.whiteListPattern = dp.whiteListPattern;
+  }
+
+  public Pattern getWhiteListPattern() {
+    return whiteListPattern;
   }
 
   public int getMaxPartitionsPerNode() {
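DynamicPartitionCtx now resolves the whitelist once at compile time through Hive.get().getMetaConf() and caches the compiled Pattern (null when no pattern is configured), so the per-row check in FileSinkOperator costs only a regex match and never a metastore round trip. A minimal sketch of the compile-once idea, with the conf lookup abstracted as a Supplier purely for illustration:

import java.util.function.Supplier;
import java.util.regex.Pattern;

public class PlanTimePatternHolder {
  private final Pattern whiteListPattern;

  PlanTimePatternHolder(Supplier<String> confLookup) {
    String confVal = confLookup.get();
    // Compile once; null signals "no validation" so callers can skip the check.
    this.whiteListPattern =
        (confVal == null || confVal.isEmpty()) ? null : Pattern.compile(confVal);
  }

  Pattern getWhiteListPattern() {
    return whiteListPattern;
  }

  public static void main(String[] args) {
    System.out.println(new PlanTimePatternHolder(() -> "[^9]*").getWhiteListPattern()); // [^9]*
    System.out.println(new PlanTimePatternHolder(() -> "").getWhiteListPattern());      // null
  }
}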

http://git-wip-us.apache.org/repos/asf/hive/blob/fe81a376/ql/src/test/results/clientnegative/dynamic_partitions_with_whitelist.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/dynamic_partitions_with_whitelist.q.out b/ql/src/test/results/clientnegative/dynamic_partitions_with_whitelist.q.out
index f069ae8..654d892 100644
--- a/ql/src/test/results/clientnegative/dynamic_partitions_with_whitelist.q.out
+++ b/ql/src/test/results/clientnegative/dynamic_partitions_with_whitelist.q.out
@@ -32,5 +32,5 @@ PREHOOK: type: QUERY
 PREHOOK: Input: default@source_table
 PREHOOK: Input: default@source_table@ds=2008-04-08/hr=11
 PREHOOK: Output: default@dest_table
-Failed with exception MetaException(message:Partition value 'val_129' contains a character not matched by whitelist pattern '[^9]*'. (configure with hive.metastore.partition.name.whitelist.pattern))
-FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.MoveTask
+#### A masked pattern was here ####
+FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask
