IMPALA-5309: Adds TABLESAMPLE clause for HDFS table refs.

Syntax:
<tableref> TABLESAMPLE SYSTEM(<number>) [REPEATABLE(<number>)]

The first number specifies the percent of table bytes to sample.
The second number specifies the random seed to use.
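For illustration, REPEATABLE makes the selection deterministic (assuming the
table's files have not changed between runs): the seeded query below returns
the same sample on every execution, while the unseeded form draws a new
sample each time.

SELECT count(*) FROM t TABLESAMPLE SYSTEM(20) REPEATABLE(42)
SELECT count(*) FROM t TABLESAMPLE SYSTEM(20)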
The sampling is coarse-grained. Impala keeps randomly adding files to the
sample until at least the desired percentage of file bytes have been reached
(a standalone sketch of this selection loop appears after the diff below).

Examples:
SELECT * FROM t TABLESAMPLE SYSTEM(10)
SELECT * FROM t TABLESAMPLE SYSTEM(50) REPEATABLE(1234)

Testing:
- Added parser, analyzer, planner, and end-to-end tests
- Private core/hdfs run passed

Change-Id: Ief112cfb1e4983c5d94c08696dc83da9ccf43f70
Reviewed-on: http://gerrit.cloudera.org:8080/6868
Reviewed-by: Alex Behm <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ee0fc260
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ee0fc260
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ee0fc260

Branch: refs/heads/master
Commit: ee0fc260d1420b34a3d3fb1073fe80b3c63a9ab9
Parents: 3f1ca63
Author: Alex Behm <[email protected]>
Authored: Tue May 9 22:02:29 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Wed May 24 02:38:08 2017 +0000

----------------------------------------------------------------------
 fe/src/main/cup/sql-parser.cup                  |  56 ++++--
 .../apache/impala/analysis/BaseTableRef.java    |  11 +-
 .../impala/analysis/CollectionTableRef.java     |   1 +
 .../apache/impala/analysis/DescriptorTable.java |   2 +-
 .../apache/impala/analysis/InlineViewRef.java   |   9 +-
 .../impala/analysis/SingularRowSrcTableRef.java |   1 +
 .../org/apache/impala/analysis/TableRef.java    |  26 ++-
 .../impala/analysis/TableSampleClause.java      |  71 ++++++++
 .../apache/impala/catalog/HdfsPartition.java    |  19 +-
 .../org/apache/impala/catalog/HdfsTable.java    |  86 ++++++++-
 .../impala/planner/HdfsPartitionPruner.java     |  10 +-
 .../org/apache/impala/planner/HdfsScanNode.java |  84 ++++++---
 .../impala/planner/SingleNodePlanner.java       |   6 +-
 fe/src/main/jflex/sql-scanner.flex              |   2 +
 .../impala/analysis/AnalyzeStmtsTest.java       |  50 +++++-
 .../org/apache/impala/analysis/ParserTest.java  |  57 +++++-
 .../org/apache/impala/analysis/ToSqlTest.java   |  27 ++-
 .../org/apache/impala/planner/PlannerTest.java  |   7 +
 .../queries/PlannerTest/tablesample.test        | 177 +++++++++++++++++++
 tests/query_test/test_tablesample.py            |  64 +++++++
 20 files changed, 693 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ee0fc260/fe/src/main/cup/sql-parser.cup
----------------------------------------------------------------------
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index aba8dce..ebc7804 100644
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -33,6 +33,7 @@ import org.apache.impala.analysis.ColumnDef.Option;
 import org.apache.impala.analysis.UnionStmt.Qualifier;
 import org.apache.impala.analysis.UnionStmt.UnionOperand;
 import org.apache.impala.analysis.RangePartition;
+import org.apache.impala.analysis.TableSampleClause;
 import org.apache.impala.analysis.AlterTableAddDropRangePartitionStmt.Operation;
 import org.apache.impala.catalog.ArrayType;
 import org.apache.impala.catalog.MapType;
@@ -261,15 +262,15 @@ terminal
   KW_MERGE_FN, KW_METADATA, KW_NOT, KW_NULL, KW_NULLS, KW_OFFSET, KW_ON, KW_OR, KW_ORDER,
   KW_OUTER, KW_OVER, KW_OVERWRITE, KW_PARQUET, KW_PARQUETFILE, KW_PARTITION,
   KW_PARTITIONED, KW_PARTITIONS, KW_PRECEDING, KW_PREPARE_FN, KW_PRIMARY, KW_PRODUCED,
-  KW_PURGE, KW_RANGE, KW_RCFILE, KW_RECOVER, KW_REFRESH, KW_REGEXP, KW_RENAME, KW_REPLACE,
-
KW_REPLICATION, KW_RESTRICT, KW_RETURNS, KW_REVOKE, KW_RIGHT, KW_RLIKE, KW_ROLE, - KW_ROLES, KW_ROW, KW_ROWS, KW_SCHEMA, KW_SCHEMAS, KW_SELECT, KW_SEMI, KW_SEQUENCEFILE, - KW_SERDEPROPERTIES, KW_SERIALIZE_FN, KW_SET, KW_SHOW, KW_SMALLINT, KW_SORT, KW_STORED, - KW_STRAIGHT_JOIN, KW_STRING, KW_STRUCT, KW_SYMBOL, KW_TABLE, KW_TABLES, - KW_TBLPROPERTIES, KW_TERMINATED, KW_TEXTFILE, KW_THEN, KW_TIMESTAMP, KW_TINYINT, - KW_TRUNCATE, KW_STATS, KW_TO, KW_TRUE, KW_UNBOUNDED, KW_UNCACHED, KW_UNION, KW_UPDATE, - KW_UPDATE_FN, KW_UPSERT, KW_USE, KW_USING, KW_VALUES, KW_VARCHAR, KW_VIEW, KW_WHEN, - KW_WHERE, KW_WITH; + KW_PURGE, KW_RANGE, KW_RCFILE, KW_RECOVER, KW_REFRESH, KW_REGEXP, KW_RENAME, + KW_REPEATABLE, KW_REPLACE, KW_REPLICATION, KW_RESTRICT, KW_RETURNS, KW_REVOKE, + KW_RIGHT, KW_RLIKE, KW_ROLE, KW_ROLES, KW_ROW, KW_ROWS, KW_SCHEMA, KW_SCHEMAS, + KW_SELECT, KW_SEMI, KW_SEQUENCEFILE, KW_SERDEPROPERTIES, KW_SERIALIZE_FN, KW_SET, + KW_SHOW, KW_SMALLINT, KW_SORT, KW_STORED, KW_STRAIGHT_JOIN, KW_STRING, KW_STRUCT, + KW_SYMBOL, KW_TABLE, KW_TABLES, KW_TABLESAMPLE, KW_TBLPROPERTIES, KW_TERMINATED, + KW_TEXTFILE, KW_THEN, KW_TIMESTAMP, KW_TINYINT, KW_TRUNCATE, KW_STATS, KW_TO, KW_TRUE, + KW_UNBOUNDED, KW_UNCACHED, KW_UNION, KW_UPDATE, KW_UPDATE_FN, KW_UPSERT, KW_USE, + KW_USING, KW_VALUES, KW_VARCHAR, KW_VIEW, KW_WHEN, KW_WHERE, KW_WITH; terminal COLON, SEMICOLON, COMMA, DOT, DOTDOTDOT, STAR, LPAREN, RPAREN, LBRACKET, RBRACKET, DIVIDE, MOD, ADD, SUBTRACT; @@ -361,6 +362,7 @@ nonterminal ArrayList<String> dotted_path; nonterminal SlotRef slot_ref; nonterminal FromClause from_clause; nonterminal ArrayList<TableRef> table_ref_list; +nonterminal TableSampleClause opt_tablesample; nonterminal WithClause opt_with_clause; nonterminal ArrayList<View> with_view_def_list; nonterminal View with_view_def; @@ -483,6 +485,7 @@ nonterminal Boolean opt_kw_role; // identifiers rather than keywords. Throws a parse exception if the identifier does not // match the expected string. 
nonterminal key_ident; +nonterminal system_ident; nonterminal Boolean option_ident; nonterminal Boolean server_ident; nonterminal Boolean source_ident; @@ -1603,6 +1606,15 @@ key_ident ::= :} ; +system_ident ::= + IDENT:ident + {: + if (!ident.toUpperCase().equals("SYSTEM")) { + parser.parseError("identifier", SqlParserSymbols.IDENT, "SYSTEM"); + } + :} + ; + source_ident ::= IDENT:ident {: @@ -2385,12 +2397,12 @@ table_ref_list ::= ; table_ref ::= - dotted_path:path - {: RESULT = new TableRef(path, null); :} - | dotted_path:path alias_clause:alias - {: RESULT = new TableRef(path, alias); :} - | LPAREN query_stmt:query RPAREN alias_clause:alias - {: RESULT = new InlineViewRef(alias, query); :} + dotted_path:path opt_tablesample:tblsmpl + {: RESULT = new TableRef(path, null, tblsmpl); :} + | dotted_path:path alias_clause:alias opt_tablesample:tblsmpl + {: RESULT = new TableRef(path, alias, tblsmpl); :} + | LPAREN query_stmt:query RPAREN alias_clause:alias opt_tablesample:tblsmpl + {: RESULT = new InlineViewRef(alias, query, tblsmpl); :} ; join_operator ::= @@ -2459,6 +2471,16 @@ plan_hint_list ::= :} ; +opt_tablesample ::= + KW_TABLESAMPLE system_ident LPAREN INTEGER_LITERAL:p RPAREN + {: RESULT = new TableSampleClause(p.longValue(), null); :} + | KW_TABLESAMPLE system_ident LPAREN INTEGER_LITERAL:p RPAREN + KW_REPEATABLE LPAREN INTEGER_LITERAL:s RPAREN + {: RESULT = new TableSampleClause(p.longValue(), Long.valueOf(s.longValue())); :} + | /* empty */ + {: RESULT = null; :} + ; + ident_list ::= ident_or_default:ident {: @@ -3352,6 +3374,8 @@ ident_or_keyword ::= {: RESULT = r.toString(); :} | KW_RENAME:r {: RESULT = r.toString(); :} + | KW_REPEATABLE:r + {: RESULT = r.toString(); :} | KW_REPLACE:r {: RESULT = r.toString(); :} | KW_REPLICATION:r @@ -3410,6 +3434,8 @@ ident_or_keyword ::= {: RESULT = r.toString(); :} | KW_TABLES:r {: RESULT = r.toString(); :} + | KW_TABLESAMPLE:r + {: RESULT = r.toString(); :} | KW_TBLPROPERTIES:r {: RESULT = r.toString(); :} | KW_TERMINATED:r http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ee0fc260/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java b/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java index 009851d..4f8ccda 100644 --- a/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java +++ b/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java @@ -62,6 +62,7 @@ public class BaseTableRef extends TableRef { analyzer.registerAuthAndAuditEvent(resolvedPath_.getRootTable(), priv_); desc_ = analyzer.registerTableRef(this); isAnalyzed_ = true; + analyzeTableSample(analyzer); analyzeHints(analyzer); analyzeJoin(analyzer); analyzeSkipHeaderLineCount(); @@ -71,13 +72,13 @@ public class BaseTableRef extends TableRef { protected String tableRefToSql() { // Enclose the alias in quotes if Hive cannot parse it without quotes. // This is needed for view compatibility between Impala and Hive. - String aliasSql = null; + String aliasSql = ""; String alias = getExplicitAlias(); - if (alias != null) aliasSql = ToSqlUtils.getIdentSql(alias); + if (alias != null) aliasSql = " " + ToSqlUtils.getIdentSql(alias); + String tableSampleSql = ""; + if (sampleParams_ != null) tableSampleSql = " " + sampleParams_.toSql(); String tableHintsSql = ToSqlUtils.getPlanHintsSql(tableHints_); - return getTable().getTableName().toSql() + - ((aliasSql != null) ? 
" " + aliasSql : "") + - (tableHintsSql != "" ? " " + tableHintsSql : ""); + return getTable().getTableName().toSql() + aliasSql + tableSampleSql + tableHintsSql; } public String debugString() { return tableRefToSql(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ee0fc260/fe/src/main/java/org/apache/impala/analysis/CollectionTableRef.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/CollectionTableRef.java b/fe/src/main/java/org/apache/impala/analysis/CollectionTableRef.java index 05a3543..92015f5 100644 --- a/fe/src/main/java/org/apache/impala/analysis/CollectionTableRef.java +++ b/fe/src/main/java/org/apache/impala/analysis/CollectionTableRef.java @@ -110,6 +110,7 @@ public class CollectionTableRef extends TableRef { .toRequest()); } isAnalyzed_ = true; + analyzeTableSample(analyzer); analyzeHints(analyzer); // TODO: For joins on nested collections some join ops can be simplified http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ee0fc260/fe/src/main/java/org/apache/impala/analysis/DescriptorTable.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/DescriptorTable.java b/fe/src/main/java/org/apache/impala/analysis/DescriptorTable.java index f24a429..7728d94 100644 --- a/fe/src/main/java/org/apache/impala/analysis/DescriptorTable.java +++ b/fe/src/main/java/org/apache/impala/analysis/DescriptorTable.java @@ -124,7 +124,7 @@ public class DescriptorTable { * given table. */ public void addReferencedPartition(Table table, long partitionId) { - getReferencedPartitions(table).add(partitionId); + getReferencedPartitions(table).add(Long.valueOf(partitionId)); } /** http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ee0fc260/fe/src/main/java/org/apache/impala/analysis/InlineViewRef.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/InlineViewRef.java b/fe/src/main/java/org/apache/impala/analysis/InlineViewRef.java index 811d284..3d309e0 100644 --- a/fe/src/main/java/org/apache/impala/analysis/InlineViewRef.java +++ b/fe/src/main/java/org/apache/impala/analysis/InlineViewRef.java @@ -75,8 +75,9 @@ public class InlineViewRef extends TableRef { /** * C'tor for creating inline views parsed directly from the a query string. 
*/ - public InlineViewRef(String alias, QueryStmt queryStmt) { - super(null, alias); + public InlineViewRef(String alias, QueryStmt queryStmt, + TableSampleClause sampleParams) { + super(null, alias, sampleParams); Preconditions.checkNotNull(queryStmt); queryStmt_ = queryStmt; view_ = null; @@ -85,7 +86,7 @@ public class InlineViewRef extends TableRef { } public InlineViewRef(String alias, QueryStmt queryStmt, List<String> colLabels) { - this(alias, queryStmt); + this(alias, queryStmt, (TableSampleClause) null); explicitColLabels_ = Lists.newArrayList(colLabels); } @@ -108,6 +109,7 @@ public class InlineViewRef extends TableRef { aliases_ = new String[] { view_.getTableName().toString().toLowerCase(), view_.getName().toLowerCase() }; + sampleParams_ = origTblRef.getSampleParams(); } /** @@ -213,6 +215,7 @@ public class InlineViewRef extends TableRef { baseTblSmap_.debugString()); } + analyzeTableSample(analyzer); analyzeHints(analyzer); // Now do the remaining join analysis analyzeJoin(analyzer); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ee0fc260/fe/src/main/java/org/apache/impala/analysis/SingularRowSrcTableRef.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/SingularRowSrcTableRef.java b/fe/src/main/java/org/apache/impala/analysis/SingularRowSrcTableRef.java index be9753a..0fd7cbf 100644 --- a/fe/src/main/java/org/apache/impala/analysis/SingularRowSrcTableRef.java +++ b/fe/src/main/java/org/apache/impala/analysis/SingularRowSrcTableRef.java @@ -34,6 +34,7 @@ public class SingularRowSrcTableRef extends TableRef { public SingularRowSrcTableRef(PlanNode subplanInput) { super(null, "singular-row-src-tblref"); Preconditions.checkNotNull(subplanInput); + Preconditions.checkState(sampleParams_ == null); desc_ = null; isAnalyzed_ = true; tblRefIds_ = Lists.newArrayList(subplanInput.getTblRefIds()); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ee0fc260/fe/src/main/java/org/apache/impala/analysis/TableRef.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/TableRef.java b/fe/src/main/java/org/apache/impala/analysis/TableRef.java index a56b8cd..d5bb0fe 100644 --- a/fe/src/main/java/org/apache/impala/analysis/TableRef.java +++ b/fe/src/main/java/org/apache/impala/analysis/TableRef.java @@ -17,7 +17,6 @@ package org.apache.impala.analysis; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Set; @@ -81,6 +80,9 @@ public class TableRef implements ParseNode { // Analysis registers privilege and/or audit requests based on this privilege. protected final Privilege priv_; + // Optional TABLESAMPLE clause. Null if not specified. 
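+  // Not final because a view substitution copies the original table ref's
+  // sample params into the InlineViewRef (see the InlineViewRef c'tor).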
+ protected TableSampleClause sampleParams_; + protected JoinOperator joinOp_; protected List<PlanHint> joinHints_ = Lists.newArrayList(); protected List<String> usingColNames_; @@ -132,7 +134,16 @@ public class TableRef implements ParseNode { this(path, alias, Privilege.SELECT); } + public TableRef(List<String> path, String alias, TableSampleClause tableSample) { + this(path, alias, tableSample, Privilege.SELECT); + } + public TableRef(List<String> path, String alias, Privilege priv) { + this(path, alias, null, priv); + } + + public TableRef(List<String> path, String alias, TableSampleClause sampleParams, + Privilege priv) { rawPath_ = path; if (alias != null) { aliases_ = new String[] { alias.toLowerCase() }; @@ -140,6 +151,7 @@ public class TableRef implements ParseNode { } else { hasExplicitAlias_ = false; } + sampleParams_ = sampleParams; priv_ = priv; isAnalyzed_ = false; replicaPreference_ = null; @@ -154,6 +166,7 @@ public class TableRef implements ParseNode { resolvedPath_ = other.resolvedPath_; aliases_ = other.aliases_; hasExplicitAlias_ = other.hasExplicitAlias_; + sampleParams_ = other.sampleParams_; priv_ = other.priv_; joinOp_ = other.joinOp_; joinHints_ = Lists.newArrayList(other.joinHints_); @@ -259,6 +272,7 @@ public class TableRef implements ParseNode { Preconditions.checkNotNull(resolvedPath_); return resolvedPath_.getRootTable(); } + public TableSampleClause getSampleParams() { return sampleParams_; } public Privilege getPrivilege() { return priv_; } public List<PlanHint> getJoinHints() { return joinHints_; } public List<PlanHint> getTableHints() { return tableHints_; } @@ -335,6 +349,16 @@ public class TableRef implements ParseNode { return allTableRefIds_; } + protected void analyzeTableSample(Analyzer analyzer) throws AnalysisException { + if (sampleParams_ == null) return; + sampleParams_.analyze(analyzer); + if (!(this instanceof BaseTableRef) + || !(resolvedPath_.destTable() instanceof HdfsTable)) { + throw new AnalysisException( + "TABLESAMPLE is only supported on HDFS base tables: " + getUniqueAlias()); + } + } + protected void analyzeHints(Analyzer analyzer) throws AnalysisException { // We prefer adding warnings over throwing exceptions here to maintain view // compatibility with Hive. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ee0fc260/fe/src/main/java/org/apache/impala/analysis/TableSampleClause.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/TableSampleClause.java b/fe/src/main/java/org/apache/impala/analysis/TableSampleClause.java new file mode 100644 index 0000000..13365c2 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/analysis/TableSampleClause.java @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.analysis; + +import org.apache.impala.common.AnalysisException; +import org.apache.kudu.client.shaded.com.google.common.base.Preconditions; + +/** + * Represents a TABLESAMPLE clause. + * + * Syntax: + * <tableref> TABLESAMPLE SYSTEM(<number>) [REPEATABLE(<number>)] + * + * The first number specifies the percent of table bytes to sample. + * The second number specifies the random seed to use. + */ +public class TableSampleClause implements ParseNode { + // Required percent of bytes to sample. + private final long percentBytes_; + // Optional random seed. Null if not specified. + private final Long randomSeed_; + + public TableSampleClause(long percentBytes, Long randomSeed) { + percentBytes_ = percentBytes; + randomSeed_ = randomSeed; + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException { + if (percentBytes_ < 0 || percentBytes_ > 100) { + throw new AnalysisException(String.format( + "Invalid percent of bytes value '%s'. " + + "The percent of bytes to sample must be between 0 and 100.", percentBytes_)); + } + } + + public long getPercentBytes() { return percentBytes_; } + public boolean hasRandomSeed() { return randomSeed_ != null; } + public long getRandomSeed() { + Preconditions.checkState(hasRandomSeed()); + return randomSeed_.longValue(); + } + + @Override + public TableSampleClause clone() { + return new TableSampleClause(percentBytes_, randomSeed_); + } + + @Override + public String toSql() { + StringBuilder builder = new StringBuilder(); + builder.append("TABLESAMPLE SYSTEM(" + percentBytes_ + ")"); + if (randomSeed_ != null) builder.append(" REPEATABLE(" + randomSeed_ + ")"); + return builder.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ee0fc260/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java index 76fddcf..35309c5 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java @@ -26,9 +26,10 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.impala.analysis.Expr; import org.apache.impala.analysis.LiteralExpr; import org.apache.impala.analysis.NullLiteral; @@ -41,10 +42,6 @@ import org.apache.impala.common.Reference; import org.apache.impala.fb.FbCompression; import org.apache.impala.fb.FbFileBlock; import org.apache.impala.fb.FbFileDesc; -import org.apache.hadoop.fs.BlockLocation; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.impala.thrift.ImpalaInternalServiceConstants; import org.apache.impala.thrift.TAccessLevel; import org.apache.impala.thrift.TExpr; @@ -56,8 +53,9 @@ import org.apache.impala.thrift.TPartitionStats; import org.apache.impala.thrift.TTableStats; import org.apache.impala.util.HdfsCachingUtil; import org.apache.impala.util.ListMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import 
com.google.flatbuffers.FlatBufferBuilder; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Objects; @@ -71,6 +69,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.primitives.Ints; import com.google.common.primitives.Shorts; +import com.google.flatbuffers.FlatBufferBuilder; /** * Query-relevant information for one table partition. Partitions are comparable @@ -646,7 +645,7 @@ public class HdfsPartition implements Comparable<HdfsPartition> { public void setFileDescriptors(List<FileDescriptor> descriptors) { fileDescriptors_ = descriptors; } - public long getNumFileDescriptors() { + public int getNumFileDescriptors() { return fileDescriptors_ == null ? 0 : fileDescriptors_.size(); } @@ -707,7 +706,7 @@ public class HdfsPartition implements Comparable<HdfsPartition> { } } - private final CachedHmsPartitionDescriptor cachedMsPartitionDescriptor_; + private CachedHmsPartitionDescriptor cachedMsPartitionDescriptor_; public CachedHmsPartitionDescriptor getCachedMsPartitionDescriptor() { return cachedMsPartitionDescriptor_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ee0fc260/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java index dcf74ce..b7ba44f 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; import java.util.TreeMap; @@ -902,7 +903,7 @@ public class HdfsTable extends Table { if (nullPartitionIds_.get(i).isEmpty()) { stats.setNumDistinctValues(stats.getNumDistinctValues() + 1); } - nullPartitionIds_.get(i).add(partition.getId()); + nullPartitionIds_.get(i).add(Long.valueOf(partition.getId())); continue; } HashSet<Long> partitionIds = partitionValuesMap_.get(i).get(literal); @@ -911,7 +912,7 @@ public class HdfsTable extends Table { partitionValuesMap_.get(i).put(literal, partitionIds); stats.setNumDistinctValues(stats.getNumDistinctValues() + 1); } - partitionIds.add(partition.getId()); + partitionIds.add(Long.valueOf(partition.getId())); } } @@ -1913,4 +1914,85 @@ public class HdfsTable extends Table { dropPartition(oldPartition); addPartition(refreshedPartition); } + + /** + * Selects a random sample of files from the given list of partitions such that the sum + * of file sizes is at least 'percentBytes' percent of the total number of bytes in + * those partitions. The sample is returned as a map from partition id to a list of + * file descriptors selected from that partition. + * This function allocates memory proportional to the number of files in 'inputParts'. + * Its implementation tries to minimize the constant factor and object generation. + * The given 'randomSeed' is used for random number generation. + * The 'percentBytes' parameter must be between 0 and 100. + */ + public Map<Long, List<FileDescriptor>> getFilesSample(List<HdfsPartition> inputParts, + long percentBytes, long randomSeed) { + Preconditions.checkState(percentBytes >= 0 && percentBytes <= 100); + + // Conservative max size for Java arrays. The actual maximum varies + // from JVM version and sometimes between configurations. 
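+    // Staying a few elements under Integer.MAX_VALUE keeps the cast to int and
+    // the 'fileIdxs'/'parts' array allocations below safe on all JVMs.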
+ final long JVM_MAX_ARRAY_SIZE = Integer.MAX_VALUE - 10; + if (numHdfsFiles_ > JVM_MAX_ARRAY_SIZE) { + throw new IllegalStateException(String.format( + "Too many files to generate a table sample. " + + "Table '%s' has %s files, but a maximum of %s files are supported.", + getTableName().toString(), numHdfsFiles_, JVM_MAX_ARRAY_SIZE)); + } + int totalNumFiles = (int) numHdfsFiles_; + + // Ensure a consistent ordering of files for repeatable runs. The files within a + // partition are already ordered based on how they are loaded in the catalog. + List<HdfsPartition> orderedParts = Lists.newArrayList(inputParts); + Collections.sort(orderedParts); + + // fileIdxs contains indexes into the file descriptor lists of all inputParts + // parts[i] contains the partition corresponding to fileIdxs[i] + // fileIdxs[i] is an index into the file descriptor list of the partition parts[i] + // Use max size to avoid looping over inputParts for the exact size. + // The purpose of these arrays is to efficiently avoid selecting the same file + // multiple times during the sampling, regardless of the sample percent. We purposely + // avoid generating objects proportional to the number of files. + int[] fileIdxs = new int[totalNumFiles]; + HdfsPartition[] parts = new HdfsPartition[totalNumFiles]; + int idx = 0; + long totalBytes = 0; + for (HdfsPartition part: inputParts) { + totalBytes += part.getSize(); + int numFds = part.getNumFileDescriptors(); + for (int fileIdx = 0; fileIdx < numFds; ++fileIdx) { + fileIdxs[idx] = fileIdx; + parts[idx] = part; + ++idx; + } + } + + int numFilesRemaining = idx; + double fracPercentBytes = (double) percentBytes / 100; + long targetBytes = (long) Math.round(totalBytes * fracPercentBytes); + + // Randomly select files until targetBytes has been reached or all files have been + // selected. + Random rnd = new Random(randomSeed); + long selectedBytes = 0; + Map<Long, List<FileDescriptor>> result = Maps.newHashMap(); + while (selectedBytes < targetBytes && numFilesRemaining > 0) { + int selectedIdx = Math.abs(rnd.nextInt()) % numFilesRemaining; + HdfsPartition part = parts[selectedIdx]; + Long partId = Long.valueOf(part.getId()); + List<FileDescriptor> sampleFileIdxs = result.get(partId); + if (sampleFileIdxs == null) { + sampleFileIdxs = Lists.newArrayList(); + result.put(partId, sampleFileIdxs); + } + FileDescriptor fd = part.getFileDescriptors().get(fileIdxs[selectedIdx]); + sampleFileIdxs.add(fd); + selectedBytes += fd.getFileLength(); + // Avoid selecting the same file multiple times. + fileIdxs[selectedIdx] = fileIdxs[numFilesRemaining - 1]; + parts[selectedIdx] = parts[numFilesRemaining - 1]; + --numFilesRemaining; + } + + return result; + } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ee0fc260/fe/src/main/java/org/apache/impala/planner/HdfsPartitionPruner.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsPartitionPruner.java b/fe/src/main/java/org/apache/impala/planner/HdfsPartitionPruner.java index 95ef68c..7c05283 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsPartitionPruner.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsPartitionPruner.java @@ -64,6 +64,11 @@ import com.google.common.collect.Sets; * on applicable conjuncts. It returns a list of partitions left after applying all * the conjuncts and also removes the conjuncts which have been fully evaluated with * the partition columns. 
+ * + * The pruner does not update referenced partitions in the DescriptorTable because + * not all users of this class require the resulting partitions to be serialized, e.g., + * DDL commands. + * It is up to the user of this class to mark referenced partitions as needed. */ public class HdfsPartitionPruner { @@ -154,10 +159,7 @@ public class HdfsPartitionPruner { for (Long id: matchingPartitionIds) { HdfsPartition partition = partitionMap.get(id); Preconditions.checkNotNull(partition); - if (partition.hasFileDescriptors() || allowEmpty) { - results.add(partition); - analyzer.getDescTbl().addReferencedPartition(tbl_, partition.getId()); - } + if (partition.hasFileDescriptors() || allowEmpty) results.add(partition); } return results; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ee0fc260/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java index 361a4d2..47720fd 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java @@ -27,12 +27,10 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; - import org.apache.impala.analysis.Analyzer; import org.apache.impala.analysis.BinaryPredicate; import org.apache.impala.analysis.DescriptorTable; import org.apache.impala.analysis.Expr; -import org.apache.impala.analysis.FunctionCallExpr; import org.apache.impala.analysis.InPredicate; import org.apache.impala.analysis.LiteralExpr; import org.apache.impala.analysis.NullLiteral; @@ -40,17 +38,19 @@ import org.apache.impala.analysis.SlotDescriptor; import org.apache.impala.analysis.SlotId; import org.apache.impala.analysis.SlotRef; import org.apache.impala.analysis.TableRef; +import org.apache.impala.analysis.TableSampleClause; import org.apache.impala.analysis.TupleDescriptor; import org.apache.impala.analysis.TupleId; import org.apache.impala.catalog.Column; import org.apache.impala.catalog.HdfsFileFormat; import org.apache.impala.catalog.HdfsPartition; import org.apache.impala.catalog.HdfsPartition.FileBlock; +import org.apache.impala.catalog.HdfsPartition.FileDescriptor; import org.apache.impala.catalog.HdfsTable; import org.apache.impala.catalog.Type; import org.apache.impala.common.FileSystemUtil; -import org.apache.impala.common.ImpalaRuntimeException; import org.apache.impala.common.ImpalaException; +import org.apache.impala.common.ImpalaRuntimeException; import org.apache.impala.common.NotImplementedException; import org.apache.impala.common.PrintUtils; import org.apache.impala.common.RuntimeEnv; @@ -81,13 +81,17 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; /** - * Scan of a single table. Currently limited to full-table scans. + * Scan of a single table. * * It's expected that the creator of this object has already done any necessary * partition pruning before creating this object. In other words, the 'conjuncts' * passed to the constructors are conjuncts not fully evaluated by partition pruning * and 'partitions' are the remaining partitions after pruning. * + * Supports scanning a random sample of files based on the parameters from a + * TABLESAMPLE clause. Scan predicates and the sampling are independent, so we first + * prune partitions and then randomly select files from those partitions. 
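+ * A partition eliminated by pruning therefore never contributes files to the
+ * sample.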
+ * * For scans of tables with Parquet files the class creates an additional list of * conjuncts that are passed to the backend and will be evaluated against the * parquet::Statistics of row groups. If the conjuncts don't match, then whole row groups @@ -113,16 +117,19 @@ public class HdfsScanNode extends ScanNode { private final HdfsTable tbl_; - // Partitions that are filtered in for scanning by the key ranges + // List of partitions to be scanned. Partitions have been pruned. private final List<HdfsPartition> partitions_; + // Parameters for table sampling. Null if not sampling. + private final TableSampleClause sampleParams_; + private final TReplicaPreference replicaPreference_; private final boolean randomReplica_; - // Total number of files from partitions_ + // Number of partitions, files and bytes scanned. Set in computeScanRangeLocations(). + // Might not match 'partitions_' due to table sampling. + private int numPartitions_ = 0; private long totalFiles_ = 0; - - // Total number of bytes from partitions_ private long totalBytes_ = 0; // True if this scan node should use the MT implementation in the backend. @@ -183,6 +190,7 @@ public class HdfsScanNode extends ScanNode { tbl_ = (HdfsTable)desc.getTable(); conjuncts_ = conjuncts; partitions_ = partitions; + sampleParams_ = hdfsTblRef.getSampleParams(); replicaPreference_ = hdfsTblRef.getReplicaPreference(); randomReplica_ = hdfsTblRef.getRandomReplica(); HdfsTable hdfsTable = (HdfsTable)hdfsTblRef.getTable(); @@ -216,7 +224,7 @@ public class HdfsScanNode extends ScanNode { computeDictionaryFilterConjuncts(analyzer); computeMemLayout(analyzer); - // compute scan range locations + // compute scan range locations with optional sampling Set<HdfsFileFormat> fileFormats = computeScanRangeLocations(analyzer); // Determine backend scan node implementation to use. The optimized MT implementation @@ -519,15 +527,47 @@ public class HdfsScanNode extends ScanNode { /** * Computes scan ranges (hdfs splits) plus their storage locations, including volume * ids, based on the given maximum number of bytes each scan range should scan. + * If 'sampleParams_' is not null, generates a sample and computes the scan ranges + * based on the sample. * Returns the set of file formats being scanned. */ private Set<HdfsFileFormat> computeScanRangeLocations(Analyzer analyzer) throws ImpalaRuntimeException { + List<HdfsPartition> partitions = partitions_; + Map<Long, List<FileDescriptor>> sampledFiles = null; + if (sampleParams_ != null) { + long percentBytes = sampleParams_.getPercentBytes(); + long randomSeed; + if (sampleParams_.hasRandomSeed()) { + randomSeed = sampleParams_.getRandomSeed(); + } else { + randomSeed = System.currentTimeMillis(); + } + sampledFiles = tbl_.getFilesSample(partitions_, percentBytes, randomSeed); + if (sampledFiles.size() != partitions_.size()) { + partitions = Lists.newArrayListWithCapacity(sampledFiles.size()); + for (Long partId: sampledFiles.keySet()) { + partitions.add(tbl_.getPartitionMap().get(partId)); + } + } + } + long maxScanRangeLength = analyzer.getQueryCtx().client_request.getQuery_options() .getMax_scan_range_length(); scanRanges_ = Lists.newArrayList(); + numPartitions_ = partitions.size(); + totalFiles_ = 0; + totalBytes_ = 0; Set<HdfsFileFormat> fileFormats = Sets.newHashSet(); for (HdfsPartition partition: partitions_) { + List<FileDescriptor> fileDescs = partition.getFileDescriptors(); + if (sampledFiles != null) { + // If we are sampling, check whether this partition is included in the sample. 
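+        // Partitions absent from 'sampledFiles' contributed no files to the
+        // sample and are skipped; they are never registered as referenced.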
+ fileDescs = sampledFiles.get(Long.valueOf(partition.getId())); + if (fileDescs == null) continue; + } + + analyzer.getDescTbl().addReferencedPartition(tbl_, partition.getId()); fileFormats.add(partition.getFileFormat()); Preconditions.checkState(partition.getId() >= 0); // Missing disk id accounting is only done for file systems that support the notion @@ -540,7 +580,10 @@ public class HdfsScanNode extends ScanNode { } boolean checkMissingDiskIds = FileSystemUtil.supportsStorageIds(partitionFs); boolean partitionMissingDiskIds = false; - for (HdfsPartition.FileDescriptor fileDesc: partition.getFileDescriptors()) { + + totalFiles_ += fileDescs.size(); + for (FileDescriptor fileDesc: fileDescs) { + totalBytes_ += fileDesc.getFileLength(); boolean fileDescMissingDiskIds = false; for (int j = 0; j < fileDesc.getNumFileBlocks(); ++j) { FbFileBlock block = fileDesc.getFbFileBlock(j); @@ -616,8 +659,6 @@ public class HdfsScanNode extends ScanNode { LOG.trace("collecting partitions for table " + tbl_.getName()); } numPartitionsMissingStats_ = 0; - totalFiles_ = 0; - totalBytes_ = 0; if (tbl_.getNumClusteringCols() == 0) { cardinality_ = tbl_.getNumRows(); if (cardinality_ < -1 || (cardinality_ == 0 && tbl_.getTotalHdfsBytes() > 0)) { @@ -628,8 +669,6 @@ public class HdfsScanNode extends ScanNode { cardinality_ = 0; } else { Preconditions.checkState(partitions_.size() == 1); - totalFiles_ += partitions_.get(0).getFileDescriptors().size(); - totalBytes_ += partitions_.get(0).getSize(); } } else { cardinality_ = 0; @@ -647,8 +686,6 @@ public class HdfsScanNode extends ScanNode { } else { ++numPartitionsMissingStats_; } - totalFiles_ += p.getFileDescriptors().size(); - totalBytes_ += p.getSize(); } if (!partitions_.isEmpty() && !hasValidPartitionCardinality) { // if none of the partitions knew its number of rows, we fall back on @@ -656,6 +693,14 @@ public class HdfsScanNode extends ScanNode { cardinality_ = tbl_.getNumRows(); } } + + // Adjust the cardinality based on table sampling. + if (sampleParams_ != null && cardinality_ != -1) { + double fracPercBytes = (double) sampleParams_.getPercentBytes() / 100; + cardinality_ = Math.round(cardinality_ * fracPercBytes); + cardinality_ = Math.max(cardinality_, 1); + } + // Adjust cardinality for all collections referenced along the tuple's path. 
if (cardinality_ != -1) { for (Type t: desc_.getPath().getMatchedTypes()) { @@ -785,16 +830,15 @@ public class HdfsScanNode extends ScanNode { HdfsTable table = (HdfsTable) desc_.getTable(); output.append(String.format("%s%s [%s", prefix, getDisplayLabel(), getDisplayLabelDetail())); - int numPartitions = partitions_.size(); if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal() && fragment_.isPartitioned()) { output.append(", " + fragment_.getDataPartition().getExplainString()); } output.append("]\n"); if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) { - if (tbl_.getNumClusteringCols() == 0) numPartitions = 1; + if (tbl_.getNumClusteringCols() == 0) numPartitions_ = 1; output.append(String.format("%spartitions=%s/%s files=%s size=%s", detailPrefix, - numPartitions, table.getPartitions().size() - 1, totalFiles_, + numPartitions_, table.getPartitions().size() - 1, totalFiles_, PrintUtils.printBytes(totalBytes_))); output.append("\n"); if (!conjuncts_.isEmpty()) { @@ -820,7 +864,7 @@ public class HdfsScanNode extends ScanNode { if (numScanRangesNoDiskIds_ > 0) { output.append(String.format("%smissing disk ids: " + "partitions=%s/%s files=%s/%s scan ranges %s/%s\n", detailPrefix, - numPartitionsNoDiskIds_, numPartitions, numFilesNoDiskIds_, + numPartitionsNoDiskIds_, numPartitions_, numFilesNoDiskIds_, totalFiles_, numScanRangesNoDiskIds_, scanRanges_.size())); } if (!minMaxOriginalConjuncts_.isEmpty()) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ee0fc260/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java index eb96cdc..0de35b4 100644 --- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java @@ -47,6 +47,7 @@ import org.apache.impala.analysis.SlotDescriptor; import org.apache.impala.analysis.SlotId; import org.apache.impala.analysis.SlotRef; import org.apache.impala.analysis.TableRef; +import org.apache.impala.analysis.TableSampleClause; import org.apache.impala.analysis.TupleDescriptor; import org.apache.impala.analysis.TupleId; import org.apache.impala.analysis.TupleIsNullPredicate; @@ -60,6 +61,7 @@ import org.apache.impala.catalog.HdfsTable; import org.apache.impala.catalog.KuduTable; import org.apache.impala.catalog.Table; import org.apache.impala.catalog.Type; +import org.apache.impala.catalog.HdfsPartition.FileDescriptor; import org.apache.impala.common.ImpalaException; import org.apache.impala.common.InternalException; import org.apache.impala.common.NotImplementedException; @@ -1198,8 +1200,8 @@ public class SingleNodePlanner { List<Expr> conjuncts, Analyzer analyzer) throws ImpalaException { TupleDescriptor tupleDesc = hdfsTblRef.getDesc(); - // Do partition pruning before deciding which slots to materialize, - // We might end up removing some predicates. + // Do partition pruning before deciding which slots to materialize because we might + // end up removing some predicates. 
HdfsPartitionPruner pruner = new HdfsPartitionPruner(tupleDesc); List<HdfsPartition> partitions = pruner.prunePartitions(analyzer, conjuncts, false); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ee0fc260/fe/src/main/jflex/sql-scanner.flex ---------------------------------------------------------------------- diff --git a/fe/src/main/jflex/sql-scanner.flex b/fe/src/main/jflex/sql-scanner.flex index fb01eaa..ee8355d 100644 --- a/fe/src/main/jflex/sql-scanner.flex +++ b/fe/src/main/jflex/sql-scanner.flex @@ -186,6 +186,7 @@ import org.apache.impala.analysis.SqlParserSymbols; keywordMap.put("refresh", new Integer(SqlParserSymbols.KW_REFRESH)); keywordMap.put("regexp", new Integer(SqlParserSymbols.KW_REGEXP)); keywordMap.put("rename", new Integer(SqlParserSymbols.KW_RENAME)); + keywordMap.put("repeatable", new Integer(SqlParserSymbols.KW_REPEATABLE)); keywordMap.put("replace", new Integer(SqlParserSymbols.KW_REPLACE)); keywordMap.put("replication", new Integer(SqlParserSymbols.KW_REPLICATION)); keywordMap.put("restrict", new Integer(SqlParserSymbols.KW_RESTRICT)); @@ -216,6 +217,7 @@ import org.apache.impala.analysis.SqlParserSymbols; keywordMap.put("symbol", new Integer(SqlParserSymbols.KW_SYMBOL)); keywordMap.put("table", new Integer(SqlParserSymbols.KW_TABLE)); keywordMap.put("tables", new Integer(SqlParserSymbols.KW_TABLES)); + keywordMap.put("tablesample", new Integer(SqlParserSymbols.KW_TABLESAMPLE)); keywordMap.put("tblproperties", new Integer(SqlParserSymbols.KW_TBLPROPERTIES)); keywordMap.put("terminated", new Integer(SqlParserSymbols.KW_TERMINATED)); keywordMap.put("textfile", new Integer(SqlParserSymbols.KW_TEXTFILE)); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ee0fc260/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java index 35acda9..448c62f 100644 --- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java @@ -27,11 +27,10 @@ import org.apache.impala.catalog.PrimitiveType; import org.apache.impala.catalog.ScalarType; import org.apache.impala.catalog.Type; import org.apache.impala.common.AnalysisException; +import org.apache.impala.common.RuntimeEnv; import org.junit.Assert; import org.junit.Test; -import org.apache.impala.common.RuntimeEnv; -import org.apache.impala.testutil.TestUtils; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -316,6 +315,51 @@ public class AnalyzeStmtsTest extends AnalyzerTest { "Duplicate table alias: 'functional.alltypes'"); } + @Test + public void TestTableSampleClause() { + long bytesPercVals[] = new long[] { 0, 10, 50, 100 }; + long randomSeedVals[] = new long[] { 0, 10, 100, Integer.MAX_VALUE, Long.MAX_VALUE }; + for (long bytesPerc: bytesPercVals) { + String tblSmpl = String.format("tablesample system (%s)", bytesPerc); + AnalyzesOk("select * from functional.alltypes " + tblSmpl); + for (long randomSeed: randomSeedVals) { + String repTblSmpl = String.format("%s repeatable (%s)", tblSmpl, randomSeed); + AnalyzesOk("select * from functional.alltypes " + repTblSmpl); + } + } + + // Invalid bytes percent. Negative values do not parse. 
+ AnalysisError("select * from functional.alltypes tablesample system (101)", + "Invalid percent of bytes value '101'. " + + "The percent of bytes to sample must be between 0 and 100."); + AnalysisError("select * from functional.alltypes tablesample system (1000)", + "Invalid percent of bytes value '1000'. " + + "The percent of bytes to sample must be between 0 and 100."); + + // Only applicable to HDFS base table refs. + AnalysisError("select * from functional_kudu.alltypes tablesample system (10)", + "TABLESAMPLE is only supported on HDFS base tables: functional_kudu.alltypes"); + AnalysisError("select * from functional_hbase.alltypes tablesample system (10)", + "TABLESAMPLE is only supported on HDFS base tables: functional_hbase.alltypes"); + AnalysisError("select * from functional.alltypes_datasource tablesample system (10)", + "TABLESAMPLE is only supported on HDFS base tables: " + + "functional.alltypes_datasource"); + AnalysisError("select * from (select * from functional.alltypes) v " + + "tablesample system (10)", + "TABLESAMPLE is only supported on HDFS base tables: v"); + AnalysisError("with v as (select * from functional.alltypes) " + + "select * from v tablesample system (10)", + "TABLESAMPLE is only supported on HDFS base tables: v"); + AnalysisError("select * from functional.alltypes_view tablesample system (10)", + "TABLESAMPLE is only supported on HDFS base tables: functional.alltypes_view"); + AnalysisError("select * from functional.allcomplextypes.int_array_col " + + "tablesample system (10)", + "TABLESAMPLE is only supported on HDFS base tables: int_array_col"); + AnalysisError("select * from functional.allcomplextypes a, a.int_array_col " + + "tablesample system (10)", + "TABLESAMPLE is only supported on HDFS base tables: int_array_col"); + } + /** * Helper function that returns a list of integers used to improve readability * in the path-related tests below. @@ -3495,7 +3539,7 @@ public class AnalyzeStmtsTest extends AnalyzerTest { testNumberOfMembers(ValuesStmt.class, 0); // Also check TableRefs. 
-    testNumberOfMembers(TableRef.class, 19);
+    testNumberOfMembers(TableRef.class, 20);
     testNumberOfMembers(BaseTableRef.class, 0);
     testNumberOfMembers(InlineViewRef.class, 8);
   }


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ee0fc260/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
index 9fb1e2a..3650ace 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
@@ -30,7 +30,6 @@ import org.apache.impala.analysis.TimestampArithmeticExpr.TimeUnit;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FrontendTestBase;
 import org.apache.impala.compat.MetastoreShim;
-import org.apache.impala.testutil.TestUtils;
 import org.junit.Test;

 import com.google.common.base.Preconditions;
@@ -587,6 +586,58 @@ public class ParserTest extends FrontendTestBase {
   }

   @Test
+  public void TestTableSampleClause() {
+    String tblRefs[] = new String[] { "tbl", "db.tbl", "db.tbl.col", "db.tbl.col.fld" };
+    String tblAliases[] = new String[] { "", "t" };
+    String tblSampleClauses[] = new String[] {
+        "", "tablesample system(10)", "tablesample system(100) repeatable(20)" };
+    String tblHints[] = new String[] {
+        "", "/* +schedule_remote */", "[schedule_random_replica]"
+    };
+    for (String tbl: tblRefs) {
+      for (String alias: tblAliases) {
+        for (String smp: tblSampleClauses) {
+          for (String hint: tblHints) {
+            // Single table.
+            ParsesOk(String.format("select * from %s %s %s %s", tbl, alias, smp, hint));
+            // Multi table.
+            ParsesOk(String.format(
+                "select a.* from %s %s %s %s join %s %s %s %s using (id)",
+                tbl, alias, smp, hint, tbl, alias, smp, hint));
+            ParsesOk(String.format(
+                "select a.* from %s %s %s %s, %s %s %s %s",
+                tbl, alias, smp, hint, tbl, alias, smp, hint));
+            // Inline view.
+            ParsesOk(String.format("select * from (select 1 from %s %s) v %s %s",
+                tbl, alias, smp, hint));
+
+          }
+        }
+      }
+    }
+
+    // Table alias must come before TABLESAMPLE.
+    ParserError("select * from t tablesample (10) a");
+    // Hints must come after TABLESAMPLE.
+    ParserError("select * from t [schedule_remote] tablesample (10)");
+    ParserError("select * from t /* +schedule_remote */ tablesample (10)");
+    // Missing SYSTEM.
+    ParserError("select * from t tablesample (10)");
+    // Missing parenthesis.
+    ParserError("select * from t tablesample system 10");
+    // Percent must be an int literal.
+    ParserError("select * from t tablesample system (10 + 10)");
+    // Missing random seed.
+    ParserError("select * from t tablesample system (10) repeatable");
+    // Random seed must be an int literal.
+    ParserError("select * from t tablesample system (10) repeatable (10 + 10)");
+    // Negative percent.
+    ParserError("select * from t tablesample system (-10)");
+    // Negative random seed.
+ ParserError("select * from t tablesample system (10) repeatable(-10)"); + } + + @Test public void TestWhereClause() { ParsesOk("select a, b from t where a > 15"); ParsesOk("select a, b from t where true"); @@ -3156,7 +3207,7 @@ public class ParserTest extends FrontendTestBase { " ^\n" + "Encountered: IDENTIFIER\n" + "Expected: CROSS, FROM, FULL, GROUP, HAVING, INNER, JOIN, LEFT, LIMIT, OFFSET, " + - "ON, ORDER, RIGHT, STRAIGHT_JOIN, UNION, USING, WHERE, COMMA\n"); + "ON, ORDER, RIGHT, STRAIGHT_JOIN, TABLESAMPLE, UNION, USING, WHERE, COMMA\n"); // Long line: error close to the start ParserError("select a a a, b, c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,cd,c,d,d,,c, from t", @@ -3165,7 +3216,7 @@ public class ParserTest extends FrontendTestBase { " ^\n" + "Encountered: IDENTIFIER\n" + "Expected: CROSS, FROM, FULL, GROUP, HAVING, INNER, JOIN, LEFT, LIMIT, OFFSET, " + - "ON, ORDER, RIGHT, STRAIGHT_JOIN, UNION, USING, WHERE, COMMA\n"); + "ON, ORDER, RIGHT, STRAIGHT_JOIN, TABLESAMPLE, UNION, USING, WHERE, COMMA\n"); // Long line: error close to the end ParserError("select a, b, c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,cd,c,d,d, ,c, from t", http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ee0fc260/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java index cd14d25..98e0e3a 100644 --- a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java @@ -524,20 +524,20 @@ public class ToSqlTest extends FrontendTestBase { testToSql(String.format( "select * from functional.alltypes at %sschedule_random_replica%s", prefix, suffix), - "SELECT * FROM functional.alltypes at \n-- +schedule_random_replica\n"); + "SELECT * FROM functional.alltypes at\n-- +schedule_random_replica\n"); testToSql(String.format( "select * from functional.alltypes %sschedule_random_replica%s", prefix, suffix), - "SELECT * FROM functional.alltypes \n-- +schedule_random_replica\n"); + "SELECT * FROM functional.alltypes\n-- +schedule_random_replica\n"); testToSql(String.format( "select * from functional.alltypes %sschedule_random_replica," + "schedule_disk_local%s", prefix, suffix), - "SELECT * FROM functional.alltypes \n-- +schedule_random_replica," + + "SELECT * FROM functional.alltypes\n-- +schedule_random_replica," + "schedule_disk_local\n"); testToSql(String.format( "select c1 from (select at.tinyint_col as c1 from functional.alltypes at " + "%sschedule_random_replica%s) s1", prefix, suffix), - "SELECT c1 FROM (SELECT at.tinyint_col c1 FROM functional.alltypes at \n-- +" + + "SELECT c1 FROM (SELECT at.tinyint_col c1 FROM functional.alltypes at\n-- +" + "schedule_random_replica\n) s1"); // Select-list hint. The legacy-style hint has no prefix and suffix. 
@@ -1289,4 +1289,23 @@ public class ToSqlTest extends FrontendTestBase { testToSql("set `a b` = \"x y\"", "SET `a b`='x y'"); testToSql("set", "SET"); } + + @Test + public void testTableSample() { + testToSql("select * from functional.alltypes tablesample system(10)", + "SELECT * FROM functional.alltypes TABLESAMPLE SYSTEM(10)"); + testToSql( + "select * from functional.alltypes tablesample system(10) repeatable(20)", + "SELECT * FROM functional.alltypes TABLESAMPLE SYSTEM(10) REPEATABLE(20)"); + testToSql( + "select * from functional.alltypes a " + + "tablesample system(10) /* +schedule_random */", + "SELECT * FROM functional.alltypes a " + + "TABLESAMPLE SYSTEM(10)\n-- +schedule_random\n"); + testToSql( + "with t as (select * from functional.alltypes tablesample system(5)) " + + "select * from t", + "WITH t AS (SELECT * FROM functional.alltypes TABLESAMPLE SYSTEM(5)) " + + "SELECT * FROM t"); + } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ee0fc260/fe/src/test/java/org/apache/impala/planner/PlannerTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java index b056951..07b49a2 100644 --- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java +++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java @@ -406,4 +406,11 @@ public class PlannerTest extends PlannerTestBase { options.setExplain_level(TExplainLevel.EXTENDED); runPlannerTestFile("sort-expr-materialization", options); } + + @Test + public void testTableSample() { + TQueryOptions options = defaultQueryOptions(); + options.setExplain_level(TExplainLevel.EXTENDED); + runPlannerTestFile("tablesample", options); + } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ee0fc260/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test b/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test new file mode 100644 index 0000000..8257661 --- /dev/null +++ b/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test @@ -0,0 +1,177 @@ +# Sample 10% +select * from functional.alltypes tablesample system(10) repeatable(1234) +---- PLAN +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 +PLAN-ROOT SINK +| mem-estimate=0B mem-reservation=0B +| +00:SCAN HDFS [functional.alltypes] + partitions=3/24 files=3 size=60.68KB + table stats: 7300 rows total + column stats: all + mem-estimate=32.00MB mem-reservation=0B + tuple-ids=0 row-size=97B cardinality=730 +==== +# Sample 50% +select * from functional.alltypes tablesample system(50) repeatable(1234) +---- PLAN +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 +PLAN-ROOT SINK +| mem-estimate=0B mem-reservation=0B +| +00:SCAN HDFS [functional.alltypes] + partitions=12/24 files=12 size=240.27KB + table stats: 7300 rows total + column stats: all + mem-estimate=80.00MB mem-reservation=0B + tuple-ids=0 row-size=97B cardinality=3650 +==== +# Sampling and scan predicates. Scan predicates are evaluated after sampling and +# that is reflected in the cardinality. 
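+# cardinality = 7300 rows * 0.5 (sampled bytes) * 0.1 (default selectivity of 'id < 10') = 365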
+select * from functional.alltypes tablesample system(50) repeatable(1234) +where id < 10 +---- PLAN +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 +PLAN-ROOT SINK +| mem-estimate=0B mem-reservation=0B +| +00:SCAN HDFS [functional.alltypes] + partitions=12/24 files=12 size=239.26KB + predicates: id < 10 + table stats: 7300 rows total + column stats: all + parquet dictionary predicates: id < 10 + mem-estimate=80.00MB mem-reservation=0B + tuple-ids=0 row-size=97B cardinality=365 +==== +# Partition pruning + sampling. Partition pruning happens after sampling. +select * from functional.alltypes tablesample system(50) repeatable(1234) +where year = 2009 +---- PLAN +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 +PLAN-ROOT SINK +| mem-estimate=0B mem-reservation=0B +| +00:SCAN HDFS [functional.alltypes] + partitions=6/24 files=6 size=119.04KB + table stats: 7300 rows total + column stats: all + mem-estimate=48.00MB mem-reservation=0B + tuple-ids=0 row-size=97B cardinality=1825 +==== +# Edge case: sample 0%, no files should be selected +select * from functional.alltypes tablesample system(0) +---- PLAN +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 +PLAN-ROOT SINK +| mem-estimate=0B mem-reservation=0B +| +00:SCAN HDFS [functional.alltypes] + partitions=0/24 files=0 size=0B + table stats: 7300 rows total + column stats: all + mem-estimate=0B mem-reservation=0B + tuple-ids=0 row-size=97B cardinality=1 +==== +# Edge case: sample 1%, at least one file should be selected +select * from functional.alltypes tablesample system(1) repeatable(1234) +---- PLAN +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 +PLAN-ROOT SINK +| mem-estimate=0B mem-reservation=0B +| +00:SCAN HDFS [functional.alltypes] + partitions=1/24 files=1 size=20.36KB + table stats: 7300 rows total + column stats: all + mem-estimate=32.00MB mem-reservation=0B + tuple-ids=0 row-size=97B cardinality=73 +==== +# Edge case: sample 1% and prune partitions, at least one file should be selected +select * from functional.alltypes tablesample system(1) repeatable(1234) +where year = 2010 +---- PLAN +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 +PLAN-ROOT SINK +| mem-estimate=0B mem-reservation=0B +| +00:SCAN HDFS [functional.alltypes] + partitions=1/24 files=1 size=20.36KB + table stats: 7300 rows total + column stats: all + mem-estimate=32.00MB mem-reservation=0B + tuple-ids=0 row-size=97B cardinality=37 +==== +# Edge case: sample 100%, all files should be selected +select * from functional.alltypes tablesample system (100) +---- PLAN +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 +PLAN-ROOT SINK +| mem-estimate=0B mem-reservation=0B +| +00:SCAN HDFS [functional.alltypes] + partitions=24/24 files=24 size=478.45KB + table stats: 7300 rows total + column stats: all + mem-estimate=128.00MB mem-reservation=0B + tuple-ids=0 row-size=97B cardinality=7300 +==== +# Table that has no stats. +select id from functional_parquet.alltypes tablesample system(10) repeatable(1234) +---- PLAN +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 +PLAN-ROOT SINK +| mem-estimate=0B mem-reservation=0B +| +00:SCAN HDFS [functional_parquet.alltypes] + partitions=3/24 files=3 size=22.53KB + table stats: unavailable + column stats: unavailable + mem-estimate=16.00MB mem-reservation=0B + tuple-ids=0 row-size=4B cardinality=unavailable +==== +# Sampling in a subquery. 
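+# Only the inner table ref t2 is sampled; t1 is scanned in full.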
+select id from functional.alltypes t1 where exists ( + select id from functional.alltypessmall t2 tablesample system(10) repeatable(1234) + where t1.id = t2.id) +---- PLAN +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 +PLAN-ROOT SINK +| mem-estimate=0B mem-reservation=0B +| +02:HASH JOIN [LEFT SEMI JOIN] +| hash predicates: t1.id = t2.id +| runtime filters: RF000 <- t2.id +| mem-estimate=44B mem-reservation=136.00MB +| tuple-ids=0 row-size=4B cardinality=10 +| +|--01:SCAN HDFS [functional.alltypessmall t2] +| partitions=1/4 files=1 size=1.57KB +| table stats: 100 rows total +| column stats: all +| mem-estimate=32.00MB mem-reservation=0B +| tuple-ids=1 row-size=4B cardinality=10 +| +00:SCAN HDFS [functional.alltypes t1] + partitions=24/24 files=24 size=478.45KB + runtime filters: RF000 -> t1.id + table stats: 7300 rows total + column stats: all + mem-estimate=128.00MB mem-reservation=0B + tuple-ids=0 row-size=4B cardinality=7300 +==== +# Sampling in WITH-clause view. +with t as (select * from functional.alltypes tablesample system(10) repeatable(1234)) +select id from t +---- PLAN +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 +PLAN-ROOT SINK +| mem-estimate=0B mem-reservation=0B +| +00:SCAN HDFS [functional.alltypes] + partitions=3/24 files=3 size=60.68KB + table stats: 7300 rows total + column stats: all + mem-estimate=32.00MB mem-reservation=0B + tuple-ids=0 row-size=4B cardinality=730 +==== http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ee0fc260/tests/query_test/test_tablesample.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_tablesample.py b/tests/query_test/test_tablesample.py new file mode 100644 index 0000000..f3eaaaa --- /dev/null +++ b/tests/query_test/test_tablesample.py @@ -0,0 +1,64 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Tests the TABLESAMPLE clause. + +import pytest + +from tests.common.impala_test_suite import ImpalaTestSuite +from tests.common.test_vector import ImpalaTestDimension + +class TestTableSample(ImpalaTestSuite): + @classmethod + def get_workload(cls): + return 'functional-query' + + @classmethod + def add_test_dimensions(cls): + super(TestTableSample, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('repeatable', *[True, False])) + # Tablesample is only supported on HDFS tables. + cls.ImpalaTestMatrix.add_constraint(lambda v: + v.get_value('table_format').file_format != 'kudu' and + v.get_value('table_format').file_format != 'hbase') + if cls.exploration_strategy() != 'exhaustive': + # Cut down on core testing time by limiting the file formats. 
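+      # Text and Parquet are assumed to be representative of the remaining formats.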
+ cls.ImpalaTestMatrix.add_constraint(lambda v: + v.get_value('table_format').file_format == 'parquet' or + v.get_value('table_format').file_format == 'text') + + def test_tablesample(self, vector): + # Do not use a .test to avoid making this test flaky. + # 1. Queries without the repeatable clause are non-deterministic. + # 2. The results of queries without a repeatable clause could change due to + # changes in data loading that affect the number or size of files. + repeatable = vector.get_value('repeatable') + ImpalaTestSuite.change_database(self.client, vector.get_value('table_format')) + result = self.client.execute("select count(*) from alltypes") + baseline_count = int(result.data[0]) + prev_count = None + for perc in [5, 20, 50]: + rep_sql = "" + if repeatable: rep_sql = " repeatable(1)" + result = self.client.execute( + "select count(*) from alltypes tablesample system(%s)%s" % (perc, rep_sql)) + count = int(result.data[0]) + assert count < baseline_count + if prev_count and repeatable: + # May not necessarily be true for non-repeatable samples + assert count > prev_count + prev_count = count
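For readers who want the sampling algorithm in isolation, below is a minimal,
runnable restatement of the selection loop from HdfsTable.getFilesSample()
above. The class and method names (SampleSketch, sample) are hypothetical and
the file list is flattened to a single array; the real code tracks
(partition, file-index) pairs, groups the result by partition id, and draws
indexes via Math.abs(rnd.nextInt()) % remaining rather than
rnd.nextInt(remaining).

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class SampleSketch {
  // Returns indexes into 'fileLengths' of the sampled files. Files are added
  // until at least 'percentBytes' percent of the total bytes are covered.
  static List<Integer> sample(long[] fileLengths, long percentBytes, long seed) {
    long totalBytes = 0;
    for (long len : fileLengths) totalBytes += len;
    long targetBytes = Math.round(totalBytes * (percentBytes / 100.0));

    // Slot i initially refers to file i. A chosen slot is overwritten with the
    // last not-yet-chosen slot, so the same file cannot be drawn twice.
    int[] fileIdxs = new int[fileLengths.length];
    for (int i = 0; i < fileIdxs.length; ++i) fileIdxs[i] = i;

    Random rnd = new Random(seed);
    int remaining = fileIdxs.length;
    long selectedBytes = 0;
    List<Integer> result = new ArrayList<>();
    while (selectedBytes < targetBytes && remaining > 0) {
      int slot = rnd.nextInt(remaining);
      int fileIdx = fileIdxs[slot];
      result.add(fileIdx);
      selectedBytes += fileLengths[fileIdx];
      fileIdxs[slot] = fileIdxs[remaining - 1];  // swap-to-end
      --remaining;
    }
    return result;
  }

  public static void main(String[] args) {
    long[] lengths = {100, 200, 300, 400};  // four files, 1000 bytes total
    // 50% sample: stops once >= 500 bytes are covered. Same seed, same files.
    System.out.println(sample(lengths, 50, 1234L));
    System.out.println(sample(lengths, 50, 1234L));
  }
}

The swap-to-end step retires each chosen slot in O(1), so no file can be
selected twice and no shuffle or per-file object allocation is needed, which
matches the memory note in the getFilesSample() javadoc.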
