HIVE-17652: retire ANALYZE TABLE ... PARTIALSCAN (Zoltan Haindrich, reviewed by Ashutosh Chauhan)
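This removes the ANALYZE TABLE ... COMPUTE STATISTICS PARTIALSCAN code path: the
KW_PARTIALSCAN keyword in HiveLexer.g/HiveParser.g, the PartialScanTask /
PartialScanWork / PartialScanMapper classes, the related ErrorMsg entries, and the
partial-scan qtests. As an illustration only (the table name `t` and partition
column `ds` below are hypothetical placeholders, not part of this patch), the
analyzeStatement grammar change means:

    -- No longer parses after this patch (KW_PARTIALSCAN removed):
    ANALYZE TABLE t PARTITION (ds='1') COMPUTE STATISTICS PARTIALSCAN;

    -- Forms that remain supported by analyzeStatement:
    ANALYZE TABLE t PARTITION (ds='1') COMPUTE STATISTICS;              -- full scan
    ANALYZE TABLE t PARTITION (ds='1') COMPUTE STATISTICS NOSCAN;       -- no scan; uses file/footer metadata
    ANALYZE TABLE t PARTITION (ds='1') COMPUTE STATISTICS FOR COLUMNS;  -- column statistics

For ORC and Parquet the full-scan and NOSCAN forms were already equivalent (see
GenMRTableScan1/ProcessAnalyzeTable), so PARTIALSCAN no longer served a purpose.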
Signed-off-by: Zoltan Haindrich <k...@rxd.hu>

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/71004d2e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/71004d2e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/71004d2e

Branch: refs/heads/master
Commit: 71004d2e1ffb7974ba7cdf7a73bd843ccf4962ba
Parents: 2a5e72c
Author: Zoltan Haindrich <k...@rxd.hu>
Authored: Fri Oct 6 11:00:37 2017 +0200
Committer: Zoltan Haindrich <k...@rxd.hu>
Committed: Fri Oct 6 11:08:38 2017 +0200

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java    |  22 +-
 .../org/apache/hadoop/hive/ql/ErrorMsg.java      |  35 +-
 .../apache/hadoop/hive/ql/QueryProperties.java   |  13 +-
 .../hadoop/hive/ql/exec/StatsNoJobTask.java      |   2 +-
 .../apache/hadoop/hive/ql/exec/TaskFactory.java  |   4 -
 .../apache/hadoop/hive/ql/exec/Utilities.java    |  14 +-
 .../ql/io/rcfile/stats/PartialScanMapper.java    | 180 ---------
 .../ql/io/rcfile/stats/PartialScanTask.java      | 381 -------------------
 .../ql/io/rcfile/stats/PartialScanWork.java      | 115 ------
 .../hive/ql/optimizer/GenMRTableScan1.java       |  56 ---
 .../hive/ql/optimizer/GenMapRedUtils.java        |  31 --
 .../ql/optimizer/QueryPlanPostProcessor.java     |   1 -
 .../ql/parse/ColumnStatsSemanticAnalyzer.java    |   2 -
 .../org/apache/hadoop/hive/ql/parse/HiveLexer.g  |   1 -
 .../apache/hadoop/hive/ql/parse/HiveParser.g     |   4 +-
 .../hive/ql/parse/ProcessAnalyzeTable.java       |  48 ---
 .../hadoop/hive/ql/parse/QBParseInfo.java        |  16 -
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java   | 156 ++------
 .../parse/spark/SparkProcessAnalyzeTable.java    |  49 +-
 .../apache/hadoop/hive/ql/plan/StatsWork.java    |  16 -
 .../stats_partialscan_autogether.q               |  32 --
 .../stats_partialscan_non_external.q             |   5 -
 .../stats_partialscan_non_native.q               |   6 -
 .../clientnegative/stats_partscan_norcfile.q     |  12 -
 .../test/queries/clientpositive/orc_analyze.q    |  14 -
 .../queries/clientpositive/stats_partscan_1.q    |  37 --
 .../clientpositive/stats_partscan_1_23.q         |  38 --
 .../stats_partialscan_autogether.q.out           |  75 ----
 .../stats_partialscan_non_external.q.out         |   9 -
 .../stats_partialscan_non_native.q.out           |  11 -
 .../stats_partscan_norcfile.q.out                |  36 --
 .../clientpositive/llap/orc_analyze.q.out        | 288 --------------
 .../spark/stats_partscan_1_23.q.out              | 181 ---------
 .../clientpositive/stats_partscan_1.q.out        | 215 -----------
 .../clientpositive/stats_partscan_1_23.q.out     | 184 ---------
 35 files changed, 81 insertions(+), 2208 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/71004d2e/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index c95844b..d2afc2c 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1722,7 +1722,7 @@ public class HiveConf extends Configuration {
 "analyze table T compute statistics for columns. 
Queries like these should compute partition" + "level stats for partitioned table even when no part spec is specified."), HIVE_STATS_GATHER_NUM_THREADS("hive.stats.gather.num.threads", 10, - "Number of threads used by partialscan/noscan analyze command for partitioned tables.\n" + + "Number of threads used by noscan analyze command for partitioned tables.\n" + "This is applicable only for file formats that implement StatsProvidingRecordReader (like ORC)."), // Collect table access keys information for operators that can benefit from bucketing HIVE_STATS_COLLECT_TABLEKEYS("hive.stats.collect.tablekeys", false, @@ -4119,10 +4119,14 @@ public class HiveConf extends Configuration { public static String[] getTrimmedStringsVar(Configuration conf, ConfVars var) { assert (var.valClass == String.class) : var.varname; String[] result = conf.getTrimmedStrings(var.varname, (String[])null); - if (result != null) return result; + if (result != null) { + return result; + } if (var.altName != null) { result = conf.getTrimmedStrings(var.altName, (String[])null); - if (result != null) return result; + if (result != null) { + return result; + } } return org.apache.hadoop.util.StringUtils.getTrimmedStrings(var.defaultStrVal); } @@ -4824,7 +4828,9 @@ public class HiveConf extends Configuration { public static String getNonMrEngines() { String result = ""; for (String s : ConfVars.HIVE_EXECUTION_ENGINE.getValidStringValues()) { - if ("mr".equals(s)) continue; + if ("mr".equals(s)) { + continue; + } if (!result.isEmpty()) { result += ", "; } @@ -4845,7 +4851,9 @@ public class HiveConf extends Configuration { public static HashMap<String, ConfVars> getOrCreateReverseMap() { // This should be called rarely enough; for now it's ok to just lock every time. synchronized (reverseMapLock) { - if (reverseMap != null) return reverseMap; + if (reverseMap != null) { + return reverseMap; + } } HashMap<String, ConfVars> vars = new HashMap<>(); for (ConfVars val : ConfVars.values()) { @@ -4855,7 +4863,9 @@ public class HiveConf extends Configuration { } } synchronized (reverseMapLock) { - if (reverseMap != null) return reverseMap; + if (reverseMap != null) { + return reverseMap; + } reverseMap = vars; return reverseMap; } http://git-wip-us.apache.org/repos/asf/hive/blob/71004d2e/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 6da8304..4b6bae1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -327,19 +327,8 @@ public enum ErrorMsg { OPERATOR_NOT_ALLOWED_WITH_MAPJOIN(10227, "Not all clauses are supported with mapjoin hint. Please remove mapjoin hint."), - ANALYZE_TABLE_NOSCAN_NON_NATIVE(10228, "ANALYZE TABLE NOSCAN cannot be used for " - + "a non-native table"), - - ANALYZE_TABLE_PARTIALSCAN_NON_NATIVE(10229, "ANALYZE TABLE PARTIALSCAN cannot be used for " - + "a non-native table"), - ANALYZE_TABLE_PARTIALSCAN_NON_RCFILE(10230, "ANALYZE TABLE PARTIALSCAN doesn't " - + "support non-RCfile. 
"), - ANALYZE_TABLE_PARTIALSCAN_EXTERNAL_TABLE(10231, "ANALYZE TABLE PARTIALSCAN " - + "doesn't support external table: "), - ANALYZE_TABLE_PARTIALSCAN_AGGKEY(10232, "Analyze partialscan command " - + "fails to construct aggregation for the partition "), - ANALYZE_TABLE_PARTIALSCAN_AUTOGATHER(10233, "Analyze partialscan is not allowed " + - "if hive.stats.autogather is set to false"), + ANALYZE_TABLE_NOSCAN_NON_NATIVE(10228, "ANALYZE TABLE NOSCAN cannot be used for " + "a non-native table"), + PARTITION_VALUE_NOT_CONTINUOUS(10234, "Partition values specified are not continuous." + " A subpartition value is specified without specifying the parent partition's value"), TABLES_INCOMPATIBLE_SCHEMAS(10235, "Tables have incompatible schemas and their partitions " + @@ -606,11 +595,21 @@ public enum ErrorMsg { * @return ErrorMsg */ public static ErrorMsg getErrorMsg(Exception e) { - if (e instanceof AccessControlException) return ACCESS_DENIED; - if (e instanceof NSQuotaExceededException) return QUOTA_EXCEEDED; - if (e instanceof DSQuotaExceededException) return QUOTA_EXCEEDED; - if (e instanceof UnresolvedPathException) return UNRESOLVED_PATH; - if (e instanceof FileNotFoundException) return FILE_NOT_FOUND; + if (e instanceof AccessControlException) { + return ACCESS_DENIED; + } + if (e instanceof NSQuotaExceededException) { + return QUOTA_EXCEEDED; + } + if (e instanceof DSQuotaExceededException) { + return QUOTA_EXCEEDED; + } + if (e instanceof UnresolvedPathException) { + return UNRESOLVED_PATH; + } + if (e instanceof FileNotFoundException) { + return FILE_NOT_FOUND; + } return UNRESOLVED_RT_EXCEPTION; } http://git-wip-us.apache.org/repos/asf/hive/blob/71004d2e/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java index 154fec4..69a750b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java @@ -35,7 +35,6 @@ public class QueryProperties { boolean query; boolean analyzeCommand; - boolean partialScanAnalyzeCommand; boolean noScanAnalyzeCommand; boolean analyzeRewrite; boolean ctas; @@ -84,14 +83,6 @@ public class QueryProperties { this.analyzeCommand = analyzeCommand; } - public boolean isPartialScanAnalyzeCommand() { - return partialScanAnalyzeCommand; - } - - public void setPartialScanAnalyzeCommand(boolean partialScanAnalyzeCommand) { - this.partialScanAnalyzeCommand = partialScanAnalyzeCommand; - } - public boolean isNoScanAnalyzeCommand() { return noScanAnalyzeCommand; } @@ -130,8 +121,9 @@ public class QueryProperties { public void incrementJoinCount(boolean outerJoin) { noOfJoins++; - if (outerJoin) + if (outerJoin) { noOfOuterJoins++; + } } public int getJoinCount() { @@ -278,7 +270,6 @@ public class QueryProperties { public void clear() { query = false; analyzeCommand = false; - partialScanAnalyzeCommand = false; noScanAnalyzeCommand = false; analyzeRewrite = false; ctas = false; http://git-wip-us.apache.org/repos/asf/hive/blob/71004d2e/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java index 9c3a664..61e19ca 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java @@ -64,7 +64,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * StatsNoJobTask is used in cases where stats collection is the only task for the given query (no - * parent MR or Tez job). It is used in the following cases 1) ANALYZE with partialscan/noscan for + * parent MR or Tez job). It is used in the following cases 1) ANALYZE with noscan for * file formats that implement StatsProvidingRecordReader interface: ORC format (implements * StatsProvidingRecordReader) stores column statistics for all columns in the file footer. Its much * faster to compute the table/partition statistics by reading the footer than scanning all the http://git-wip-us.apache.org/repos/asf/hive/blob/71004d2e/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java index e9c69d9..e1969bb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java @@ -37,8 +37,6 @@ import org.apache.hadoop.hive.ql.index.IndexMetadataChangeTask; import org.apache.hadoop.hive.ql.index.IndexMetadataChangeWork; import org.apache.hadoop.hive.ql.io.merge.MergeFileTask; import org.apache.hadoop.hive.ql.io.merge.MergeFileWork; -import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanTask; -import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork; import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork; import org.apache.hadoop.hive.ql.plan.ColumnStatsWork; import org.apache.hadoop.hive.ql.plan.ConditionalWork; @@ -109,8 +107,6 @@ public final class TaskFactory { MergeFileTask.class)); taskvec.add(new TaskTuple<DependencyCollectionWork>(DependencyCollectionWork.class, DependencyCollectionTask.class)); - taskvec.add(new TaskTuple<PartialScanWork>(PartialScanWork.class, - PartialScanTask.class)); taskvec.add(new TaskTuple<IndexMetadataChangeWork>(IndexMetadataChangeWork.class, IndexMetadataChangeTask.class)); taskvec.add(new TaskTuple<TezWork>(TezWork.class, TezTask.class)); http://git-wip-us.apache.org/repos/asf/hive/blob/71004d2e/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 8311037..c199780 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -82,8 +82,6 @@ import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat; import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface; import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper; import org.apache.hadoop.hive.ql.io.merge.MergeFileWork; -import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanMapper; -import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork; import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateMapper; import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateWork; import org.apache.hadoop.hive.ql.log.PerfLogger; @@ -447,8 +445,6 @@ public final class Utilities { gWork = SerializationUtilities.deserializePlan(kryo, in, MergeFileWork.class); } else 
if(ColumnTruncateMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) { gWork = SerializationUtilities.deserializePlan(kryo, in, ColumnTruncateWork.class); - } else if(PartialScanMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) { - gWork = SerializationUtilities.deserializePlan(kryo, in, PartialScanWork.class); } else { throw new RuntimeException("unable to determine work from configuration ." + MAPRED_MAPPER_CLASS + " was "+ conf.get(MAPRED_MAPPER_CLASS)) ; @@ -2179,7 +2175,9 @@ public final class Utilities { */ @VisibleForTesting static int getMaxExecutorsForInputListing(final Configuration conf, int inputLocationListSize) { - if (inputLocationListSize < 1) return 0; + if (inputLocationListSize < 1) { + return 0; + } int maxExecutors = 1; @@ -3114,8 +3112,9 @@ public final class Utilities { boolean hasLogged = false; // Note: this copies the list because createDummyFileForEmptyPartition may modify the map. for (Path file : new LinkedList<Path>(work.getPathToAliases().keySet())) { - if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT) + if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT) { throw new IOException("Operation is Canceled."); + } List<String> aliases = work.getPathToAliases().get(file); if (aliases.contains(alias)) { @@ -3611,8 +3610,9 @@ public final class Utilities { public static boolean skipHeader(RecordReader<WritableComparable, Writable> currRecReader, int headerCount, WritableComparable key, Writable value) throws IOException { while (headerCount > 0) { - if (!currRecReader.next(key, value)) + if (!currRecReader.next(key, value)) { return false; + } headerCount--; } return true; http://git-wip-us.apache.org/repos/asf/hive/blob/71004d2e/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java deleted file mode 100644 index 9a6406d..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java +++ /dev/null @@ -1,180 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.ql.io.rcfile.stats; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.StatsSetupConst; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.ErrorMsg; -import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.MapredContext; -import org.apache.hadoop.hive.ql.io.RCFile.KeyBuffer; -import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileKeyBufferWrapper; -import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileValueBufferWrapper; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.stats.StatsCollectionContext; -import org.apache.hadoop.hive.ql.stats.StatsFactory; -import org.apache.hadoop.hive.ql.stats.StatsPublisher; -import org.apache.hadoop.hive.shims.CombineHiveKey; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.MapReduceBase; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; - -/** - * - * PartialScanMapper. - * - * It reads through block header and aggregates statistics at the end. - * - * https://issues.apache.org/jira/browse/HIVE-3958 - */ -@SuppressWarnings("deprecation") -public class PartialScanMapper extends MapReduceBase implements - Mapper<Object, RCFileValueBufferWrapper, Object, Object> { - - private JobConf jc; - private String statsAggKeyPrefix; - private long uncompressedFileSize = 0; - private long rowNo = 0; - private boolean exception = false; - private Reporter rp = null; - - private static final Logger LOG = LoggerFactory.getLogger("PartialScanMapper"); - - public PartialScanMapper() { - } - - @Override - public void configure(JobConf job) { - jc = job; - MapredContext.init(true, new JobConf(jc)); - statsAggKeyPrefix = HiveConf.getVar(job, - HiveConf.ConfVars.HIVE_STATS_KEY_PREFIX); - } - - - @Override - public void map(Object k, RCFileValueBufferWrapper value, - OutputCollector<Object, Object> output, Reporter reporter) - throws IOException { - - if (rp == null) { - this.rp = reporter; - MapredContext.get().setReporter(reporter); - } - - try { - //CombineHiveInputFormat may be set in PartialScanTask. - RCFileKeyBufferWrapper key = (RCFileKeyBufferWrapper) - ((k instanceof CombineHiveKey) ? ((CombineHiveKey) k).getKey() : k); - - // calculate rawdatasize - KeyBuffer keyBuffer = key.getKeyBuffer(); - long[] uncompressedColumnSizes = new long[keyBuffer.getColumnNumber()]; - for (int i = 0; i < keyBuffer.getColumnNumber(); i++) { - uncompressedColumnSizes[i] += keyBuffer.getEachColumnUncompressedValueLen()[i]; - } - if (uncompressedColumnSizes != null) { - for (int i = 0; i < uncompressedColumnSizes.length; i++) { - uncompressedFileSize += uncompressedColumnSizes[i]; - } - } - - // calculate no. of rows - rowNo += keyBuffer.getNumberRows(); - } catch (Throwable e) { - this.exception = true; - close(); - throw new IOException(e); - } - - } - - - @Override - public void close() throws IOException { - try { - // Only publish stats if this operator's flag was set to gather stats - if (!exception) { - publishStats(); - } - } catch (HiveException e) { - this.exception = true; - throw new RuntimeException(e); - } finally { - MapredContext.close(); - } - } - - - /** - * Publish statistics. 
- * similar to FileSinkOperator.java publishStats() - * - * @throws HiveException - */ - private void publishStats() throws HiveException { - // Initializing a stats publisher - StatsPublisher statsPublisher = Utilities.getStatsPublisher(jc); - - if (statsPublisher == null) { - // just return, stats gathering should not block the main query - LOG.error("StatsPublishing error: StatsPublisher is not initialized."); - throw new HiveException(ErrorMsg.STATSPUBLISHER_NOT_OBTAINED.getErrorCodedMsg()); - } - - StatsCollectionContext sc = new StatsCollectionContext(jc); - sc.setStatsTmpDir(jc.get(StatsSetupConst.STATS_TMP_LOC, "")); - if (!statsPublisher.connect(sc)) { - // should fail since stats gathering is main purpose of the job - LOG.error("StatsPublishing error: cannot connect to database"); - throw new HiveException(ErrorMsg.STATSPUBLISHER_CONNECTION_ERROR.getErrorCodedMsg()); - } - - // construct key used to store stats in intermediate db - String key = statsAggKeyPrefix.endsWith(Path.SEPARATOR) ? statsAggKeyPrefix : statsAggKeyPrefix - + Path.SEPARATOR; - - // construct statistics to be stored - Map<String, String> statsToPublish = new HashMap<String, String>(); - statsToPublish.put(StatsSetupConst.RAW_DATA_SIZE, Long.toString(uncompressedFileSize)); - statsToPublish.put(StatsSetupConst.ROW_COUNT, Long.toString(rowNo)); - - if (!statsPublisher.publishStat(key, statsToPublish)) { - // The original exception is lost. - // Not changing the interface to maintain backward compatibility - throw new HiveException(ErrorMsg.STATSPUBLISHER_PUBLISHING_ERROR.getErrorCodedMsg()); - } - - if (!statsPublisher.closeConnection(sc)) { - // The original exception is lost. - // Not changing the interface to maintain backward compatibility - throw new HiveException(ErrorMsg.STATSPUBLISHER_CLOSING_ERROR.getErrorCodedMsg()); - } - } - -} http://git-wip-us.apache.org/repos/asf/hive/blob/71004d2e/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java deleted file mode 100644 index ad921f3..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java +++ /dev/null @@ -1,381 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.ql.io.rcfile.stats; - -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.mapreduce.MRJobConfig; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.JavaUtils; -import org.apache.hadoop.hive.common.StatsSetupConst; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.CompilationOpContext; -import org.apache.hadoop.hive.ql.Context; -import org.apache.hadoop.hive.ql.DriverContext; -import org.apache.hadoop.hive.ql.ErrorMsg; -import org.apache.hadoop.hive.ql.QueryPlan; -import org.apache.hadoop.hive.ql.QueryState; -import org.apache.hadoop.hive.ql.exec.Task; -import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.mr.HadoopJobExecHelper; -import org.apache.hadoop.hive.ql.exec.mr.HadoopJobExecHook; -import org.apache.hadoop.hive.ql.exec.mr.Throttle; -import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; -import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; -import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.plan.MapredWork; -import org.apache.hadoop.hive.ql.plan.api.StageType; -import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; -import org.apache.hadoop.hive.ql.stats.StatsCollectionContext; -import org.apache.hadoop.hive.ql.stats.StatsFactory; -import org.apache.hadoop.hive.ql.stats.StatsPublisher; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapred.Counters; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RunningJob; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.core.Appender; -import org.apache.logging.log4j.core.Logger; -import org.apache.logging.log4j.core.appender.FileAppender; -import org.apache.logging.log4j.core.appender.RollingFileAppender; - -/** - * PartialScanTask. - * This task handles RCFile statics calculation via partial scan. - * Instead of reading all bytes, it reads block header and aggregates result. - * https://issues.apache.org/jira/browse/HIVE-3958 - */ -@SuppressWarnings( { "deprecation"}) -public class PartialScanTask extends Task<PartialScanWork> implements - Serializable, HadoopJobExecHook { - private static final long serialVersionUID = 1L; - - protected transient JobConf job; - protected HadoopJobExecHelper jobExecHelper; - - @Override - public void initialize(QueryState queryState, QueryPlan queryPlan, - DriverContext driverContext, CompilationOpContext opContext) { - super.initialize(queryState, queryPlan, driverContext, opContext); - job = new JobConf(conf, PartialScanTask.class); - jobExecHelper = new HadoopJobExecHelper(job, this.console, this, this); - } - - @Override - public boolean requireLock() { - return true; - } - - boolean success = true; - - @Override - /** - * start a new map-reduce job to do partial scan to calculate Stats, - * almost the same as BlockMergeTask or ExecDriver. 
- */ - public int execute(DriverContext driverContext) { - HiveConf.setVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT, - CombineHiveInputFormat.class.getName()); - success = true; - HiveFileFormatUtils.prepareJobOutput(job); - job.setOutputFormat(HiveOutputFormatImpl.class); - job.setMapperClass(work.getMapperClass()); - - Context ctx = driverContext.getCtx(); - boolean ctxCreated = false; - try { - if (ctx == null) { - ctx = new Context(job); - ctxCreated = true; - } - }catch (IOException e) { - e.printStackTrace(); - console.printError("Error launching map-reduce job", "\n" - + org.apache.hadoop.util.StringUtils.stringifyException(e)); - return 5; - } - - job.setMapOutputKeyClass(NullWritable.class); - job.setMapOutputValueClass(NullWritable.class); - if(work.getNumMapTasks() != null) { - job.setNumMapTasks(work.getNumMapTasks()); - } - - // zero reducers - job.setNumReduceTasks(0); - - if (work.getMinSplitSize() != null) { - HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZE, work - .getMinSplitSize().longValue()); - } - - if (work.getInputformat() != null) { - HiveConf.setVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT, work - .getInputformat()); - } - - String inpFormat = HiveConf.getVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT); - - LOG.info("Using " + inpFormat); - - try { - job.setInputFormat(JavaUtils.loadClass(inpFormat)); - } catch (ClassNotFoundException e) { - throw new RuntimeException(e.getMessage(), e); - } - - job.setOutputKeyClass(NullWritable.class); - job.setOutputValueClass(NullWritable.class); - - int returnVal = 0; - RunningJob rj = null; - boolean noName = StringUtils.isEmpty(job.get(MRJobConfig.JOB_NAME)); - - String jobName = null; - if (noName && this.getQueryPlan() != null) { - int maxlen = conf.getIntVar(HiveConf.ConfVars.HIVEJOBNAMELENGTH); - jobName = Utilities.abbreviate(this.getQueryPlan().getQueryStr(), - maxlen - 6); - } - - if (noName) { - // This is for a special case to ensure unit tests pass - job.set(MRJobConfig.JOB_NAME, - jobName != null ? jobName : "JOB" + Utilities.randGen.nextInt()); - } - - // pass aggregation key to mapper - HiveConf.setVar(job, - HiveConf.ConfVars.HIVE_STATS_KEY_PREFIX, - work.getAggKey()); - job.set(StatsSetupConst.STATS_TMP_LOC, work.getStatsTmpDir()); - try { - addInputPaths(job, work); - - MapredWork mrWork = new MapredWork(); - mrWork.setMapWork(work); - Utilities.setMapRedWork(job, mrWork, ctx.getMRTmpPath()); - - // remove the pwd from conf file so that job tracker doesn't show this - // logs - String pwd = HiveConf.getVar(job, HiveConf.ConfVars.METASTOREPWD); - if (pwd != null) { - HiveConf.setVar(job, HiveConf.ConfVars.METASTOREPWD, "HIVE"); - } - JobClient jc = new JobClient(job); - - String addedJars = Utilities.getResourceFiles(job, SessionState.ResourceType.JAR); - if (!addedJars.isEmpty()) { - job.set("tmpjars", addedJars); - } - - // make this client wait if job trcker is not behaving well. 
- Throttle.checkJobTracker(job, LOG); - - if (work.isGatheringStats()) { - // initialize stats publishing table - StatsPublisher statsPublisher; - StatsFactory factory = StatsFactory.newFactory(job); - if (factory != null) { - statsPublisher = factory.getStatsPublisher(); - StatsCollectionContext sc = new StatsCollectionContext(job); - sc.setStatsTmpDir(work.getStatsTmpDir()); - if (!statsPublisher.init(sc)) { // creating stats table if not exists - if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) { - throw - new HiveException(ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg()); - } - } - } - } - - // Finally SUBMIT the JOB! - rj = jc.submitJob(job); - this.jobID = rj.getJobID(); - returnVal = jobExecHelper.progress(rj, jc, ctx); - success = (returnVal == 0); - - } catch (Exception e) { - e.printStackTrace(); - setException(e); - String mesg = " with exception '" + Utilities.getNameMessage(e) + "'"; - if (rj != null) { - mesg = "Ended Job = " + rj.getJobID() + mesg; - } else { - mesg = "Job Submission failed" + mesg; - } - - // Has to use full name to make sure it does not conflict with - // org.apache.commons.lang.StringUtils - console.printError(mesg, "\n" - + org.apache.hadoop.util.StringUtils.stringifyException(e)); - - success = false; - returnVal = 1; - } finally { - try { - if (ctxCreated) { - ctx.clear(); - } - if (rj != null) { - if (returnVal != 0) { - rj.killJob(); - } - } - } catch (Exception e) { - LOG.warn("Failed in cleaning up ", e); - } finally { - HadoopJobExecHelper.runningJobs.remove(rj); - } - } - - return (returnVal); - } - - private void addInputPaths(JobConf job, PartialScanWork work) { - for (Path path : work.getInputPaths()) { - FileInputFormat.addInputPath(job, path); - } - } - - @Override - public String getName() { - return "RCFile Statistics Partial Scan"; - } - - public static final String INPUT_SEPERATOR = ":"; - - public static void main(String[] args) { - String inputPathStr = null; - String outputDir = null; - String jobConfFileName = null; - - try { - for (int i = 0; i < args.length; i++) { - if (args[i].equals("-input")) { - inputPathStr = args[++i]; - } else if (args[i].equals("-jobconffile")) { - jobConfFileName = args[++i]; - } else if (args[i].equals("-outputDir")) { - outputDir = args[++i]; - } - } - } catch (IndexOutOfBoundsException e) { - System.err.println("Missing argument to option"); - printUsage(); - } - - if (inputPathStr == null || outputDir == null - || outputDir.trim().equals("")) { - printUsage(); - } - - List<Path> inputPaths = new ArrayList<Path>(); - String[] paths = inputPathStr.split(INPUT_SEPERATOR); - if (paths == null || paths.length == 0) { - printUsage(); - } - - FileSystem fs = null; - JobConf conf = new JobConf(PartialScanTask.class); - for (String path : paths) { - try { - Path pathObj = new Path(path); - if (fs == null) { - fs = FileSystem.get(pathObj.toUri(), conf); - } - FileStatus fstatus = fs.getFileStatus(pathObj); - if (fstatus.isDir()) { - FileStatus[] fileStatus = fs.listStatus(pathObj); - for (FileStatus st : fileStatus) { - inputPaths.add(st.getPath()); - } - } else { - inputPaths.add(fstatus.getPath()); - } - } catch (IOException e) { - e.printStackTrace(System.err); - } - } - - if (jobConfFileName != null) { - conf.addResource(new Path(jobConfFileName)); - } - - org.slf4j.Logger LOG = LoggerFactory.getLogger(PartialScanTask.class.getName()); - boolean isSilent = HiveConf.getBoolVar(conf, - HiveConf.ConfVars.HIVESESSIONSILENT); - LogHelper console = new LogHelper(LOG, isSilent); 
- - // print out the location of the log file for the user so - // that it's easy to find reason for local mode execution failures - for (Appender appender : ((Logger) LogManager.getRootLogger()).getAppenders().values()) { - if (appender instanceof FileAppender) { - console.printInfo("Execution log at: " + ((FileAppender) appender).getFileName()); - } else if (appender instanceof RollingFileAppender) { - console.printInfo("Execution log at: " + ((RollingFileAppender) appender).getFileName()); - } - } - - QueryState queryState = - new QueryState.Builder().withHiveConf(new HiveConf(conf, PartialScanTask.class)).build(); - PartialScanWork mergeWork = new PartialScanWork(inputPaths); - DriverContext driverCxt = new DriverContext(); - PartialScanTask taskExec = new PartialScanTask(); - taskExec.initialize(queryState, null, driverCxt, new CompilationOpContext()); - taskExec.setWork(mergeWork); - int ret = taskExec.execute(driverCxt); - - if (ret != 0) { - System.exit(2); - } - - } - - private static void printUsage() { - System.exit(1); - } - - @Override - public StageType getType() { - return StageType.MAPRED; - } - - - @Override - public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) { - return false; - } - - @Override - public void logPlanProgress(SessionState ss) throws IOException { - // no op - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/71004d2e/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java deleted file mode 100644 index 919cea0..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.io.rcfile.stats; - -import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVESTATSDBCLASS; - -import java.io.Serializable; -import java.util.LinkedHashMap; -import java.util.List; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.StatsSetupConst.StatDB; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; -import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileBlockMergeInputFormat; -import org.apache.hadoop.hive.ql.plan.Explain; -import org.apache.hadoop.hive.ql.plan.MapWork; -import org.apache.hadoop.hive.ql.plan.PartitionDesc; -import org.apache.hadoop.hive.ql.plan.Explain.Level; -import org.apache.hadoop.mapred.Mapper; - -/** - * Partial Scan Work. 
- * - */ -@Explain(displayName = "Partial Scan Statistics") -public class PartialScanWork extends MapWork implements Serializable { - - private static final long serialVersionUID = 1L; - - private transient List<Path> inputPaths; - private String aggKey; - private String statsTmpDir; - - public PartialScanWork() { - } - - public PartialScanWork(List<Path> inputPaths) { - super(); - this.inputPaths = inputPaths; - PartitionDesc partDesc = new PartitionDesc(); - partDesc.setInputFileFormatClass(RCFileBlockMergeInputFormat.class); - for(Path path: this.inputPaths) { - this.addPathToPartitionInfo(path, partDesc); - } - } - - public List<Path> getInputPaths() { - return inputPaths; - } - - public void setInputPaths(List<Path> inputPaths) { - this.inputPaths = inputPaths; - } - - public Class<? extends Mapper> getMapperClass() { - return PartialScanMapper.class; - } - - @Override - public Long getMinSplitSize() { - return null; - } - - @Override - public String getInputformat() { - return CombineHiveInputFormat.class.getName(); - } - - @Override - public boolean isGatheringStats() { - return true; - } - - /** - * @return the aggKey - */ - @Explain(displayName = "Stats Aggregation Key Prefix", explainLevels = { Level.EXTENDED }) - public String getAggKey() { - return aggKey; - } - - /** - * @param aggKey the aggKey to set - */ - public void setAggKey(String aggKey) { - this.aggKey = aggKey; - } - - public String getStatsTmpDir() { - return statsTmpDir; - } - - public void setStatsTmpDir(String statsTmpDir, HiveConf conf) { - this.statsTmpDir = HiveConf.getVar(conf, HIVESTATSDBCLASS).equalsIgnoreCase(StatDB.fs.name()) - ? statsTmpDir : ""; - } - -} http://git-wip-us.apache.org/repos/asf/hive/blob/71004d2e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java index 9297a0b..2adb144 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java @@ -18,16 +18,12 @@ package org.apache.hadoop.hive.ql.optimizer; -import java.io.Serializable; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.Stack; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; @@ -35,7 +31,6 @@ import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat; -import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; @@ -89,13 +84,11 @@ public class GenMRTableScan1 implements NodeProcessor { mapCurrCtx.put(op, new GenMapRedCtx(currTask, currAliasId)); if (parseCtx.getQueryProperties().isAnalyzeCommand()) { - boolean partialScan = parseCtx.getQueryProperties().isPartialScanAnalyzeCommand(); boolean noScan = parseCtx.getQueryProperties().isNoScanAnalyzeCommand(); if 
(OrcInputFormat.class.isAssignableFrom(inputFormat) || MapredParquetInputFormat.class.isAssignableFrom(inputFormat)) { // For ORC and Parquet, all the following statements are the same // ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS - // ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS partialscan; // ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS noscan; // There will not be any MR or Tez job above this task @@ -142,11 +135,6 @@ public class GenMRTableScan1 implements NodeProcessor { ctx.getRootTasks().add(statsTask); } - // ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS partialscan; - if (partialScan) { - handlePartialScanCommand(op, ctx, parseCtx, currTask, statsWork, statsTask); - } - currWork.getMapWork().setGatheringStats(true); if (currWork.getReduceWork() != null) { currWork.getReduceWork().setGatheringStats(true); @@ -174,48 +162,4 @@ public class GenMRTableScan1 implements NodeProcessor { assert false; return null; } - - /** - * handle partial scan command. It is composed of PartialScanTask followed by StatsTask . - * @param op - * @param ctx - * @param parseCtx - * @param currTask - * @param parseInfo - * @param statsWork - * @param statsTask - * @throws SemanticException - */ - private void handlePartialScanCommand(TableScanOperator op, GenMRProcContext ctx, - ParseContext parseCtx, Task<? extends Serializable> currTask, - StatsWork statsWork, Task<StatsWork> statsTask) throws SemanticException { - String aggregationKey = op.getConf().getStatsAggPrefix(); - StringBuilder aggregationKeyBuffer = new StringBuilder(aggregationKey); - List<Path> inputPaths = GenMapRedUtils.getInputPathsForPartialScan(op, aggregationKeyBuffer); - aggregationKey = aggregationKeyBuffer.toString(); - - // scan work - PartialScanWork scanWork = new PartialScanWork(inputPaths); - scanWork.setMapperCannotSpanPartns(true); - scanWork.setAggKey(aggregationKey); - scanWork.setStatsTmpDir(op.getConf().getTmpStatsDir(), parseCtx.getConf()); - - // stats work - statsWork.setPartialScanAnalyzeCommand(true); - - // partial scan task - DriverContext driverCxt = new DriverContext(); - Task<PartialScanWork> psTask = TaskFactory.get(scanWork, parseCtx.getConf()); - psTask.initialize(parseCtx.getQueryState(), null, driverCxt, op.getCompilationOpContext()); - psTask.setWork(scanWork); - - // task dependency - ctx.getRootTasks().remove(currTask); - ctx.getRootTasks().add(psTask); - psTask.addDependentTask(statsTask); - List<Task<? extends Serializable>> parentTasks = new ArrayList<Task<? 
extends Serializable>>(); - parentTasks.add(psTask); - statsTask.setParentTasks(parentTasks); - } - } http://git-wip-us.apache.org/repos/asf/hive/blob/71004d2e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index 25cca7b..b861d8d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.ql.optimizer; -import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; @@ -41,11 +40,8 @@ import org.apache.hadoop.hive.common.BlobStorageUtils; import org.apache.hadoop.hive.common.StringInternUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.metastore.Warehouse; -import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.Context; -import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.ConditionalTask; import org.apache.hadoop.hive.ql.exec.DemuxOperator; @@ -1976,33 +1972,6 @@ public final class GenMapRedUtils { return Collections.emptyList(); } - public static List<Path> getInputPathsForPartialScan(TableScanOperator tableScanOp, - Appendable aggregationKey) throws SemanticException { - List<Path> inputPaths = new ArrayList<Path>(); - switch (tableScanOp.getConf().getTableMetadata().getTableSpec().specType) { - case TABLE_ONLY: - inputPaths.add(tableScanOp.getConf().getTableMetadata() - .getTableSpec().tableHandle.getPath()); - break; - case STATIC_PARTITION: - Partition part = tableScanOp.getConf().getTableMetadata() - .getTableSpec().partHandle; - try { - aggregationKey.append(Warehouse.makePartPath(part.getSpec())); - } catch (MetaException e) { - throw new SemanticException(ErrorMsg.ANALYZE_TABLE_PARTIALSCAN_AGGKEY.getMsg( - part.getDataLocation().toString() + e.getMessage())); - } catch (IOException e) { - throw new RuntimeException(e); - } - inputPaths.add(part.getDataLocation()); - break; - default: - assert false; - } - return inputPaths; - } - public static Set<String> findAliases(final MapWork work, Operator<?> startOp) { Set<String> aliases = new LinkedHashSet<String>(); for (Operator<?> topOp : findTopOps(startOp, null)) { http://git-wip-us.apache.org/repos/asf/hive/blob/71004d2e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java index b5bc386..7f5136f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java @@ -91,7 +91,6 @@ ekoifman:apache ekoifman$ find . 
-name *Work.java ./hadoop/hive/ql/exec/repl/ReplStateLogWork.java ./hadoop/hive/ql/index/IndexMetadataChangeWork.java ./hadoop/hive/ql/io/merge/MergeFileWork.java - extends MapWork -./hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java - extends MapWork ./hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateWork.java - extends MapWork ./hadoop/hive/ql/parse/GenTezWork.java ./hadoop/hive/ql/parse/spark/GenSparkWork.java http://git-wip-us.apache.org/repos/asf/hive/blob/71004d2e/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java index 1923a9b..5733909 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java @@ -357,8 +357,6 @@ public class ColumnStatsSemanticAnalyzer extends SemanticAnalyzer { // check if it is no scan. grammar prevents coexit noscan/columns super.processNoScanCommand(ast); - // check if it is partial scan. grammar prevents coexit partialscan/columns - super.processPartialScanCommand(ast); /* Rewrite only analyze table <> column <> compute statistics; Don't rewrite analyze table * command - table stats are collected by the table scan operator and is not rewritten to * an aggregation. http://git-wip-us.apache.org/repos/asf/hive/blob/71004d2e/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g index 6ffca02..d0ce4ae 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g @@ -288,7 +288,6 @@ KW_GROUPING: 'GROUPING'; KW_SETS: 'SETS'; KW_TRUNCATE: 'TRUNCATE'; KW_NOSCAN: 'NOSCAN'; -KW_PARTIALSCAN: 'PARTIALSCAN'; KW_USER: 'USER'; KW_ROLE: 'ROLE'; KW_ROLES: 'ROLES'; http://git-wip-us.apache.org/repos/asf/hive/blob/71004d2e/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g index 2cb6946..d238833 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g @@ -1539,9 +1539,9 @@ analyzeStatement @after { popMsg(state); } : KW_ANALYZE KW_TABLE (parttype=tableOrPartition) ( - (KW_COMPUTE) => KW_COMPUTE KW_STATISTICS ((noscan=KW_NOSCAN) | (partialscan=KW_PARTIALSCAN) + (KW_COMPUTE) => KW_COMPUTE KW_STATISTICS ((noscan=KW_NOSCAN) | (KW_FOR KW_COLUMNS (statsColumnName=columnNameList)?))? - -> ^(TOK_ANALYZE $parttype $noscan? $partialscan? KW_COLUMNS? $statsColumnName?) + -> ^(TOK_ANALYZE $parttype $noscan? KW_COLUMNS? $statsColumnName?) 
| (KW_CACHE) => KW_CACHE KW_METADATA -> ^(TOK_CACHE_METADATA $parttype) ) http://git-wip-us.apache.org/repos/asf/hive/blob/71004d2e/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java index b6d7ee8..c497419 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.ql.parse; -import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Set; @@ -27,15 +26,12 @@ import java.util.Stack; import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; -import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; @@ -99,7 +95,6 @@ public class ProcessAnalyzeTable implements NodeProcessor { MapredParquetInputFormat.class.isAssignableFrom(inputFormat)) { // For ORC & Parquet, all the following statements are the same // ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS - // ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS partialscan; // ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS noscan; // There will not be any Tez job above this task @@ -145,11 +140,6 @@ public class ProcessAnalyzeTable implements NodeProcessor { context.rootTasks.add(statsTask); } - // ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS partialscan; - if (parseContext.getQueryProperties().isPartialScanAnalyzeCommand()) { - handlePartialScanCommand(tableScan, parseContext, statsWork, context, statsTask); - } - // NOTE: here we should use the new partition predicate pushdown API to // get a list of pruned list, // and pass it to setTaskPlan as the last parameter @@ -217,42 +207,4 @@ public class ProcessAnalyzeTable implements NodeProcessor { return TaskFactory.get(statsWork, parseContext.getConf()); } } - - /** - * handle partial scan command. - * - * It is composed of PartialScanTask followed by StatsTask. 
- */ - private void handlePartialScanCommand(TableScanOperator tableScan, ParseContext parseContext, - StatsWork statsWork, GenTezProcContext context, Task<StatsWork> statsTask) - throws SemanticException { - - String aggregationKey = tableScan.getConf().getStatsAggPrefix(); - StringBuilder aggregationKeyBuffer = new StringBuilder(aggregationKey); - List<Path> inputPaths = GenMapRedUtils.getInputPathsForPartialScan(tableScan, - aggregationKeyBuffer); - aggregationKey = aggregationKeyBuffer.toString(); - - // scan work - PartialScanWork scanWork = new PartialScanWork(inputPaths); - scanWork.setMapperCannotSpanPartns(true); - scanWork.setAggKey(aggregationKey); - scanWork.setStatsTmpDir(tableScan.getConf().getTmpStatsDir(), parseContext.getConf()); - - // stats work - statsWork.setPartialScanAnalyzeCommand(true); - - // partial scan task - DriverContext driverCxt = new DriverContext(); - Task<PartialScanWork> partialScanTask = TaskFactory.get(scanWork, parseContext.getConf()); - partialScanTask.initialize(parseContext.getQueryState(), null, driverCxt, - tableScan.getCompilationOpContext()); - partialScanTask.setWork(scanWork); - statsWork.setSourceTask(partialScanTask); - - // task dependency - context.rootTasks.remove(context.currentTask); - context.rootTasks.add(partialScanTask); - partialScanTask.addDependentTask(statsTask); - } } http://git-wip-us.apache.org/repos/asf/hive/blob/71004d2e/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java index 38df5de..ab71073 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java @@ -71,8 +71,6 @@ public class QBParseInfo { private boolean isAnalyzeCommand; // used for the analyze command (statistics) private boolean isNoScanAnalyzeCommand; // used for the analyze command (statistics) (noscan) - private boolean isPartialScanAnalyzeCommand; // used for the analyze command (statistics) - // (partialscan) private final HashMap<String, TableSpec> tableSpecs; // used for statistics @@ -657,20 +655,6 @@ public class QBParseInfo { } /** - * @return the isPartialScanAnalyzeCommand - */ - public boolean isPartialScanAnalyzeCommand() { - return isPartialScanAnalyzeCommand; - } - - /** - * @param isPartialScanAnalyzeCommand the isPartialScanAnalyzeCommand to set - */ - public void setPartialScanAnalyzeCommand(boolean isPartialScanAnalyzeCommand) { - this.isPartialScanAnalyzeCommand = isPartialScanAnalyzeCommand; - } - - /** * See also {@link #isInsertIntoTable(String)} */ public Map<String, ASTNode> getInsertOverwriteTables() { http://git-wip-us.apache.org/repos/asf/hive/blob/71004d2e/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index c8277f4..a5b69ec 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -116,8 +116,6 @@ import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import 
http://git-wip-us.apache.org/repos/asf/hive/blob/71004d2e/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index c8277f4..a5b69ec 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -116,8 +116,6 @@ import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.hive.ql.io.NullRowsInputFormat; -import org.apache.hadoop.hive.ql.io.RCFileInputFormat; -import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; import org.apache.hadoop.hive.ql.lib.Dispatcher; import org.apache.hadoop.hive.ql.lib.GraphWalker;
@@ -222,7 +220,6 @@ import org.apache.hadoop.hive.serde2.NullStructSerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; -import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe; import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe2; import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -321,9 +318,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // flag for no scan during analyze ... compute statistics protected boolean noscan; - //flag for partial scan during analyze ... compute statistics - protected boolean partialscan; - protected volatile boolean disableJoinMerge = false; protected final boolean defaultJoinMerge;
@@ -400,7 +394,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { globalLimitCtx = new GlobalLimitCtx(); viewAliasToInput = new HashMap<String, ReadEntity>(); mergeIsDirect = true; - noscan = partialscan = false; + noscan = false; tabNameToTabObject = new HashMap<>(); defaultJoinMerge = false == HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_MERGE_NWAY_JOINS); disableJoinMerge = defaultJoinMerge;
@@ -956,8 +950,11 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if (firstRow) { fields.add(new FieldSchema("tmp_values_col" + nextColNum++, "string", "")); } - if (isFirst) isFirst = false; - else writeAsText("\u0001", out); + if (isFirst) { + isFirst = false; + } else { + writeAsText("\u0001", out); + } writeAsText(unparseExprForValuesClause(value), out); } writeAsText("\n", out);
@@ -1471,8 +1468,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } } - if ((ast.getChild(posn).getChild(0).getType() == HiveParser.TOK_TRANSFORM)) + if ((ast.getChild(posn).getChild(0).getType() == HiveParser.TOK_TRANSFORM)) { queryProperties.setUsesScript(true); + } LinkedHashMap<String, ASTNode> aggregations = doPhase1GetAggregationsFromSelect(ast, qb, ctx_1.dest);
@@ -1484,8 +1482,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { case HiveParser.TOK_WHERE: qbp.setWhrExprForClause(ctx_1.dest, ast); - if (!SubQueryUtils.findSubQueries((ASTNode) ast.getChild(0)).isEmpty()) - queryProperties.setFilterWithSubQuery(true); + if (!SubQueryUtils.findSubQueries((ASTNode) ast.getChild(0)).isEmpty()) { + queryProperties.setFilterWithSubQuery(true); + } break; case HiveParser.TOK_INSERT_INTO:
@@ -1694,7 +1693,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { qb.addAlias(table_name); qb.getParseInfo().setIsAnalyzeCommand(true); qb.getParseInfo().setNoScanAnalyzeCommand(this.noscan); - qb.getParseInfo().setPartialScanAnalyzeCommand(this.partialscan); // Allow analyze the whole table and dynamic partitions HiveConf.setVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict"); HiveConf.setVar(conf, HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict");
@@ -2082,27 +2080,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { "Cannot get partitions for " + ts.partSpec), e); } } - // validate partial scan command - QBParseInfo qbpi = qb.getParseInfo(); - if (qbpi.isPartialScanAnalyzeCommand()) { - Class<? extends InputFormat> inputFormatClass = null; - switch (ts.specType) { - case TABLE_ONLY: - case DYNAMIC_PARTITION: - inputFormatClass = ts.tableHandle.getInputFormatClass(); - break; - case STATIC_PARTITION: - inputFormatClass = ts.partHandle.getInputFormatClass(); - break; - default: - assert false; - } - // throw a HiveException for formats other than rcfile or orcfile. - if (!(inputFormatClass.equals(RCFileInputFormat.class) || inputFormatClass - .equals(OrcInputFormat.class))) { - throw new SemanticException(ErrorMsg.ANALYZE_TABLE_PARTIALSCAN_NON_RCFILE.getMsg()); - } - } tab.setTableSpec(ts); qb.getParseInfo().addTableSpec(alias, ts);
@@ -8304,7 +8281,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if (reduceKeys.size() == 0) { numReds = 1; String error = StrictChecks.checkCartesian(conf); - if (error != null) throw new SemanticException(error); + if (error != null) { + throw new SemanticException(error); + } } ReduceSinkDesc rsDesc = PlanUtils.getReduceSinkDesc(reduceKeys,
@@ -9081,12 +9060,16 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { * 2. TableName, ColumnName, Target-TableName * */ private Map<String, List<SemiJoinHint>> parseSemiJoinHint(List<ASTNode> hints) throws SemanticException { - if (hints == null || hints.size() == 0) return null; + if (hints == null || hints.size() == 0) { + return null; + } Map<String, List<SemiJoinHint>> result = null; for (ASTNode hintNode : hints) { for (Node node : hintNode.getChildren()) { ASTNode hint = (ASTNode) node; - if (hint.getChild(0).getType() != HintParser.TOK_LEFTSEMIJOIN) continue; + if (hint.getChild(0).getType() != HintParser.TOK_LEFTSEMIJOIN) { + continue; + } if (result == null) { result = new HashMap<>(); }
@@ -9161,11 +9144,15 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { * @throws SemanticException */ private boolean disableMapJoinWithHint(List<ASTNode> hints) throws SemanticException { - if (hints == null || hints.size() == 0) return false; + if (hints == null || hints.size() == 0) { + return false; + } for (ASTNode hintNode : hints) { for (Node node : hintNode.getChildren()) { ASTNode hint = (ASTNode) node; - if (hint.getChild(0).getType() != HintParser.TOK_MAPJOIN) continue; + if (hint.getChild(0).getType() != HintParser.TOK_MAPJOIN) { + continue; + } Tree args = hint.getChild(1); if (args.getChildCount() == 1) { String text = args.getChild(0).getText();
@@ -11299,11 +11286,17 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { continue; } ASTNode n = (ASTNode) ((ASTNode) child).getFirstChildWithType(HiveParser.TOK_INSERT_INTO); - if (n == null) continue; + if (n == null) { + continue; + } n = (ASTNode) n.getFirstChildWithType(HiveParser.TOK_TAB); - if (n == null) continue; + if (n == null) { + continue; + } n = (ASTNode) n.getFirstChildWithType(HiveParser.TOK_TABNAME); - if (n == null) continue; + if (n == null) { + continue; + } String[] dbTab = getQualifiedTableName(n); Table t = db.getTable(dbTab[0], dbTab[1]); Path tablePath = t.getPath();
@@ -12735,24 +12728,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } /** - * process analyze ... partial command - * - * separate it from noscan command process so that it provides us flexibility - * - * @param tree - * @throws SemanticException - */ - protected void processPartialScanCommand (ASTNode tree) throws SemanticException { - // check if it is partial scan command - this.checkPartialScan(tree); - - //validate partial scan - if (this.partialscan) { - validateAnalyzePartialscan(tree); - } - } - - /** * process analyze ... noscan command * @param tree * @throws SemanticException
@@ -12797,45 +12772,6 @@ } /** - * Validate partialscan command - * - * @param tree - * @throws SemanticException - */ - private void validateAnalyzePartialscan(ASTNode tree) throws SemanticException { - // since it is partialscan, it is true table name in command - String tableName = getUnescapedName((ASTNode) tree.getChild(0).getChild(0)); - Table tbl; - try { - tbl = this.getTableObjectByName(tableName); - } catch (InvalidTableException e) { - throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(tableName), e); - } catch (HiveException e) { - throw new SemanticException(e.getMessage(), e); - } - /* partialscan uses hdfs apis to retrieve such information from Namenode. */ - /* But that will be specific to hdfs. Through storagehandler mechanism, */ - /* storage of table could be on any storage system: hbase, cassandra etc. */ - /* A nice error message should be given to user. */ - if (tbl.isNonNative()) { - throw new SemanticException(ErrorMsg.ANALYZE_TABLE_PARTIALSCAN_NON_NATIVE.getMsg(tbl - .getTableName())); - } - - /** - * Partial scan doesn't support external table. - */ - if(tbl.getTableType().equals(TableType.EXTERNAL_TABLE)) { - throw new SemanticException(ErrorMsg.ANALYZE_TABLE_PARTIALSCAN_EXTERNAL_TABLE.getMsg(tbl - .getTableName())); - } - - if (!HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVESTATSAUTOGATHER)) { - throw new SemanticException(ErrorMsg.ANALYZE_TABLE_PARTIALSCAN_AUTOGATHER.getMsg()); - } - } - - /** * It will check if this is analyze ... compute statistics noscan * @param tree */
@@ -12855,27 +12791,6 @@ } } - /** - * It will check if this is analyze ... compute statistics partialscan - * @param tree - */ - private void checkPartialScan(ASTNode tree) { - if (tree.getChildCount() > 1) { - ASTNode child0 = (ASTNode) tree.getChild(0); - ASTNode child1; - if (child0.getToken().getType() == HiveParser.TOK_TAB) { - child0 = (ASTNode) child0.getChild(0); - if (child0.getToken().getType() == HiveParser.TOK_TABNAME) { - child1 = (ASTNode) tree.getChild(1); - if (child1.getToken().getType() == HiveParser.KW_PARTIALSCAN) { - this.partialscan = true; - } - } - } - } - } - - public QB getQB() { return qb; }
@@ -13743,7 +13658,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if (qb != null) { queryProperties.setQuery(qb.getIsQuery()); queryProperties.setAnalyzeCommand(qb.getParseInfo().isAnalyzeCommand()); - queryProperties.setPartialScanAnalyzeCommand(qb.getParseInfo().isPartialScanAnalyzeCommand()); queryProperties.setNoScanAnalyzeCommand(qb.getParseInfo().isNoScanAnalyzeCommand()); queryProperties.setAnalyzeRewrite(qb.isAnalyzeRewrite()); queryProperties.setCTAS(qb.getTableDesc() != null);
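The same surgery repeats below for the Spark planner. Its class comment is worth noting: for self-describing formats the analyze variants were already interchangeable, which is why nothing needs to replace the deleted code path. A sketch, assuming an illustrative ORC table:

    CREATE TABLE t_orc (key STRING, value STRING) STORED AS ORC;
    -- per the comment in SparkProcessAnalyzeTable, these are handled identically
    -- for ORC and Parquet (a metadata-backed stats task, no Spark job above it):
    ANALYZE TABLE t_orc COMPUTE STATISTICS;
    ANALYZE TABLE t_orc COMPUTE STATISTICS NOSCAN;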
http://git-wip-us.apache.org/repos/asf/hive/blob/71004d2e/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
index a2876e1..7ff321f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
@@ -25,14 +25,11 @@ import java.util.Stack; import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; -import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -52,7 +49,7 @@ import com.google.common.base.Preconditions; /** * ProcessAnalyzeTable sets up work for the several variants of analyze table - * (normal, no scan, partial scan.) The plan at this point will be a single + * (normal, no scan) The plan at this point will be a single * table scan operator. * * Cloned from Tez ProcessAnalyzeTable.
@@ -103,7 +100,6 @@ public class SparkProcessAnalyzeTable implements NodeProcessor { MapredParquetInputFormat.class.isAssignableFrom(inputFormat)) { // For ORC & Parquet, all the following statements are the same // ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS - // ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS partialscan; // ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS noscan; // There will not be any Spark job above this task StatsNoJobWork snjWork = new StatsNoJobWork(tableScan.getConf().getTableMetadata().getTableSpec());
@@ -146,11 +142,6 @@ public class SparkProcessAnalyzeTable implements NodeProcessor { context.rootTasks.add(statsTask); } - // ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS partialscan; - if (parseContext.getQueryProperties().isPartialScanAnalyzeCommand()) { - handlePartialScanCommand(tableScan, parseContext, statsWork, context, statsTask); - } - // NOTE: here we should use the new partition predicate pushdown API to get a list of pruned list, // and pass it to setTaskPlan as the last parameter Set<Partition> confirmedPartns = GenMapRedUtils.getConfirmedPartitionsForScan(tableScan);
@@ -170,42 +161,4 @@ public class SparkProcessAnalyzeTable implements NodeProcessor { return null; } - /** - * handle partial scan command. - * - * It is composed of PartialScanTask followed by StatsTask. - */ - private void handlePartialScanCommand(TableScanOperator tableScan, ParseContext parseContext, - StatsWork statsWork, GenSparkProcContext context, Task<StatsWork> statsTask) - throws SemanticException { - String aggregationKey = tableScan.getConf().getStatsAggPrefix(); - StringBuilder aggregationKeyBuffer = new StringBuilder(aggregationKey); - List<Path> inputPaths = GenMapRedUtils.getInputPathsForPartialScan(tableScan, aggregationKeyBuffer); - aggregationKey = aggregationKeyBuffer.toString(); - - // scan work - PartialScanWork scanWork = new PartialScanWork(inputPaths); - scanWork.setMapperCannotSpanPartns(true); - scanWork.setAggKey(aggregationKey); - scanWork.setStatsTmpDir(tableScan.getConf().getTmpStatsDir(), parseContext.getConf()); - - // stats work - statsWork.setPartialScanAnalyzeCommand(true); - - // partial scan task - DriverContext driverCxt = new DriverContext(); - - @SuppressWarnings("unchecked") - Task<PartialScanWork> partialScanTask = TaskFactory.get(scanWork, parseContext.getConf()); - partialScanTask.initialize(parseContext.getQueryState(), null, driverCxt, - tableScan.getCompilationOpContext()); - partialScanTask.setWork(scanWork); - statsWork.setSourceTask(partialScanTask); - - // task dependency - context.rootTasks.remove(context.currentTask); - context.rootTasks.add(partialScanTask); - partialScanTask.addDependentTask(statsTask); - } - }

http://git-wip-us.apache.org/repos/asf/hive/blob/71004d2e/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java
index a5050c5..74629d5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java
@@ -54,8 +54,6 @@ public class StatsWork implements Serializable { private boolean isNoScanAnalyzeCommand = false; - private boolean isPartialScanAnalyzeCommand = false; - // sourceTask for TS is not changed (currently) but that of FS might be changed // by various optimizers (auto.convert.join, for example) // so this is set by DriverContext in runtime
@@ -146,20 +144,6 @@ public class StatsWork implements Serializable { this.isNoScanAnalyzeCommand = isNoScanAnalyzeCommand; } - /** - * @return the isPartialScanAnalyzeCommand - */ - public boolean isPartialScanAnalyzeCommand() { - return isPartialScanAnalyzeCommand; - } - - /** - * @param isPartialScanAnalyzeCommand the isPartialScanAnalyzeCommand to set - */ - public void setPartialScanAnalyzeCommand(boolean isPartialScanAnalyzeCommand) { - this.isPartialScanAnalyzeCommand = isPartialScanAnalyzeCommand; - } - public Task getSourceTask() { return sourceTask; }
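What remains is test cleanup. The clientnegative tests deleted below exercised partialscan's guard rails (statistics autogather disabled, external tables, non-native tables, non-RCFile storage). With the keyword gone from the lexer and parser, such statements presumably now fail at parse time, before any of those semantic checks could run. The noscan form of the common fixture is still expressible, for example:

    -- illustrative; mirrors the fixture used by the deleted tests:
    CREATE TABLE analyze_srcpart_partial_scan (key STRING, value STRING)
    PARTITIONED BY (ds STRING, hr STRING)
    STORED AS RCFILE;
    ANALYZE TABLE analyze_srcpart_partial_scan PARTITION (ds='2008-04-08', hr=11)
      COMPUTE STATISTICS NOSCAN;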
http://git-wip-us.apache.org/repos/asf/hive/blob/71004d2e/ql/src/test/queries/clientnegative/stats_partialscan_autogether.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientnegative/stats_partialscan_autogether.q b/ql/src/test/queries/clientnegative/stats_partialscan_autogether.q
deleted file mode 100644
index fb3bd20..0000000
--- a/ql/src/test/queries/clientnegative/stats_partialscan_autogether.q
+++ /dev/null
@@ -1,32 +0,0 @@ -set hive.mapred.mode=nonstrict; -set datanucleus.cache.collections=false; -set hive.stats.autogather=false; -set hive.exec.dynamic.partition=true; -set hive.exec.dynamic.partition.mode=nonstrict; -set mapred.min.split.size=256; -set mapred.min.split.size.per.node=256; -set mapred.min.split.size.per.rack=256; -set mapred.max.split.size=256; - --- test analyze table ... compute statistics partialscan - --- 1. prepare data -CREATE table analyze_srcpart_partial_scan (key STRING, value STRING) -partitioned by (ds string, hr string) -stored as rcfile; -insert overwrite table analyze_srcpart_partial_scan partition (ds, hr) select * from srcpart where ds is not null order by key; -describe formatted analyze_srcpart_partial_scan PARTITION(ds='2008-04-08',hr=11); - - --- 2. partialscan -explain -analyze table analyze_srcpart_partial_scan PARTITION(ds='2008-04-08',hr=11) compute statistics partialscan; -analyze table analyze_srcpart_partial_scan PARTITION(ds='2008-04-08',hr=11) compute statistics partialscan; - --- 3. confirm result -describe formatted analyze_srcpart_partial_scan PARTITION(ds='2008-04-08',hr=11); -describe formatted analyze_srcpart_partial_scan PARTITION(ds='2008-04-09',hr=11); -drop table analyze_srcpart_partial_scan; - - -

http://git-wip-us.apache.org/repos/asf/hive/blob/71004d2e/ql/src/test/queries/clientnegative/stats_partialscan_non_external.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientnegative/stats_partialscan_non_external.q b/ql/src/test/queries/clientnegative/stats_partialscan_non_external.q
deleted file mode 100644
index c206b8b..0000000
--- a/ql/src/test/queries/clientnegative/stats_partialscan_non_external.q
+++ /dev/null
@@ -1,5 +0,0 @@ - -CREATE EXTERNAL TABLE external_table (key int, value string); - --- we do not support analyze table ... partialscan on EXTERNAL tables yet -analyze table external_table compute statistics partialscan;

http://git-wip-us.apache.org/repos/asf/hive/blob/71004d2e/ql/src/test/queries/clientnegative/stats_partialscan_non_native.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientnegative/stats_partialscan_non_native.q b/ql/src/test/queries/clientnegative/stats_partialscan_non_native.q
deleted file mode 100644
index 8e02ced..0000000
--- a/ql/src/test/queries/clientnegative/stats_partialscan_non_native.q
+++ /dev/null
@@ -1,6 +0,0 @@ - -CREATE TABLE non_native1(key int, value string) -STORED BY 'org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler'; - --- we do not support analyze table ... partialscan on non-native tables yet -analyze table non_native1 compute statistics partialscan;

http://git-wip-us.apache.org/repos/asf/hive/blob/71004d2e/ql/src/test/queries/clientnegative/stats_partscan_norcfile.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientnegative/stats_partscan_norcfile.q b/ql/src/test/queries/clientnegative/stats_partscan_norcfile.q
deleted file mode 100644
index 56d93d0..0000000
--- a/ql/src/test/queries/clientnegative/stats_partscan_norcfile.q
+++ /dev/null
@@ -1,12 +0,0 @@ -set datanucleus.cache.collections=false; -set hive.stats.autogather=true; -set hive.exec.dynamic.partition=true; -set hive.exec.dynamic.partition.mode=nonstrict; - --- test analyze table ... compute statistics partialscan - -create table analyze_srcpart_partial_scan like srcpart; -insert overwrite table analyze_srcpart_partial_scan partition (ds, hr) select * from srcpart where ds is not null; -analyze table analyze_srcpart_partial_scan PARTITION(ds='2008-04-08',hr=11) compute statistics partialscan; - -

http://git-wip-us.apache.org/repos/asf/hive/blob/71004d2e/ql/src/test/queries/clientpositive/orc_analyze.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/orc_analyze.q b/ql/src/test/queries/clientpositive/orc_analyze.q
index 3f9a592..5202c07 100644
--- a/ql/src/test/queries/clientpositive/orc_analyze.q
+++ b/ql/src/test/queries/clientpositive/orc_analyze.q
@@ -34,9 +34,6 @@ set hive.stats.autogather = true; analyze table orc_create_people compute statistics; desc formatted orc_create_people; -analyze table orc_create_people compute statistics partialscan; -desc formatted orc_create_people; - analyze table orc_create_people compute statistics noscan; desc formatted orc_create_people;
@@ -80,10 +77,6 @@ analyze table orc_create_people partition(state) compute statistics; desc formatted orc_create_people partition(state="Ca"); desc formatted orc_create_people partition(state="Or"); -analyze table orc_create_people partition(state) compute statistics partialscan; -desc formatted orc_create_people partition(state="Ca"); -desc formatted orc_create_people partition(state="Or"); - analyze table orc_create_people partition(state) compute statistics noscan; desc formatted orc_create_people partition(state="Ca"); desc formatted orc_create_people partition(state="Or");
@@ -133,10 +126,6 @@ analyze table orc_create_people partition(state) compute statistics; desc formatted orc_create_people partition(state="Ca"); desc formatted orc_create_people partition(state="Or"); -analyze table orc_create_people partition(state) compute statistics partialscan; -desc formatted orc_create_people partition(state="Ca"); -desc formatted orc_create_people partition(state="Or"); - analyze table orc_create_people partition(state) compute statistics noscan; desc formatted orc_create_people partition(state="Ca"); desc formatted orc_create_people partition(state="Or");
@@ -186,9 +175,6 @@ set hive.stats.autogather = true; analyze table orc_create_people partition(state) compute statistics; desc formatted orc_create_people partition(state="Ca"); -analyze table orc_create_people partition(state) compute statistics partialscan; -desc formatted orc_create_people partition(state="Ca"); - analyze table orc_create_people partition(state) compute statistics noscan; desc formatted orc_create_people partition(state="Ca");

http://git-wip-us.apache.org/repos/asf/hive/blob/71004d2e/ql/src/test/queries/clientpositive/stats_partscan_1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/stats_partscan_1.q b/ql/src/test/queries/clientpositive/stats_partscan_1.q
deleted file mode 100644
index b790b7d..0000000
--- a/ql/src/test/queries/clientpositive/stats_partscan_1.q
+++ /dev/null
@@ -1,37 +0,0 @@ -set datanucleus.cache.collections=false; -set hive.stats.autogather=false; -set hive.exec.dynamic.partition=true; -set hive.exec.dynamic.partition.mode=nonstrict; -set mapred.min.split.size=256; -set mapred.min.split.size.per.node=256; -set mapred.min.split.size.per.rack=256; -set mapred.max.split.size=256; - --- INCLUDE_HADOOP_MAJOR_VERSIONS(0.20S) --- This test uses mapred.max.split.size/mapred.max.split.size for controlling --- number of input splits, which is not effective in hive 0.20. --- stats_partscan_1_23.q is the same test with this but has different result. - --- test analyze table ... compute statistics partialscan - --- 1. prepare data -CREATE table analyze_srcpart_partial_scan (key STRING, value STRING) -partitioned by (ds string, hr string) -stored as rcfile; -insert overwrite table analyze_srcpart_partial_scan partition (ds, hr) select * from srcpart where ds is not null; -describe formatted analyze_srcpart_partial_scan PARTITION(ds='2008-04-08',hr=11); - -set hive.stats.autogather=true; - --- 2. partialscan -explain -analyze table analyze_srcpart_partial_scan PARTITION(ds='2008-04-08',hr=11) compute statistics partialscan; -analyze table analyze_srcpart_partial_scan PARTITION(ds='2008-04-08',hr=11) compute statistics partialscan; - --- 3. confirm result -describe formatted analyze_srcpart_partial_scan PARTITION(ds='2008-04-08',hr=11); -describe formatted analyze_srcpart_partial_scan PARTITION(ds='2008-04-09',hr=11); -drop table analyze_srcpart_partial_scan; - - -

http://git-wip-us.apache.org/repos/asf/hive/blob/71004d2e/ql/src/test/queries/clientpositive/stats_partscan_1_23.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/stats_partscan_1_23.q b/ql/src/test/queries/clientpositive/stats_partscan_1_23.q
deleted file mode 100644
index 905c4c2..0000000
--- a/ql/src/test/queries/clientpositive/stats_partscan_1_23.q
+++ /dev/null
@@ -1,38 +0,0 @@ -set datanucleus.cache.collections=false; -set hive.stats.autogather=false; -set hive.exec.dynamic.partition=true; -set hive.exec.dynamic.partition.mode=nonstrict; -set mapred.min.split.size=256; -set mapred.min.split.size.per.node=256; -set mapred.min.split.size.per.rack=256; -set mapred.max.split.size=256; -set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; - --- INCLUDE_HADOOP_MAJOR_VERSIONS(0.23) --- This test uses mapred.max.split.size/mapred.max.split.size for controlling --- number of input splits. --- stats_partscan_1.q is the same test with this but has different result. - --- test analyze table ... compute statistics partialscan - --- 1. prepare data -CREATE table analyze_srcpart_partial_scan (key STRING, value STRING) -partitioned by (ds string, hr string) -stored as rcfile; -insert overwrite table analyze_srcpart_partial_scan partition (ds, hr) select * from srcpart where ds is not null; -describe formatted analyze_srcpart_partial_scan PARTITION(ds='2008-04-08',hr=11); - -set hive.stats.autogather=true; - --- 2. partialscan -explain -analyze table analyze_srcpart_partial_scan PARTITION(ds='2008-04-08',hr=11) compute statistics partialscan; -analyze table analyze_srcpart_partial_scan PARTITION(ds='2008-04-08',hr=11) compute statistics partialscan; - --- 3. confirm result -describe formatted analyze_srcpart_partial_scan PARTITION(ds='2008-04-08',hr=11); -describe formatted analyze_srcpart_partial_scan PARTITION(ds='2008-04-09',hr=11); -drop table analyze_srcpart_partial_scan; - -
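For scripts that still issue the retired command, a minimal replacement session along the lines of the deleted stats_partscan tests might look like this (fixture names and settings copied from those tests; NOSCAN can be dropped where a full scan is acceptable):

    set hive.exec.dynamic.partition=true;
    set hive.exec.dynamic.partition.mode=nonstrict;
    set hive.stats.autogather=true;

    CREATE TABLE analyze_srcpart_partial_scan (key STRING, value STRING)
    PARTITIONED BY (ds STRING, hr STRING)
    STORED AS RCFILE;
    INSERT OVERWRITE TABLE analyze_srcpart_partial_scan PARTITION (ds, hr)
    SELECT * FROM srcpart WHERE ds IS NOT NULL;

    -- where the script previously ran COMPUTE STATISTICS PARTIALSCAN:
    ANALYZE TABLE analyze_srcpart_partial_scan PARTITION (ds='2008-04-08', hr=11)
      COMPUTE STATISTICS NOSCAN;
    DESCRIBE FORMATTED analyze_srcpart_partial_scan PARTITION (ds='2008-04-08', hr=11);
    DROP TABLE analyze_srcpart_partial_scan;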