IMPALA-4166: Add SORT BY SQL clause This change adds support for SORT BY (...) clauses in CREATE TABLE and ALTER TABLE statements. Examples are:
CREATE TABLE t (i INT, j INT, k INT) PARTITIONED BY (l INT) SORT BY (i, j); CREATE TABLE t SORT BY (int_col,id) LIKE u; CREATE TABLE t LIKE PARQUET '/foo' SORT BY (id,zip); ALTER TABLE t SORT BY (int_col,id); ALTER TABLE t SORT BY (); An empty column list, as in the last example, clears the table's sort columns. Sort columns can only be specified for HDFS tables, and their effectiveness may vary based on the storage format; for example, TEXT tables will not see improved compression. The SORT BY clause must not contain clustering (i.e. partitioning) columns. The columns in the SORT BY clause are stored in the 'sort.columns' table property and will result in an additional SORT node being added to the plan before the final table sink. Specifying sort columns also enables clustering during inserts, so the SORT node will contain all partitioning columns first, followed by the sort columns. We do this because sort columns already add a SORT node to the plan, and adding the clustering columns to that SORT node is cheap. Sort columns supersede the sortby() hint, which we will remove in a subsequent change (IMPALA-5144). Until then, it is possible to specify sort columns through both mechanisms at the same time, in which case the two column lists will be concatenated. 
Change-Id: I08834f38a941786ab45a4381c2732d929a934f75 Reviewed-on: http://gerrit.cloudera.org:8080/6495 Reviewed-by: Lars Volker <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/1ada9dac Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/1ada9dac Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/1ada9dac Branch: refs/heads/master Commit: 1ada9dac88f5665d228538d61c3fa21c161be8f8 Parents: 0b9d2ce Author: Lars Volker <[email protected]> Authored: Thu Mar 16 03:46:42 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Fri May 12 15:43:30 2017 +0000 ---------------------------------------------------------------------- be/src/exec/hdfs-parquet-table-writer.cc | 2 +- be/src/exec/hdfs-table-sink.cc | 2 +- be/src/exec/hdfs-table-sink.h | 4 +- common/thrift/DataSinks.thrift | 2 +- common/thrift/JniCatalog.thrift | 8 + fe/src/main/cup/sql-parser.cup | 40 +- .../analysis/AlterTableSetTblProperties.java | 40 ++ .../impala/analysis/AlterTableSortByStmt.java | 84 +++++ .../org/apache/impala/analysis/Analyzer.java | 11 +- .../apache/impala/analysis/BaseTableRef.java | 3 +- .../analysis/CreateTableLikeFileStmt.java | 10 +- .../impala/analysis/CreateTableLikeStmt.java | 24 +- .../apache/impala/analysis/CreateTableStmt.java | 2 + .../org/apache/impala/analysis/InsertStmt.java | 90 +++-- .../org/apache/impala/analysis/TableDef.java | 102 ++++- .../org/apache/impala/analysis/ToSqlUtils.java | 61 ++- .../java/org/apache/impala/catalog/Column.java | 6 + .../java/org/apache/impala/catalog/Table.java | 19 +- .../apache/impala/planner/HdfsTableSink.java | 8 +- .../java/org/apache/impala/planner/Planner.java | 20 +- .../org/apache/impala/planner/TableSink.java | 12 +- .../impala/service/CatalogOpExecutor.java | 31 +- .../org/apache/impala/util/MetaStoreUtil.java | 57 +++ 
fe/src/main/jflex/sql-scanner.flex | 1 + .../apache/impala/analysis/AnalyzeDDLTest.java | 74 +++- .../impala/analysis/AnalyzeStmtsTest.java | 15 + .../org/apache/impala/analysis/ParserTest.java | 47 +++ .../org/apache/impala/analysis/ToSqlTest.java | 48 ++- .../apache/impala/common/FrontendTestBase.java | 22 +- .../org/apache/impala/planner/PlannerTest.java | 13 + .../queries/PlannerTest/ddl.test | 107 ++++++ .../queries/PlannerTest/insert-sort-by.test | 377 +++++++++++++++++++ .../queries/PlannerTest/insert.test | 8 +- .../queries/QueryTest/alter-table.test | 213 +++++++++++ .../QueryTest/create-table-as-select.test | 10 + .../QueryTest/create-table-like-file.test | 11 + .../QueryTest/create-table-like-table.test | 27 ++ .../queries/QueryTest/create-table.test | 9 + .../queries/QueryTest/show-create-table.test | 24 ++ 39 files changed, 1531 insertions(+), 113 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ada9dac/be/src/exec/hdfs-parquet-table-writer.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc index 295b10b..72c86b2 100644 --- a/be/src/exec/hdfs-parquet-table-writer.cc +++ b/be/src/exec/hdfs-parquet-table-writer.cc @@ -1116,7 +1116,7 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() { } // Populate RowGroup::sorting_columns with all columns specified by the Frontend. 
- for (int col_idx : parent_->sort_by_columns()) { + for (int col_idx : parent_->sort_columns()) { current_row_group_->sorting_columns.push_back(parquet::SortingColumn()); parquet::SortingColumn& sorting_column = current_row_group_->sorting_columns.back(); sorting_column.column_idx = col_idx; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ada9dac/be/src/exec/hdfs-table-sink.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc index 16833c1..9da6e57 100644 --- a/be/src/exec/hdfs-table-sink.cc +++ b/be/src/exec/hdfs-table-sink.cc @@ -70,7 +70,7 @@ HdfsTableSink::HdfsTableSink(const RowDescriptor& row_desc, partition_key_texprs_(tsink.table_sink.hdfs_table_sink.partition_key_exprs), overwrite_(tsink.table_sink.hdfs_table_sink.overwrite), input_is_clustered_(tsink.table_sink.hdfs_table_sink.input_is_clustered), - sort_by_columns_(tsink.table_sink .hdfs_table_sink.sort_by_columns), + sort_columns_(tsink.table_sink.hdfs_table_sink.sort_columns), current_clustered_partition_(nullptr) { DCHECK(tsink.__isset.table_sink); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ada9dac/be/src/exec/hdfs-table-sink.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h index 4ccd2fe..f6db868 100644 --- a/be/src/exec/hdfs-table-sink.h +++ b/be/src/exec/hdfs-table-sink.h @@ -152,7 +152,7 @@ class HdfsTableSink : public DataSink { virtual void Close(RuntimeState* state); int skip_header_line_count() const { return skip_header_line_count_; } - const vector<int32_t>& sort_by_columns() const { return sort_by_columns_; } + const vector<int32_t>& sort_columns() const { return sort_columns_; } const HdfsTableDescriptor& TableDesc() { return *table_desc_; } RuntimeProfile::Counter* rows_inserted_counter() { return rows_inserted_counter_; } @@ -278,7 +278,7 @@ class 
HdfsTableSink : public DataSink { // Stores the indices into the list of non-clustering columns of the target table that // are mentioned in the 'sortby()' hint. This is used in the backend to populate the // RowGroup::sorting_columns list in parquet files. - const std::vector<int32_t>& sort_by_columns_; + const std::vector<int32_t>& sort_columns_; /// Stores the current partition during clustered inserts across subsequent row batches. /// Only set if 'input_is_clustered_' is true. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ada9dac/common/thrift/DataSinks.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/DataSinks.thrift b/common/thrift/DataSinks.thrift index 6e3224e..cce7971 100644 --- a/common/thrift/DataSinks.thrift +++ b/common/thrift/DataSinks.thrift @@ -74,7 +74,7 @@ struct THdfsTableSink { // Stores the indices into the list of non-clustering columns of the target table that // are mentioned in the 'sortby()' hint. This is used in the backend to populate the // RowGroup::sorting_columns list in parquet files. - 5: optional list<i32> sort_by_columns + 5: optional list<i32> sort_columns } // Structure to encapsulate specific options that are passed down to the KuduTableSink http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ada9dac/common/thrift/JniCatalog.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift index 96b2e00..507aa2e 100644 --- a/common/thrift/JniCatalog.thrift +++ b/common/thrift/JniCatalog.thrift @@ -378,6 +378,11 @@ struct TCreateTableLikeParams { // Optional storage location for the table 8: optional string location + + // Optional list of sort columns for the new table. If specified, these will override + // any such columns of the source table. If unspecified, the destination table will + // inherit the sort columns of the source table. 
+ 9: optional list<string> sort_columns } // Parameters of CREATE TABLE commands @@ -429,6 +434,9 @@ struct TCreateTableParams { // Primary key column names (Kudu-only) 15: optional list<string> primary_key_column_names; + + // Optional list of sort columns for the new table. + 16: optional list<string> sort_columns } // Parameters of a CREATE VIEW or ALTER VIEW AS SELECT command http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ada9dac/fe/src/main/cup/sql-parser.cup ---------------------------------------------------------------------- diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup index 8d0f739..aba8dce 100644 --- a/fe/src/main/cup/sql-parser.cup +++ b/fe/src/main/cup/sql-parser.cup @@ -242,7 +242,7 @@ parser code {: :}; // List of keywords. Please keep them sorted alphabetically. -// ALL KEYWORDS ALSO NEED TO BE ADDED TO THE ident_or_kw PRODUCTION. +// ALL KEYWORDS ALSO NEED TO BE ADDED TO THE ident_or_keyword PRODUCTION. terminal KW_ADD, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_ANALYTIC, KW_AND, KW_ANTI, KW_API_VERSION, KW_ARRAY, KW_AS, KW_ASC, KW_AVRO, KW_BETWEEN, KW_BIGINT, KW_BINARY, KW_BLOCKSIZE, @@ -264,7 +264,7 @@ terminal KW_PURGE, KW_RANGE, KW_RCFILE, KW_RECOVER, KW_REFRESH, KW_REGEXP, KW_RENAME, KW_REPLACE, KW_REPLICATION, KW_RESTRICT, KW_RETURNS, KW_REVOKE, KW_RIGHT, KW_RLIKE, KW_ROLE, KW_ROLES, KW_ROW, KW_ROWS, KW_SCHEMA, KW_SCHEMAS, KW_SELECT, KW_SEMI, KW_SEQUENCEFILE, - KW_SERDEPROPERTIES, KW_SERIALIZE_FN, KW_SET, KW_SHOW, KW_SMALLINT, KW_STORED, + KW_SERDEPROPERTIES, KW_SERIALIZE_FN, KW_SET, KW_SHOW, KW_SMALLINT, KW_SORT, KW_STORED, KW_STRAIGHT_JOIN, KW_STRING, KW_STRUCT, KW_SYMBOL, KW_TABLE, KW_TABLES, KW_TBLPROPERTIES, KW_TERMINATED, KW_TEXTFILE, KW_THEN, KW_TIMESTAMP, KW_TINYINT, KW_TRUNCATE, KW_STATS, KW_TO, KW_TRUE, KW_UNBOUNDED, KW_UNCACHED, KW_UNION, KW_UPDATE, @@ -333,7 +333,7 @@ nonterminal Expr expr, non_pred_expr, arithmetic_expr, timestamp_arithmetic_expr nonterminal ArrayList<Expr> expr_list; 
nonterminal String alias_clause; nonterminal ArrayList<String> ident_list, primary_keys; -nonterminal ArrayList<String> opt_ident_list; +nonterminal ArrayList<String> opt_ident_list, opt_sort_cols; nonterminal TableName table_name; nonterminal FunctionName function_name; nonterminal Expr where_clause; @@ -979,6 +979,9 @@ alter_tbl_stmt ::= | KW_ALTER KW_TABLE table_name:table opt_partition_set:partitions KW_SET table_property_type:target LPAREN properties_map:properties RPAREN {: RESULT = new AlterTableSetTblProperties(table, partitions, target, properties); :} + | KW_ALTER KW_TABLE table_name:table KW_SORT KW_BY LPAREN opt_ident_list:col_names + RPAREN + {: RESULT = new AlterTableSortByStmt(table, col_names); :} | KW_ALTER KW_TABLE table_name:table opt_partition_set:partition KW_SET KW_COLUMN KW_STATS ident_or_default:col LPAREN properties_map:map RPAREN {: @@ -1123,7 +1126,18 @@ create_tbl_like_stmt ::= opt_comment_val:comment file_format_create_table_val:file_format location_val:location {: - RESULT = new CreateTableLikeStmt(tbl_def.getTblName(), other_table, + RESULT = new CreateTableLikeStmt(tbl_def.getTblName(), null, other_table, + tbl_def.isExternal(), comment, file_format, location, tbl_def.getIfNotExists()); + :} + // This extra production is necessary since without it the parser will not be able to + // parse "CREATE TABLE A LIKE B". 
+ | tbl_def_without_col_defs:tbl_def + opt_sort_cols:sort_cols + KW_LIKE table_name:other_table + opt_comment_val:comment + file_format_create_table_val:file_format location_val:location + {: + RESULT = new CreateTableLikeStmt(tbl_def.getTblName(), sort_cols, other_table, tbl_def.isExternal(), comment, file_format, location, tbl_def.getIfNotExists()); :} ; @@ -1157,17 +1171,25 @@ primary_keys ::= ; tbl_options ::= - opt_comment_val:comment row_format_val:row_format serde_properties:serde_props - file_format_create_table_val:file_format location_val:location cache_op_val:cache_op + opt_sort_cols:sort_cols opt_comment_val:comment row_format_val:row_format + serde_properties:serde_props file_format_create_table_val:file_format + location_val:location cache_op_val:cache_op tbl_properties:tbl_props {: CreateTableStmt.unescapeProperties(serde_props); CreateTableStmt.unescapeProperties(tbl_props); - RESULT = new TableDef.Options(comment, row_format, serde_props, file_format, - location, cache_op, tbl_props); + RESULT = new TableDef.Options(sort_cols, comment, row_format, serde_props, + file_format, location, cache_op, tbl_props); :} ; +opt_sort_cols ::= + KW_SORT KW_BY LPAREN opt_ident_list:col_names RPAREN + {: RESULT = col_names; :} + | /* empty */ + {: RESULT = null; :} + ; + opt_tbl_data_layout ::= partition_column_defs:partition_column_defs {: RESULT = TableDataLayout.createPartitionedLayout(partition_column_defs); :} @@ -3372,6 +3394,8 @@ ident_or_keyword ::= {: RESULT = r.toString(); :} | KW_SMALLINT:r {: RESULT = r.toString(); :} + | KW_SORT:r + {: RESULT = r.toString(); :} | KW_STORED:r {: RESULT = r.toString(); :} | KW_STRAIGHT_JOIN:r http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ada9dac/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java 
b/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java index f0e8f11..44024f4 100644 --- a/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java +++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java @@ -24,7 +24,10 @@ import java.util.Map; import org.apache.avro.SchemaParseException; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils; +import org.apache.impala.catalog.Column; +import org.apache.impala.catalog.HBaseTable; import org.apache.impala.catalog.HdfsTable; +import org.apache.impala.catalog.KuduTable; import org.apache.impala.catalog.Table; import org.apache.impala.common.AnalysisException; import org.apache.impala.service.FeSupport; @@ -37,7 +40,9 @@ import org.apache.impala.util.AvroSchemaUtils; import org.apache.impala.util.MetaStoreUtil; import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; /** @@ -103,6 +108,9 @@ public class AlterTableSetTblProperties extends AlterTableSetStmt { // Analyze 'parquet.mr.int96.write.zone' analyzeParquetMrWriteZone(getTargetTable(), tblProperties_); + + // Analyze 'sort.columns' property. + analyzeSortColumns(getTargetTable(), tblProperties_); } /** @@ -182,4 +190,36 @@ public class AlterTableSetTblProperties extends AlterTableSetStmt { } } } + + /** + * Analyzes the 'sort.columns' property in 'tblProperties' against the columns of + * 'table'. The property must store a list of column names separated by commas, and each + * column in the property must occur in 'table' as a non-partitioning column. If there + * are errors during the analysis, this function will throw an AnalysisException. + * Returns a list of positions of the sort columns within the table's list of + * columns. 
+ */ + public static List<Integer> analyzeSortColumns(Table table, + Map<String, String> tblProperties) throws AnalysisException { + if (!tblProperties.containsKey( + AlterTableSortByStmt.TBL_PROP_SORT_COLUMNS)) { + return ImmutableList.of(); + } + + // ALTER TABLE SET is not supported on HBase tables at all, see + // AlterTableSetStmt::analyze(). + Preconditions.checkState(!(table instanceof HBaseTable)); + + if (table instanceof KuduTable) { + throw new AnalysisException(String.format("'%s' table property is not supported " + + "for Kudu tables.", AlterTableSortByStmt.TBL_PROP_SORT_COLUMNS)); + } + + List<String> sortCols = Lists.newArrayList( + Splitter.on(",").trimResults().omitEmptyStrings().split( + tblProperties.get(AlterTableSortByStmt.TBL_PROP_SORT_COLUMNS))); + return TableDef.analyzeSortColumns(sortCols, + Column.toColumnNames(table.getNonClusteringColumns()), + Column.toColumnNames(table.getClusteringColumns())); + } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ada9dac/fe/src/main/java/org/apache/impala/analysis/AlterTableSortByStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableSortByStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableSortByStmt.java new file mode 100644 index 0000000..1bf6658 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableSortByStmt.java @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.analysis; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.impala.catalog.Column; +import org.apache.impala.catalog.HBaseTable; +import org.apache.impala.catalog.HdfsTable; +import org.apache.impala.catalog.KuduTable; +import org.apache.impala.catalog.Table; +import org.apache.impala.common.AnalysisException; +import org.apache.impala.thrift.TAlterTableParams; +import org.apache.impala.thrift.TAlterTableSetTblPropertiesParams; +import org.apache.impala.thrift.TAlterTableType; +import org.apache.impala.thrift.TTablePropertyType; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; + +/** +* Represents an ALTER TABLE SORT BY (c1, c2, ...) statement. 
+*/ +public class AlterTableSortByStmt extends AlterTableStmt { + // Table property key for sort.columns + public static final String TBL_PROP_SORT_COLUMNS = "sort.columns"; + + private final List<String> columns_; + + public AlterTableSortByStmt(TableName tableName, List<String> columns) { + super(tableName); + Preconditions.checkNotNull(columns); + columns_ = columns; + } + + @Override + public TAlterTableParams toThrift() { + TAlterTableParams params = super.toThrift(); + params.setAlter_type(TAlterTableType.SET_TBL_PROPERTIES); + TAlterTableSetTblPropertiesParams tblPropertyParams = + new TAlterTableSetTblPropertiesParams(); + tblPropertyParams.setTarget(TTablePropertyType.TBL_PROPERTY); + Map<String, String> properties = Maps.newHashMap(); + properties.put(TBL_PROP_SORT_COLUMNS, Joiner.on(",").join(columns_)); + tblPropertyParams.setProperties(properties); + params.setSet_tbl_properties_params(tblPropertyParams); + return params; + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException { + super.analyze(analyzer); + + // Disallow setting sort columns on HBase and Kudu tables. 
+ Table targetTable = getTargetTable(); + if (targetTable instanceof HBaseTable) { + throw new AnalysisException("ALTER TABLE SORT BY not supported on HBase tables."); + } + if (targetTable instanceof KuduTable) { + throw new AnalysisException("ALTER TABLE SORT BY not supported on Kudu tables."); + } + + TableDef.analyzeSortColumns(columns_, targetTable); + } +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ada9dac/fe/src/main/java/org/apache/impala/analysis/Analyzer.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java index 9360941..ab3da02 100644 --- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java +++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java @@ -274,6 +274,11 @@ public class Analyzer { public final LinkedHashMap<String, Integer> warnings = new LinkedHashMap<String, Integer>(); + // Tracks whether the warnings have been retrieved from this analyzer. If set to true, + // adding new warnings will result in an error. This helps to make sure that no + // warnings are added which will not be displayed. + public boolean warningsRetrieved = false; + public final IdGenerator<EquivalenceClassId> equivClassIdGenerator = EquivalenceClassId.createGenerator(); @@ -448,8 +453,10 @@ public class Analyzer { /** * Returns a list of each warning logged, indicating if it was logged more than once. + * After this function has been called, no warning may be added to the Analyzer anymore. */ public List<String> getWarnings() { + globalState_.warningsRetrieved = true; List<String> result = new ArrayList<String>(); for (Map.Entry<String, Integer> e : globalState_.warnings.entrySet()) { String error = e.getKey(); @@ -2499,9 +2506,11 @@ public class Analyzer { public Map<String, View> getLocalViews() { return localViews_; } /** - * Add a warning that will be displayed to the user. 
Ignores null messages. + * Add a warning that will be displayed to the user. Ignores null messages. Once + * getWarnings() has been called, no warning may be added to the Analyzer anymore. */ public void addWarning(String msg) { + Preconditions.checkState(!globalState_.warningsRetrieved); if (msg == null) return; Integer count = globalState_.warnings.get(msg); if (count == null) count = 0; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ada9dac/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java b/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java index 2350398..ac97739 100644 --- a/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java +++ b/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java @@ -55,7 +55,8 @@ public class BaseTableRef extends TableRef { } /** - * Register this table ref and then analyze the Join clause. + * Register this table ref and then analyze any table hints, the Join clause, and the + * 'skip.header.line.count' table property. 
*/ @Override public void analyze(Analyzer analyzer) throws AnalysisException { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ada9dac/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java index 432e9c1..a9cdd86 100644 --- a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java @@ -344,11 +344,11 @@ public class CreateTableLikeFileStmt extends CreateTableStmt { schemaLocation_.toString()); String s = ToSqlUtils.getCreateTableSql(getDb(), getTbl() + " __LIKE_FILEFORMAT__ ", getComment(), colsSql, partitionColsSql, - null, null, getTblProperties(), getSerdeProperties(), isExternal(), - getIfNotExists(), getRowFormat(), HdfsFileFormat.fromThrift(getFileFormat()), - compression, null, getLocation()); - s = s.replace("__LIKE_FILEFORMAT__", "LIKE " + schemaFileFormat_ + " " + - schemaLocation_.toString()); + null, null, getSortColumns(), getTblProperties(), getSerdeProperties(), + isExternal(), getIfNotExists(), getRowFormat(), + HdfsFileFormat.fromThrift(getFileFormat()), compression, null, getLocation()); + s = s.replace("__LIKE_FILEFORMAT__", String.format("LIKE %s '%s'", + schemaFileFormat_, schemaLocation_.toString())); return s; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ada9dac/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java index b0aad63..4091ef4 100644 --- a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java +++ 
b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java @@ -17,9 +17,13 @@ package org.apache.impala.analysis; +import java.util.List; + +import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import org.apache.hadoop.fs.permission.FsAction; +import org.apache.impala.analysis.TableDef; import org.apache.impala.authorization.Privilege; import org.apache.impala.catalog.KuduTable; import org.apache.impala.catalog.Table; @@ -36,6 +40,7 @@ import org.apache.impala.thrift.TTableName; */ public class CreateTableLikeStmt extends StatementBase { private final TableName tableName_; + private final List<String> sortColumns_; private final TableName srcTableName_; private final boolean isExternal_; private final String comment_; @@ -51,6 +56,7 @@ public class CreateTableLikeStmt extends StatementBase { /** * Builds a CREATE TABLE LIKE statement * @param tableName - Name of the new table + * @param sortColumns - List of columns to sort by during inserts * @param srcTableName - Name of the source table (table to copy) * @param isExternal - If true, the table's data will be preserved if dropped. * @param comment - Comment to attach to the table @@ -58,12 +64,13 @@ public class CreateTableLikeStmt extends StatementBase { * @param location - The HDFS location of where the table data will stored. 
* @param ifNotExists - If true, no errors are thrown if the table already exists */ - public CreateTableLikeStmt(TableName tableName, TableName srcTableName, - boolean isExternal, String comment, THdfsFileFormat fileFormat, HdfsUri location, - boolean ifNotExists) { + public CreateTableLikeStmt(TableName tableName, List<String> sortColumns, + TableName srcTableName, boolean isExternal, String comment, + THdfsFileFormat fileFormat, HdfsUri location, boolean ifNotExists) { Preconditions.checkNotNull(tableName); Preconditions.checkNotNull(srcTableName); this.tableName_ = tableName; + this.sortColumns_ = sortColumns; this.srcTableName_ = srcTableName; this.isExternal_ = isExternal; this.comment_ = comment; @@ -109,7 +116,11 @@ public class CreateTableLikeStmt extends StatementBase { sb.append("TABLE "); if (ifNotExists_) sb.append("IF NOT EXISTS "); if (tableName_.getDb() != null) sb.append(tableName_.getDb() + "."); - sb.append(tableName_.getTbl() + " LIKE "); + sb.append(tableName_.getTbl() + " "); + if (sortColumns_ != null && !sortColumns_.isEmpty()) { + sb.append("SORT BY (" + Joiner.on(",").join(sortColumns_) + ") "); + } + sb.append("LIKE "); if (srcTableName_.getDb() != null) sb.append(srcTableName_.getDb() + "."); sb.append(srcTableName_.getTbl()); if (comment_ != null) sb.append(" COMMENT '" + comment_ + "'"); @@ -128,6 +139,7 @@ public class CreateTableLikeStmt extends StatementBase { if (fileFormat_ != null) params.setFile_format(fileFormat_); params.setLocation(location_ == null ? 
null : location_.toString()); params.setIf_not_exists(getIfNotExists()); + params.setSort_columns(sortColumns_); return params; } @@ -163,5 +175,9 @@ public class CreateTableLikeStmt extends StatementBase { if (location_ != null) { location_.analyze(analyzer, Privilege.ALL, FsAction.READ_WRITE); } + + if (sortColumns_ != null) { + TableDef.analyzeSortColumns(sortColumns_, srcTable); + } } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ada9dac/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java index 21bcb33..8d266ec 100644 --- a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java @@ -94,6 +94,7 @@ public class CreateTableStmt extends StatementBase { public List<KuduPartitionParam> getKuduPartitionParams() { return tableDef_.getKuduPartitionParams(); } + public List<String> getSortColumns() { return tableDef_.getSortColumns(); } public String getComment() { return tableDef_.getComment(); } Map<String, String> getTblProperties() { return tableDef_.getTblProperties(); } private HdfsCachingOp getCachingOp() { return tableDef_.getCachingOp(); } @@ -147,6 +148,7 @@ public class CreateTableStmt extends StatementBase { if (getRowFormat() != null) params.setRow_format(getRowFormat().toThrift()); params.setFile_format(getFileFormat()); params.setIf_not_exists(getIfNotExists()); + params.setSort_columns(getSortColumns()); params.setTable_properties(getTblProperties()); params.setSerde_properties(getSerdeProperties()); for (KuduPartitionParam d: getKuduPartitionParams()) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ada9dac/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java 
---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java index 83c6e40..ae7149f 100644 --- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java @@ -20,6 +20,7 @@ package org.apache.impala.analysis; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.impala.authorization.Privilege; @@ -42,6 +43,7 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -124,24 +126,30 @@ public class InsertStmt extends StatementBase { private boolean hasShuffleHint_ = false; private boolean hasNoShuffleHint_ = false; - // Indicates whether this insert stmt has a clustered or noclustered hint. If clustering - // is requested, we add a clustering phase before the data sink, so that partitions can - // be written sequentially. The default behavior is to not perform an additional - // clustering step. + // Indicates whether this insert stmt has a clustered or noclustered hint. Only one of + // them may be true, not both. If clustering is requested, we add a clustering phase + // before the data sink, so that partitions can be written sequentially. The default + // behavior is to not perform an additional clustering step. + // TODO: hasClusteredHint_ can be removed once we enable clustering by default + // (IMPALA-5293). 
private boolean hasClusteredHint_ = false; + private boolean hasNoClusteredHint_ = false; - // For every column of the target table that is referenced in the optional 'sortby()' - // hint, this list will contain the corresponding result expr from 'resultExprs_'. - // Before insertion, all rows will be sorted by these exprs. If the list is empty, no - // additional sorting by non-partitioning columns will be performed. For Hdfs tables, - // the 'sortby()' hint must not contain partition columns. For Kudu tables, it must not - // contain primary key columns. - private List<Expr> sortByExprs_ = Lists.newArrayList(); + // For every column of the target table that is referenced in the optional + // 'sort.columns' table property or in the optional 'sortby()' hint, this list will + // contain the corresponding result expr from 'resultExprs_'. Before insertion, all rows + // will be sorted by these exprs. If the list is empty, no additional sorting by + // non-partitioning columns will be performed. The column list must not contain + // partition columns and must be empty for non-Hdfs tables. + private List<Expr> sortExprs_ = Lists.newArrayList(); // Stores the indices into the list of non-clustering columns of the target table that - // are mentioned in the 'sortby()' hint. This is sent to the backend to populate the - // RowGroup::sorting_columns list in parquet files. - private List<Integer> sortByColumns_ = Lists.newArrayList(); + // are mentioned in the 'sort.columns' table property or the 'sortby()' hint. This is + // sent to the backend to populate the RowGroup::sorting_columns list in parquet files. + // Sort columns supersede the sortby() hint, which we will remove in a subsequent change + // (IMPALA-5144). Until then, it is possible to specify sort columns using both ways at + // the same time and the column lists will be concatenated. 
+ private List<Integer> sortColumns_ = Lists.newArrayList(); // Output expressions that produce the final results to write to the target table. May // include casts. Set in prepareExpressions(). @@ -230,7 +238,8 @@ public class InsertStmt extends StatementBase { hasShuffleHint_ = false; hasNoShuffleHint_ = false; hasClusteredHint_ = false; - sortByExprs_.clear(); + hasNoClusteredHint_ = false; + sortExprs_.clear(); resultExprs_.clear(); mentionedColumns_.clear(); primaryKeyExprs_.clear(); @@ -374,9 +383,19 @@ public class InsertStmt extends StatementBase { // Populate partitionKeyExprs from partitionKeyValues and selectExprTargetColumns prepareExpressions(selectExprTargetColumns, selectListExprs, table_, analyzer); + + // Analyze 'sort.columns' table property and populate sortColumns_ and sortExprs_. + analyzeSortColumns(); + // Analyze plan hints at the end to prefer reporting other error messages first // (e.g., the PARTITION clause is not applicable to unpartitioned and HBase tables). analyzePlanHints(analyzer); + + if (hasNoClusteredHint_ && !sortExprs_.isEmpty()) { + analyzer.addWarning(String.format("Insert statement has 'noclustered' hint, but " + + "table has '%s' property. The 'noclustered' hint will be ignored.", + AlterTableSortByStmt.TBL_PROP_SORT_COLUMNS)); + } } /** @@ -499,6 +518,9 @@ public class InsertStmt extends StatementBase { } } + // Assign sortExprs_ based on sortColumns_. + for (Integer colIdx: sortColumns_) sortExprs_.add(resultExprs_.get(colIdx)); + if (isHBaseTable && overwrite_) { throw new AnalysisException("HBase doesn't have a way to perform INSERT OVERWRITE"); } @@ -763,6 +785,21 @@ public class InsertStmt extends StatementBase { } } + /** + * Analyzes the 'sort.columns' table property if it is set, and populates + * sortColumns_ and sortExprs_. If there are errors during the analysis, this will throw + * an AnalysisException. 
+ */ + private void analyzeSortColumns() throws AnalysisException { + if (!(table_ instanceof HdfsTable)) return; + + sortColumns_ = AlterTableSetTblProperties.analyzeSortColumns(table_, + table_.getMetaStoreTable().getParameters()); + + // Assign sortExprs_ based on sortColumns_. + for (Integer colIdx: sortColumns_) sortExprs_.add(resultExprs_.get(colIdx)); + } + private void analyzePlanHints(Analyzer analyzer) throws AnalysisException { if (planHints_.isEmpty()) return; if (isUpsert_) { @@ -772,7 +809,6 @@ public class InsertStmt extends StatementBase { throw new AnalysisException(String.format("INSERT hints are only supported for " + "inserting into Hdfs tables: %s", getTargetTableName())); } - boolean hasNoClusteredHint = false; for (PlanHint hint: planHints_) { if (hint.is("SHUFFLE")) { hasShuffleHint_ = true; @@ -784,7 +820,7 @@ public class InsertStmt extends StatementBase { hasClusteredHint_ = true; analyzer.setHasPlanHints(); } else if (hint.is("NOCLUSTERED")) { - hasNoClusteredHint = true; + hasNoClusteredHint_ = true; analyzer.setHasPlanHints(); } else if (hint.is("SORTBY")) { analyzeSortByHint(hint); @@ -797,7 +833,7 @@ public class InsertStmt extends StatementBase { if (hasShuffleHint_ && hasNoShuffleHint_) { throw new AnalysisException("Conflicting INSERT hints: shuffle and noshuffle"); } - if (hasClusteredHint_ && hasNoClusteredHint) { + if (hasClusteredHint_ && hasNoClusteredHint_) { throw new AnalysisException("Conflicting INSERT hints: clustered and noclustered"); } } @@ -818,13 +854,18 @@ public class InsertStmt extends StatementBase { } // Find the matching column in the target table's column list (by name) and store - // the corresponding result expr in sortByExprs_. + // the corresponding result expr in sortExprs_. boolean foundColumn = false; List<Column> columns = table_.getNonClusteringColumns(); + if (!columns.isEmpty()) { + // We need to make a copy to make the sortColumns_ list mutable. 
+ // TODO: Remove this when removing the sortby() hint (IMPALA-5144). + sortColumns_ = Lists.newArrayList(sortColumns_); + } for (int i = 0; i < columns.size(); ++i) { if (columns.get(i).getName().equals(columnName)) { - sortByExprs_.add(resultExprs_.get(i)); - sortByColumns_.add(i); + sortExprs_.add(resultExprs_.get(i)); + sortColumns_.add(i); foundColumn = true; break; } @@ -862,8 +903,9 @@ public class InsertStmt extends StatementBase { public boolean hasShuffleHint() { return hasShuffleHint_; } public boolean hasNoShuffleHint() { return hasNoShuffleHint_; } public boolean hasClusteredHint() { return hasClusteredHint_; } + public boolean hasNoClusteredHint() { return hasNoClusteredHint_; } public ArrayList<Expr> getPrimaryKeyExprs() { return primaryKeyExprs_; } - public List<Expr> getSortByExprs() { return sortByExprs_; } + public List<Expr> getSortExprs() { return sortExprs_; } public List<String> getMentionedColumns() { List<String> result = Lists.newArrayList(); @@ -877,7 +919,7 @@ public class InsertStmt extends StatementBase { Preconditions.checkState(table_ != null); return TableSink.create(table_, isUpsert_ ? 
TableSink.Op.UPSERT : TableSink.Op.INSERT, partitionKeyExprs_, mentionedColumns_, overwrite_, hasClusteredHint_, - sortByColumns_); + sortColumns_); } /** @@ -889,7 +931,7 @@ public class InsertStmt extends StatementBase { resultExprs_ = Expr.substituteList(resultExprs_, smap, analyzer, true); partitionKeyExprs_ = Expr.substituteList(partitionKeyExprs_, smap, analyzer, true); primaryKeyExprs_ = Expr.substituteList(primaryKeyExprs_, smap, analyzer, true); - sortByExprs_ = Expr.substituteList(sortByExprs_, smap, analyzer, true); + sortExprs_ = Expr.substituteList(sortExprs_, smap, analyzer, true); } @Override http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ada9dac/fe/src/main/java/org/apache/impala/analysis/TableDef.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/TableDef.java b/fe/src/main/java/org/apache/impala/analysis/TableDef.java index cad45b7..5c8a653 100644 --- a/fe/src/main/java/org/apache/impala/analysis/TableDef.java +++ b/fe/src/main/java/org/apache/impala/analysis/TableDef.java @@ -17,13 +17,17 @@ package org.apache.impala.analysis; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; import org.apache.impala.authorization.Privilege; +import org.apache.impala.catalog.Column; import org.apache.impala.catalog.HdfsStorageDescriptor; +import org.apache.impala.catalog.HdfsTable; import org.apache.impala.catalog.RowFormat; +import org.apache.impala.catalog.Table; import org.apache.impala.common.AnalysisException; import org.apache.impala.common.FileSystemUtil; import org.apache.impala.thrift.TAccessEvent; @@ -31,7 +35,9 @@ import org.apache.impala.thrift.TCatalogObjectType; import org.apache.impala.thrift.THdfsFileFormat; import org.apache.impala.util.MetaStoreUtil; +import com.google.common.base.Joiner; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import 
com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -52,9 +58,9 @@ import org.apache.hadoop.fs.permission.FsAction; * - TBLPROPERTIES * - LOCATION * - CACHED IN + * - SORT BY */ class TableDef { - // Name of the new table private final TableName tableName_; @@ -86,6 +92,9 @@ class TableDef { * TABLE statements. */ static class Options { + // Optional list of columns to sort data by when inserting into this table. + final List<String> sortCols; + // Comment to attach to the table final String comment; @@ -107,9 +116,10 @@ class TableDef { // Key/values to persist with table metadata. final Map<String, String> tblProperties; - Options(String comment, RowFormat rowFormat, + Options(List<String> sortCols, String comment, RowFormat rowFormat, Map<String, String> serdeProperties, THdfsFileFormat fileFormat, HdfsUri location, HdfsCachingOp cachingOp, Map<String, String> tblProperties) { + this.sortCols = sortCols; this.comment = comment; this.rowFormat = rowFormat; Preconditions.checkNotNull(serdeProperties); @@ -122,8 +132,9 @@ class TableDef { } public Options(String comment) { - this(comment, RowFormat.DEFAULT_ROW_FORMAT, Maps.<String, String>newHashMap(), - THdfsFileFormat.TEXT, null, null, Maps.<String, String>newHashMap()); + this(ImmutableList.<String>of(), comment, RowFormat.DEFAULT_ROW_FORMAT, + Maps.<String, String>newHashMap(), THdfsFileFormat.TEXT, null, null, + Maps.<String, String>newHashMap()); } } @@ -145,9 +156,17 @@ class TableDef { public String getTbl() { return tableName_.getTbl(); } public boolean isAnalyzed() { return isAnalyzed_; } List<ColumnDef> getColumnDefs() { return columnDefs_; } + List<String> getColumnNames() { return ColumnDef.toColumnNames(columnDefs_); } + + List<String> getPartitionColumnNames() { + return ColumnDef.toColumnNames(getPartitionColumnDefs()); + } + List<ColumnDef> getPartitionColumnDefs() { return dataLayout_.getPartitionColumnDefs(); } + + boolean 
isKuduTable() { return options_.fileFormat == THdfsFileFormat.KUDU; } List<String> getPrimaryKeyColumnNames() { return primaryKeyColNames_; } List<ColumnDef> getPrimaryKeyColumnDefs() { return primaryKeyColDefs_; } boolean isExternal() { return isExternal_; } @@ -159,6 +178,7 @@ class TableDef { Preconditions.checkNotNull(options); options_ = options; } + List<String> getSortColumns() { return options_.sortCols; } String getComment() { return options_.comment; } Map<String, String> getTblProperties() { return options_.tblProperties; } HdfsCachingOp getCachingOp() { return options_.cachingOp; } @@ -201,7 +221,7 @@ class TableDef { if (!colNames.add(colDef.getColName().toLowerCase())) { throw new AnalysisException("Duplicate column name: " + colDef.getColName()); } - if (getFileFormat() != THdfsFileFormat.KUDU && colDef.hasKuduOptions()) { + if (!isKuduTable() && colDef.hasKuduOptions()) { throw new AnalysisException(String.format("Unsupported column options for " + "file format '%s': '%s'", getFileFormat().name(), colDef.toString())); } @@ -259,6 +279,62 @@ class TableDef { } } + /** + * Analyzes the list of columns in 'sortCols' against the columns of 'table'. Each + * column of 'sortCols' must occur in 'table' as a non-partitioning column. 'table' + * must be an HDFS table. If there are errors during the analysis, this will throw an + * AnalysisException. + */ + public static void analyzeSortColumns(List<String> sortCols, Table table) + throws AnalysisException { + Preconditions.checkState(table instanceof HdfsTable); + analyzeSortColumns(sortCols, Column.toColumnNames(table.getNonClusteringColumns()), + Column.toColumnNames(table.getClusteringColumns())); + } + + /** + * Analyzes the list of columns in 'sortCols' and returns their matching positions in + * 'tableCols'. Each column must occur in 'tableCols' and must not occur in + * 'partitionCols'. If there are errors during the analysis, this will throw an + * AnalysisException. 
+ */ + public static List<Integer> analyzeSortColumns(List<String> sortCols, + List<String> tableCols, List<String> partitionCols) + throws AnalysisException { + // The index of each sort column in the list of table columns. + LinkedHashSet<Integer> colIdxs = new LinkedHashSet<Integer>(); + + int numColumns = 0; + for (String sortColName: sortCols) { + ++numColumns; + // Make sure it's not a partition column. + if (partitionCols.contains(sortColName)) { + throw new AnalysisException(String.format("SORT BY column list must not " + + "contain partition column: '%s'", sortColName)); + } + + // Determine the index of each sort column in the list of table columns. + boolean foundColumn = false; + for (int j = 0; j < tableCols.size(); ++j) { + if (tableCols.get(j).equalsIgnoreCase(sortColName)) { + if (colIdxs.contains(j)) { + throw new AnalysisException(String.format("Duplicate column in SORT BY " + + "list: %s", sortColName)); + } + colIdxs.add(j); + foundColumn = true; + break; + } + } + if (!foundColumn) { + throw new AnalysisException(String.format("Could not find SORT BY column '%s' " + + "in table.", sortColName)); + } + } + Preconditions.checkState(numColumns == colIdxs.size()); + return ImmutableList.copyOf(colIdxs); + } + private void analyzeOptions(Analyzer analyzer) throws AnalysisException { MetaStoreUtil.checkShortPropertyMap("Property", options_.tblProperties); MetaStoreUtil.checkShortPropertyMap("Serde property", options_.serdeProperties); @@ -280,11 +356,25 @@ class TableDef { // Analyze 'skip.header.line.count' property. AlterTableSetTblProperties.analyzeSkipHeaderLineCount(options_.tblProperties); analyzeRowFormat(analyzer); + + String sortByKey = AlterTableSortByStmt.TBL_PROP_SORT_COLUMNS; + if (options_.tblProperties.containsKey(sortByKey)) { + throw new AnalysisException(String.format("Table definition must not contain the " + + "%s table property. Use SORT BY (...) instead.", sortByKey)); + } + + // Analyze sort columns. 
+ if (options_.sortCols == null) return; + if (isKuduTable()) { + throw new AnalysisException("SORT BY is not supported for Kudu tables."); + } + analyzeSortColumns(options_.sortCols, getColumnNames(), + getPartitionColumnNames()); } private void analyzeRowFormat(Analyzer analyzer) throws AnalysisException { if (options_.rowFormat == null) return; - if (options_.fileFormat == THdfsFileFormat.KUDU) { + if (isKuduTable()) { throw new AnalysisException(String.format( "ROW FORMAT cannot be specified for file format %s.", options_.fileFormat)); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ada9dac/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java index a922c8b..72bde98 100644 --- a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java +++ b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java @@ -24,6 +24,7 @@ import java.util.Map; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; @@ -54,10 +55,28 @@ import org.apache.impala.util.KuduUtil; */ public class ToSqlUtils { // Table properties to hide when generating the toSql() statement - // EXTERNAL and comment are hidden because they are part of the toSql result, e.g., - // "CREATE EXTERNAL TABLE <name> ... COMMENT <comment> ..." - private static final ImmutableSet<String> HIDDEN_TABLE_PROPERTIES = - ImmutableSet.of("EXTERNAL", "comment"); + // EXTERNAL, SORT BY, and comment are hidden because they are part of the toSql result, + // e.g., "CREATE EXTERNAL TABLE <name> ... SORT BY (...) ... COMMENT <comment> ..." 
+ private static final ImmutableSet<String> HIDDEN_TABLE_PROPERTIES = ImmutableSet.of( + "EXTERNAL", "comment", AlterTableSortByStmt.TBL_PROP_SORT_COLUMNS); + + /** + * Removes all hidden properties from the given 'tblProperties' map. + */ + private static void removeHiddenTableProperties(Map<String, String> tblProperties) { + for (String key: HIDDEN_TABLE_PROPERTIES) tblProperties.remove(key); + } + + /** + * Returns the list of sort columns from 'properties' or 'null' if 'properties' doesn't + * contain 'sort.columns'. + */ + private static List<String> getSortColumns(Map<String, String> properties) { + String sortByKey = AlterTableSortByStmt.TBL_PROP_SORT_COLUMNS; + if (!properties.containsKey(sortByKey)) return null; + return Lists.newArrayList(Splitter.on(",").trimResults().omitEmptyStrings().split( + properties.get(sortByKey))); + } /** * Given an unquoted identifier string, returns an identifier lexable by @@ -129,7 +148,7 @@ public class ToSqlUtils { } // TODO: Pass the correct compression, if applicable. return getCreateTableSql(stmt.getDb(), stmt.getTbl(), stmt.getComment(), colsSql, - partitionColsSql, stmt.getTblPrimaryKeyColumnNames(), null, + partitionColsSql, stmt.getTblPrimaryKeyColumnNames(), null, stmt.getSortColumns(), stmt.getTblProperties(), stmt.getSerdeProperties(), stmt.isExternal(), stmt.getIfNotExists(), stmt.getRowFormat(), HdfsFileFormat.fromThrift(stmt.getFileFormat()), HdfsCompression.NONE, null, @@ -148,12 +167,14 @@ public class ToSqlUtils { for (ColumnDef col: innerStmt.getPartitionColumnDefs()) { partitionColsSql.add(col.getColName()); } + HashMap<String, String> properties = Maps.newHashMap(innerStmt.getTblProperties()); + removeHiddenTableProperties(properties); // TODO: Pass the correct compression, if applicable. 
String createTableSql = getCreateTableSql(innerStmt.getDb(), innerStmt.getTbl(), innerStmt.getComment(), null, partitionColsSql, - innerStmt.getTblPrimaryKeyColumnNames(), null, innerStmt.getTblProperties(), - innerStmt.getSerdeProperties(), innerStmt.isExternal(), - innerStmt.getIfNotExists(), innerStmt.getRowFormat(), + innerStmt.getTblPrimaryKeyColumnNames(), null, innerStmt.getSortColumns(), + properties, innerStmt.getSerdeProperties(), + innerStmt.isExternal(), innerStmt.getIfNotExists(), innerStmt.getRowFormat(), HdfsFileFormat.fromThrift(innerStmt.getFileFormat()), HdfsCompression.NONE, null, innerStmt.getLocation()); return createTableSql + " AS " + stmt.getQueryStmt().toSql(); @@ -173,10 +194,9 @@ public class ToSqlUtils { } boolean isExternal = msTable.getTableType() != null && msTable.getTableType().equals(TableType.EXTERNAL_TABLE.toString()); + List<String> sortColsSql = getSortColumns(properties); String comment = properties.get("comment"); - for (String hiddenProperty: HIDDEN_TABLE_PROPERTIES) { - properties.remove(hiddenProperty); - } + removeHiddenTableProperties(properties); ArrayList<String> colsSql = Lists.newArrayList(); ArrayList<String> partitionColsSql = Lists.newArrayList(); boolean isHbaseTable = table instanceof HBaseTable; @@ -230,7 +250,7 @@ public class ToSqlUtils { } HdfsUri tableLocation = location == null ? 
null : new HdfsUri(location); return getCreateTableSql(table.getDb().getName(), table.getName(), comment, colsSql, - partitionColsSql, primaryKeySql, kuduPartitionByParams, properties, + partitionColsSql, primaryKeySql, kuduPartitionByParams, sortColsSql, properties, serdeParameters, isExternal, false, rowFormat, format, compression, storageHandlerClassName, tableLocation); } @@ -243,10 +263,10 @@ public class ToSqlUtils { public static String getCreateTableSql(String dbName, String tableName, String tableComment, List<String> columnsSql, List<String> partitionColumnsSql, List<String> primaryKeysSql, String kuduPartitionByParams, - Map<String, String> tblProperties, Map<String, String> serdeParameters, - boolean isExternal, boolean ifNotExists, RowFormat rowFormat, - HdfsFileFormat fileFormat, HdfsCompression compression, String storageHandlerClass, - HdfsUri location) { + List<String> sortColsSql, Map<String, String> tblProperties, + Map<String, String> serdeParameters, boolean isExternal, boolean ifNotExists, + RowFormat rowFormat, HdfsFileFormat fileFormat, HdfsCompression compression, + String storageHandlerClass, HdfsUri location) { Preconditions.checkNotNull(tableName); StringBuilder sb = new StringBuilder("CREATE "); if (isExternal) sb.append("EXTERNAL "); @@ -254,10 +274,10 @@ public class ToSqlUtils { if (ifNotExists) sb.append("IF NOT EXISTS "); if (dbName != null) sb.append(dbName + "."); sb.append(tableName); - if (columnsSql != null) { + if (columnsSql != null && !columnsSql.isEmpty()) { sb.append(" (\n "); sb.append(Joiner.on(",\n ").join(columnsSql)); - if (!primaryKeysSql.isEmpty()) { + if (primaryKeysSql != null && !primaryKeysSql.isEmpty()) { sb.append(",\n PRIMARY KEY ("); Joiner.on(", ").appendTo(sb, primaryKeysSql).append(")"); } @@ -274,6 +294,11 @@ public class ToSqlUtils { sb.append("PARTITION BY " + kuduPartitionByParams + "\n"); } + if (sortColsSql != null) { + sb.append(String.format("SORT BY (\n %s\n)\n", + Joiner.on(", \n 
").join(sortColsSql))); + } + if (tableComment != null) sb.append(" COMMENT '" + tableComment + "'\n"); if (rowFormat != null && !rowFormat.isDefault()) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ada9dac/fe/src/main/java/org/apache/impala/catalog/Column.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/Column.java b/fe/src/main/java/org/apache/impala/catalog/Column.java index b510102..6e67845 100644 --- a/fe/src/main/java/org/apache/impala/catalog/Column.java +++ b/fe/src/main/java/org/apache/impala/catalog/Column.java @@ -129,4 +129,10 @@ public class Column { } }); } + + public static List<String> toColumnNames(List<Column> columns) { + List<String> colNames = Lists.newArrayList(); + for (Column col: columns) colNames.add(col.getName()); + return colNames; + } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ada9dac/fe/src/main/java/org/apache/impala/catalog/Table.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java index 881841c..c52e6fa 100644 --- a/fe/src/main/java/org/apache/impala/catalog/Table.java +++ b/fe/src/main/java/org/apache/impala/catalog/Table.java @@ -389,13 +389,7 @@ public abstract class Table implements CatalogObject { /** * Returns a list of the column names ordered by position. */ - public List<String> getColumnNames() { - List<String> colNames = Lists.<String>newArrayList(); - for (Column col: colsByPos_) { - colNames.add(col.getName()); - } - return colNames; - } + public List<String> getColumnNames() { return Column.toColumnNames(colsByPos_); } /** * Returns a list of thrift column descriptors ordered by position. 
@@ -468,6 +462,17 @@ public abstract class Table implements CatalogObject { } public int getNumClusteringCols() { return numClusteringCols_; } + + /** + * Sets the number of clustering columns. This method should only be used for tests and + * the caller must make sure that the value matches any columns that were added to the + * table. + */ + public void setNumClusteringCols(int n) { + Preconditions.checkState(RuntimeEnv.INSTANCE.isTestEnv()); + numClusteringCols_ = n; + } + public long getNumRows() { return numRows_; } public ArrayType getType() { return type_; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ada9dac/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java index 8dc9f62..996388f 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java @@ -55,16 +55,16 @@ public class HdfsTableSink extends TableSink { // Stores the indices into the list of non-clustering columns of the target table that // are mentioned in the 'sortby()' hint. This is sent to the backend to populate the // RowGroup::sorting_columns list in parquet files. 
- private List<Integer> sortByColumns_ = Lists.newArrayList(); + private List<Integer> sortColumns_ = Lists.newArrayList(); public HdfsTableSink(Table targetTable, List<Expr> partitionKeyExprs, - boolean overwrite, boolean inputIsClustered, List<Integer> sortByColumns) { + boolean overwrite, boolean inputIsClustered, List<Integer> sortColumns) { super(targetTable, Op.INSERT); Preconditions.checkState(targetTable instanceof HdfsTable); partitionKeyExprs_ = partitionKeyExprs; overwrite_ = overwrite; inputIsClustered_ = inputIsClustered; - sortByColumns_ = sortByColumns; + sortColumns_ = sortColumns; } @Override @@ -170,7 +170,7 @@ public class HdfsTableSink extends TableSink { if (skipHeaderLineCount > 0) { hdfsTableSink.setSkip_header_line_count(skipHeaderLineCount); } - hdfsTableSink.setSort_by_columns(sortByColumns_); + hdfsTableSink.setSort_columns(sortColumns_); TTableSink tTableSink = new TTableSink(DescriptorTable.TABLE_SINK_ID, TTableSinkType.HDFS, sinkOp_.toThrift()); tTableSink.hdfs_table_sink = hdfsTableSink; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ada9dac/fe/src/main/java/org/apache/impala/planner/Planner.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java index 418e0c6..d6b0814 100644 --- a/fe/src/main/java/org/apache/impala/planner/Planner.java +++ b/fe/src/main/java/org/apache/impala/planner/Planner.java @@ -485,12 +485,13 @@ public class Planner { /** * Insert a sort node on top of the plan, depending on the clustered/noclustered/sortby - * plan hint. If clustering is enabled in insertStmt, then the ordering columns will - * start with the clustering columns (key columns for Kudu tables), so that partitions - * can be written sequentially in the table sink. 
Any additional non-clustering columns - * specified by the sortby hint will be added to the ordering columns and after any - * clustering columns. If neither clustering nor a sortby hint are specified, then no - * sort node will be added to the plan. + * plan hint. If clustering is enabled in insertStmt or additional columns are specified + * in the 'sort.columns' table property, then the ordering columns will start with the + * clustering columns (key columns for Kudu tables), so that partitions can be written + * sequentially in the table sink. Any additional non-clustering columns specified by + * the 'sort.columns' property will be added to the ordering columns after any + * clustering columns. If no clustering is requested and the table does not contain + * columns in the 'sort.columns' property, then no sort node will be added to the plan. */ public void createPreInsertSort(InsertStmt insertStmt, PlanFragment inputFragment, Analyzer analyzer) throws ImpalaException { @@ -505,10 +506,13 @@ public class Planner { orderingExprs.add(inputFragment.getDataPartition().getPartitionExprs().get(0)); orderingExprs.addAll(insertStmt.getPrimaryKeyExprs()); } - } else if (insertStmt.hasClusteredHint()) { + } else if (insertStmt.hasClusteredHint() || !insertStmt.getSortExprs().isEmpty()) { + // NOTE: If the table has a 'sort.columns' property and the query has a + // 'noclustered' hint, we issue a warning during analysis and ignore the + // 'noclustered' hint. orderingExprs.addAll(insertStmt.getPartitionKeyExprs()); } - orderingExprs.addAll(insertStmt.getSortByExprs()); + orderingExprs.addAll(insertStmt.getSortExprs()); // Ignore constants for the sake of clustering. 
Expr.removeConstants(orderingExprs); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ada9dac/fe/src/main/java/org/apache/impala/planner/TableSink.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/TableSink.java b/fe/src/main/java/org/apache/impala/planner/TableSink.java index 13e79c3..1c27360 100644 --- a/fe/src/main/java/org/apache/impala/planner/TableSink.java +++ b/fe/src/main/java/org/apache/impala/planner/TableSink.java @@ -87,22 +87,22 @@ public abstract class TableSink extends DataSink { * Not all Ops are supported for all tables. * All parameters must be non-null, the lists in particular need to be empty if they * don't make sense for a certain table type. - * For HDFS tables 'sortByColumns' specifies the indices into the list of non-clustering + * For HDFS tables 'sortColumns' specifies the indices into the list of non-clustering * columns of the target table that are mentioned in the 'sortby()' hint. */ public static TableSink create(Table table, Op sinkAction, List<Expr> partitionKeyExprs, List<Integer> referencedColumns, - boolean overwrite, boolean inputIsClustered, List<Integer> sortByColumns) { + boolean overwrite, boolean inputIsClustered, List<Integer> sortColumns) { Preconditions.checkNotNull(partitionKeyExprs); Preconditions.checkNotNull(referencedColumns); - Preconditions.checkNotNull(sortByColumns); + Preconditions.checkNotNull(sortColumns); if (table instanceof HdfsTable) { // Hdfs only supports inserts. Preconditions.checkState(sinkAction == Op.INSERT); // Referenced columns don't make sense for an Hdfs table. Preconditions.checkState(referencedColumns.isEmpty()); return new HdfsTableSink(table, partitionKeyExprs, overwrite, inputIsClustered, - sortByColumns); + sortColumns); } else if (table instanceof HBaseTable) { // HBase only supports inserts. 
Preconditions.checkState(sinkAction == Op.INSERT); @@ -113,14 +113,14 @@ public abstract class TableSink extends DataSink { // Referenced columns don't make sense for an HBase table. Preconditions.checkState(referencedColumns.isEmpty()); // sortby() hint is not supported for HBase tables. - Preconditions.checkState(sortByColumns.isEmpty()); + Preconditions.checkState(sortColumns.isEmpty()); // Create the HBaseTableSink and return it. return new HBaseTableSink(table); } else if (table instanceof KuduTable) { // Kudu doesn't have a way to perform INSERT OVERWRITE. Preconditions.checkState(overwrite == false); // sortby() hint is not supported for Kudu tables. - Preconditions.checkState(sortByColumns.isEmpty()); + Preconditions.checkState(sortColumns.isEmpty()); return new KuduTableSink(table, sinkAction, referencedColumns); } else { throw new UnsupportedOperationException( http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ada9dac/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index 9dc247d..3f7365f 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; +import org.apache.impala.analysis.AlterTableSortByStmt; import org.apache.impala.analysis.FunctionName; import org.apache.impala.analysis.TableName; import org.apache.impala.authorization.User; @@ -154,6 +155,7 @@ import org.apache.impala.thrift.TTruncateParams; import org.apache.impala.thrift.TUpdateCatalogRequest; import 
org.apache.impala.thrift.TUpdateCatalogResponse; import org.apache.impala.util.HdfsCachingUtil; +import org.apache.impala.util.MetaStoreUtil; import org.apache.log4j.Logger; import org.apache.thrift.TException; @@ -1598,7 +1600,10 @@ public class CatalogOpExecutor { } else { tbl.setParameters(new HashMap<String, String>()); } - + if (params.isSetSort_columns() && !params.sort_columns.isEmpty()) { + tbl.getParameters().put(AlterTableSortByStmt.TBL_PROP_SORT_COLUMNS, + Joiner.on(",").join(params.sort_columns)); + } if (params.getComment() != null) { tbl.getParameters().put("comment", params.getComment()); } @@ -1789,6 +1794,10 @@ public class CatalogOpExecutor { if (tbl.getParameters() == null) { tbl.setParameters(new HashMap<String, String>()); } + if (params.isSetSort_columns() && !params.sort_columns.isEmpty()) { + tbl.getParameters().put(AlterTableSortByStmt.TBL_PROP_SORT_COLUMNS, + Joiner.on(",").join(params.sort_columns)); + } if (comment != null) { tbl.getParameters().put("comment", comment); } @@ -1867,6 +1876,13 @@ public class CatalogOpExecutor { List<FieldSchema> newColumns = buildFieldSchemaList(columns); if (replaceExistingCols) { msTbl.getSd().setCols(newColumns); + String sortByKey = AlterTableSortByStmt.TBL_PROP_SORT_COLUMNS; + if (msTbl.getParameters().containsKey(sortByKey)) { + String oldColumns = msTbl.getParameters().get(sortByKey); + String alteredColumns = MetaStoreUtil.intersectCsvListWithColumNames(oldColumns, + columns); + msTbl.getParameters().put(sortByKey, alteredColumns); + } } else { // Append the new column to the existing list of columns. 
for (FieldSchema fs: buildFieldSchemaList(columns)) { @@ -1896,6 +1912,13 @@ public class CatalogOpExecutor { if (newCol.getComment() != null) { fs.setComment(newCol.getComment()); } + String sortByKey = AlterTableSortByStmt.TBL_PROP_SORT_COLUMNS; + if (msTbl.getParameters().containsKey(sortByKey)) { + String oldColumns = msTbl.getParameters().get(sortByKey); + String alteredColumns = MetaStoreUtil.replaceValueInCsvList(oldColumns, colName, + newCol.getColumnName()); + msTbl.getParameters().put(sortByKey, alteredColumns); + } break; } if (!iterator.hasNext()) { @@ -2174,6 +2197,12 @@ public class CatalogOpExecutor { "Column name %s not found in table %s.", colName, tbl.getFullName())); } } + String sortByKey = AlterTableSortByStmt.TBL_PROP_SORT_COLUMNS; + if (msTbl.getParameters().containsKey(sortByKey)) { + String oldColumns = msTbl.getParameters().get(sortByKey); + String alteredColumns = MetaStoreUtil.removeValueFromCsvList(oldColumns, colName); + msTbl.getParameters().put(sortByKey, alteredColumns); + } applyAlterTable(msTbl); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ada9dac/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java b/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java index 07435ae..7671a45 100644 --- a/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java +++ b/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java @@ -17,6 +17,7 @@ package org.apache.impala.util; +import java.util.HashSet; import java.util.List; import java.util.Map; @@ -30,8 +31,14 @@ import org.apache.impala.compat.MetastoreShim; import org.apache.log4j.Logger; import org.apache.thrift.TException; +import org.apache.impala.catalog.HdfsTable; +import org.apache.impala.common.AnalysisException; +import org.apache.impala.thrift.TColumn; +import com.google.common.base.Joiner; import 
com.google.common.base.Preconditions; +import com.google.common.base.Splitter; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; /** * Utility methods for interacting with the Hive Metastore. @@ -169,4 +176,54 @@ public class MetaStoreUtil { } } } + + /** + * Returns a copy of the comma-separated list of values 'inputCsv', with all occurences + * of 'toReplace' replaced with value 'replaceWith'. + */ + public static String replaceValueInCsvList(String input, String toReplace, + String replaceWith) { + Iterable<String> inputList = + Splitter.on(",").trimResults().omitEmptyStrings().split(input); + List<String> outputList = Lists.newArrayList(); + for (String elem : inputList) { + if (elem.equalsIgnoreCase(toReplace)) { + outputList.add(replaceWith); + } else { + outputList.add(elem); + } + } + return Joiner.on(",").join(outputList); + } + + /** + * Returns a copy of the comma-separated list of values 'inputCsv', with all occurences + * of 'toRemove' removed. + */ + public static String removeValueFromCsvList(String inputCsv, String toRemove) { + Iterable<String> inputList = + Splitter.on(",").trimResults().omitEmptyStrings().split(inputCsv); + List<String> outputList = Lists.newArrayList(); + for (String elem : inputList) { + if (!elem.equalsIgnoreCase(toRemove)) outputList.add(elem); + } + return Joiner.on(",").join(outputList); + } + + /** + * Returns the intersection of the comma-separated list of values 'leftCsv' with the + * names of the columns in 'rightCols'. The return value is a comma-separated list. 
+ */ + public static String intersectCsvListWithColumNames(String leftCsv, + List<TColumn> rightCols) { + Iterable<String> leftCols = + Splitter.on(",").trimResults().omitEmptyStrings().split(leftCsv); + HashSet<String> rightColNames = Sets.newHashSet(); + for (TColumn c : rightCols) rightColNames.add(c.getColumnName().toLowerCase()); + List<String> outputList = Lists.newArrayList(); + for (String leftCol : leftCols) { + if (rightColNames.contains(leftCol.toLowerCase())) outputList.add(leftCol); + } + return Joiner.on(",").join(outputList); + } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ada9dac/fe/src/main/jflex/sql-scanner.flex ---------------------------------------------------------------------- diff --git a/fe/src/main/jflex/sql-scanner.flex b/fe/src/main/jflex/sql-scanner.flex index 6d1a773..fb01eaa 100644 --- a/fe/src/main/jflex/sql-scanner.flex +++ b/fe/src/main/jflex/sql-scanner.flex @@ -207,6 +207,7 @@ import org.apache.impala.analysis.SqlParserSymbols; keywordMap.put("set", new Integer(SqlParserSymbols.KW_SET)); keywordMap.put("show", new Integer(SqlParserSymbols.KW_SHOW)); keywordMap.put("smallint", new Integer(SqlParserSymbols.KW_SMALLINT)); + keywordMap.put("sort", new Integer(SqlParserSymbols.KW_SORT)); keywordMap.put("stats", new Integer(SqlParserSymbols.KW_STATS)); keywordMap.put("stored", new Integer(SqlParserSymbols.KW_STORED)); keywordMap.put("straight_join", new Integer(SqlParserSymbols.KW_STRAIGHT_JOIN)); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ada9dac/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java index 2380609..48b15c7 100644 --- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java @@ -506,6 
+506,19 @@ public class AnalyzeDDLTest extends FrontendTestBase { AnalyzesOk("alter table functional.alltypes PARTITION (year<=2010, month=11) " + "set serdeproperties ('a'='2')"); + AnalyzesOk("alter table functional.alltypes set tblproperties('sort.columns'='id')"); + AnalyzesOk("alter table functional.alltypes set tblproperties(" + + "'sort.columns'='INT_COL,id')"); + AnalyzesOk("alter table functional.alltypes set tblproperties(" + + "'sort.columns'='bool_col,int_col,id')"); + AnalyzesOk("alter table functional.alltypes set tblproperties('sort.columns'='')"); + AnalysisError("alter table functional.alltypes set tblproperties(" + + "'sort.columns'='id,int_col,id')", + "Duplicate column in SORT BY list: id"); + AnalysisError("alter table functional.alltypes set tblproperties(" + + "'sort.columns'='ID, foo')", + "Could not find SORT BY column 'foo' in table."); + { // Check that long_properties fail at the analysis layer String long_property_key = ""; @@ -1076,6 +1089,20 @@ public class AnalyzeDDLTest extends FrontendTestBase { } @Test + public void TestAlterTableSortBy() { + AnalyzesOk("alter table functional.alltypes sort by (id)"); + AnalyzesOk("alter table functional.alltypes sort by (int_col,id)"); + AnalyzesOk("alter table functional.alltypes sort by (bool_col,int_col,id)"); + AnalyzesOk("alter table functional.alltypes sort by ()"); + AnalysisError("alter table functional.alltypes sort by (id,int_col,id)", + "Duplicate column in SORT BY list: id"); + AnalysisError("alter table functional.alltypes sort by (id, foo)", "Could not find " + + "SORT BY column 'foo' in table."); + AnalysisError("alter table functional_hbase.alltypes sort by (id, foo)", + "ALTER TABLE SORT BY not supported on HBase tables."); + } + + @Test public void TestAlterView() { // View-definition references a table. 
AnalyzesOk("alter view functional.alltypes_view as " + @@ -1419,9 +1446,12 @@ public class AnalyzeDDLTest extends FrontendTestBase { AnalyzesOk("create table default.newtbl_DNE like parquet " + "'/test-warehouse/schemas/zipcode_incomes.parquet'"); AnalyzesOk("create table newtbl_DNE like parquet " - + "'/test-warehouse/schemas/zipcode_incomes.parquet' STORED AS PARQUET"); + + "'/test-warehouse/schemas/zipcode_incomes.parquet' stored as parquet"); AnalyzesOk("create external table newtbl_DNE like parquet " - + "'/test-warehouse/schemas/zipcode_incomes.parquet' STORED AS PARQUET"); + + "'/test-warehouse/schemas/zipcode_incomes.parquet' sort by (id,zip) " + + "stored as parquet"); + AnalyzesOk("create table newtbl_DNE like parquet " + + "'/test-warehouse/schemas/zipcode_incomes.parquet' sort by (id,zip)"); AnalyzesOk("create table if not exists functional.zipcode_incomes like parquet " + "'/test-warehouse/schemas/zipcode_incomes.parquet'"); AnalyzesOk("create table if not exists newtbl_DNE like parquet " @@ -1658,6 +1688,11 @@ public class AnalyzeDDLTest extends FrontendTestBase { "CREATE TABLE LIKE is not supported for Kudu tables"); AnalysisError("create table tbl like functional_kudu.dimtbl", "Cloning a Kudu " + "table using CREATE TABLE LIKE is not supported."); + + // Test sort columns. 
+ AnalyzesOk("create table tbl sort by (int_col,id) like functional.alltypes"); + AnalysisError("create table tbl sort by (int_col,foo) like functional.alltypes", + "Could not find SORT BY column 'foo' in table."); } @Test @@ -1906,6 +1941,27 @@ public class AnalyzeDDLTest extends FrontendTestBase { "Tables produced by an external data source do not support the column type: " + type.name()); } + + // Tables with sort columns + AnalyzesOk("create table functional.new_table (i int, j int) sort by (i)"); + AnalyzesOk("create table functional.new_table (i int, j int) sort by (i, j)"); + AnalyzesOk("create table functional.new_table (i int, j int) sort by (j, i)"); + + // 'sort.columns' property not supported in table definition. + AnalysisError("create table Foo (i int) sort by (i) " + + "tblproperties ('sort.columns'='i')", "Table definition must not contain the " + + "sort.columns table property. Use SORT BY (...) instead."); + + // Column in sortby hint must exist. + AnalysisError("create table functional.new_table (i int) sort by (j)", "Could not " + + "find SORT BY column 'j' in table."); + + // Partitioned HDFS table + AnalyzesOk("create table functional.new_table (i int) PARTITIONED BY (d decimal)" + + "SORT BY (i)"); + // Column in sortby hint must not be a Hdfs partition column. 
+ AnalysisError("create table functional.new_table (i int) PARTITIONED BY (d decimal)" + + "SORT BY (d)", "SORT BY column list must not contain partition column: 'd'"); } @Test @@ -1988,6 +2044,15 @@ public class AnalyzeDDLTest extends FrontendTestBase { // ALTER TABLE RENAME TO AnalyzesOk("ALTER TABLE functional_kudu.testtbl RENAME TO new_testtbl"); + + // ALTER TABLE SORT BY + AnalysisError("alter table functional_kudu.alltypes sort by (int_col)", + "ALTER TABLE SORT BY not supported on Kudu tables."); + + // ALTER TABLE SET TBLPROPERTIES for sort.columns + AnalysisError("alter table functional_kudu.alltypes set tblproperties(" + + "'sort.columns'='int_col')", + "'sort.columns' table property is not supported for Kudu tables."); } @Test @@ -2296,6 +2361,11 @@ public class AnalyzeDDLTest extends FrontendTestBase { AnalysisError("create table tab (i int primary key block_size 'val') " + "partition by hash (i) partitions 3 stored as kudu", "Invalid value " + "for BLOCK_SIZE: 'val'. A positive INTEGER value is expected."); + + // Sort columns are not supported for Kudu tables. 
+ AnalysisError("create table tab (i int, x int primary key) partition by hash(x) " + + "partitions 8 sort by(i) stored as kudu", "SORT BY is not supported for Kudu " + + "tables."); } @Test http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ada9dac/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java index 88297fe..e4a688c 100644 --- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java @@ -1713,6 +1713,12 @@ public class AnalyzeStmtsTest extends AnalyzerTest { @Test public void TestInsertHints() throws AnalysisException { + // Test table to make sure that conflicting hints and table properties result in a + // warning. + addTestDb("test_sort_by", "Test DB for SORT BY clause."); + addTestTable("create table test_sort_by.alltypes (id int, int_col int, " + + "bool_col boolean) partitioned by (year int, month int) " + + "sort by (int_col, bool_col) location '/'"); for (String[] hintStyle: getHintStyles()) { String prefix = hintStyle[0]; String suffix = hintStyle[1]; @@ -1766,6 +1772,15 @@ public class AnalyzeStmtsTest extends AnalyzerTest { "/* +clustered,noclustered */ select * from functional.alltypes", prefix, suffix), "Conflicting INSERT hints: clustered and noclustered"); + // noclustered hint on a table with sort.columns issues a warning. + AnalyzesOk(String.format( + "insert into test_sort_by.alltypes partition (year, month) " + + "%snoclustered%s select id, int_col, bool_col, year, month from " + + "functional.alltypes", prefix, suffix), + "Insert statement has 'noclustered' hint, but table has 'sort.columns' " + + "property. 
The 'noclustered' hint will be ignored."); + + // Below are tests for hints that are not supported by the legacy syntax. if (prefix == "[") continue; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ada9dac/fe/src/test/java/org/apache/impala/analysis/ParserTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java index 44a543d..d35b9b3 100644 --- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java @@ -2266,6 +2266,13 @@ public class ParserTest extends FrontendTestBase { } @Test + public void TestAlterTableSortBy() { + ParsesOk("ALTER TABLE TEST SORT BY (int_col, id)"); + ParsesOk("ALTER TABLE TEST SORT BY ()"); + ParserError("ALTER TABLE TEST PARTITION (year=2009, month=4) SORT BY (int_col, id)"); + } + + @Test public void TestAlterTableOrViewRename() { for (String entity: Lists.newArrayList("TABLE", "VIEW")) { ParsesOk(String.format("ALTER %s TestDb.Foo RENAME TO TestDb.Foo2", entity)); @@ -2309,6 +2316,7 @@ public class ParserTest extends FrontendTestBase { ParsesOk("CREATE TABLE Foo2 LIKE Foo COMMENT 'tbl' " + "STORED AS PARQUETFILE LOCATION '/a/b'"); ParsesOk("CREATE TABLE Foo2 LIKE Foo STORED AS TEXTFILE LOCATION '/a/b'"); + ParsesOk("CREATE TABLE Foo LIKE PARQUET '/user/foo'"); // Table and column names starting with digits. 
ParsesOk("CREATE TABLE 01_Foo (01_i int, 02_j string)"); @@ -2336,6 +2344,45 @@ public class ParserTest extends FrontendTestBase { ParserError("CREATE TABLE Foo (i int) PARTITIONED BY ()"); ParserError("CREATE TABLE Foo (i int) PARTITIONED BY"); + // Sort by clause + ParsesOk("CREATE TABLE Foo (i int, j int) SORT BY ()"); + ParsesOk("CREATE TABLE Foo (i int) SORT BY (i)"); + ParsesOk("CREATE TABLE Foo (i int) SORT BY (j)"); + ParsesOk("CREATE TABLE Foo (i int, j int) SORT BY (i,j)"); + ParsesOk("CREATE EXTERNAL TABLE Foo (i int, s string) SORT BY (s) " + + "LOCATION '/test-warehouse/'"); + ParsesOk("CREATE TABLE Foo (i int, s string) SORT BY (s) COMMENT 'hello' " + + "LOCATION '/a/b/' TBLPROPERTIES ('123'='1234')"); + + // SORT BY must be the first table option + ParserError("CREATE TABLE Foo (i int, s string) COMMENT 'hello' SORT BY (s) " + + "LOCATION '/a/b/' TBLPROPERTIES ('123'='1234')"); + ParserError("CREATE TABLE Foo (i int, s string) COMMENT 'hello' LOCATION '/a/b/' " + + "SORT BY (s) TBLPROPERTIES ('123'='1234')"); + ParserError("CREATE TABLE Foo (i int, s string) COMMENT 'hello' LOCATION '/a/b/' " + + "TBLPROPERTIES ('123'='1234') SORT BY (s)"); + + // Malformed SORT BY clauses + ParserError("CREATE TABLE Foo (i int, j int) SORT BY"); + ParserError("CREATE TABLE Foo (i int, j int) SORT BY (i,)"); + ParserError("CREATE TABLE Foo (i int, j int) SORT BY (int)"); + + // Create table like other table with sort columns + ParsesOk("CREATE TABLE Foo SORT BY(bar) LIKE Baz STORED AS TEXTFILE LOCATION '/a/b'"); + ParserError("CREATE TABLE SORT BY(bar) Foo LIKE Baz STORED AS TEXTFILE " + + "LOCATION '/a/b'"); + // SORT BY must be the first table option + ParserError("CREATE TABLE Foo LIKE Baz STORED AS TEXTFILE LOCATION '/a/b' " + + "SORT BY(bar)"); + + // CTAS with sort columns + ParsesOk("CREATE TABLE Foo SORT BY(bar) AS SELECT * FROM BAR"); + ParserError("CREATE TABLE Foo AS SELECT * FROM BAR SORT BY(bar)"); + + // Create table like file with sort columns + 
ParsesOk("CREATE TABLE Foo LIKE PARQUET '/user/foo' SORT BY (id)"); + ParserError("CREATE TABLE Foo SORT BY (id) LIKE PARQUET '/user/foo'"); + // Column comments ParsesOk("CREATE TABLE Foo (i int COMMENT 'hello', s string)"); ParsesOk("CREATE TABLE Foo (i int COMMENT 'hello', s string COMMENT 'hi')");
