IMPALA-4561: Replace DISTRIBUTE BY with PARTITION BY in CREATE TABLE

Change-Id: I0e07c41eabb4c8cb95754cf04293cbd9e03d6ab2
Reviewed-on: http://gerrit.cloudera.org:8080/5317
Reviewed-by: Dimitris Tsirogiannis <[email protected]>
Tested-by: Internal Jenkins
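The user-visible change is a syntax rename in Kudu DDL: the DISTRIBUTE BY clause
of CREATE TABLE becomes PARTITION BY, with the HASH and RANGE sub-clauses
otherwise unchanged. A minimal before/after sketch, mirroring the examples in
the KuduPartitionParam javadoc below (table and column names are illustrative,
and Kudu-specific table properties such as master addresses are omitted):

    -- Rejected after this change:
    -- CREATE TABLE t (id BIGINT, name STRING, PRIMARY KEY (id))
    --   DISTRIBUTE BY HASH (id) INTO 3 BUCKETS STORED AS KUDU;

    -- Accepted after this change:
    CREATE TABLE t (id BIGINT, name STRING, PRIMARY KEY (id))
      PARTITION BY HASH (id) INTO 3 BUCKETS,
      RANGE (id) (
        PARTITION VALUES < 10,
        PARTITION 10 <= VALUES < 20,
        PARTITION VALUE = 30
      )
      STORED AS KUDU;

As before, the clause accepts any number of HASH sub-clauses followed by at
most one RANGE sub-clause, and the partitioning columns must be a subset of
the primary key columns.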
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/cba93f1a Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/cba93f1a Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/cba93f1a Branch: refs/heads/master Commit: cba93f1ac3d4c0219c6266924493ad19c8c10556 Parents: f837754 Author: Dimitris Tsirogiannis <[email protected]> Authored: Thu Dec 1 20:43:43 2016 -0800 Committer: Internal Jenkins <[email protected]> Committed: Tue Dec 6 10:41:53 2016 +0000 ---------------------------------------------------------------------- common/thrift/CatalogObjects.thrift | 20 +- common/thrift/JniCatalog.thrift | 4 +- fe/src/main/cup/sql-parser.cup | 56 ++--- .../AlterTableAddDropRangePartitionStmt.java | 4 +- .../analysis/CreateTableAsSelectStmt.java | 3 +- .../apache/impala/analysis/CreateTableStmt.java | 38 ++-- .../apache/impala/analysis/DistributeParam.java | 211 ------------------- .../impala/analysis/KuduPartitionParam.java | 211 +++++++++++++++++++ .../apache/impala/analysis/RangePartition.java | 18 +- .../apache/impala/analysis/TableDataLayout.java | 21 +- .../org/apache/impala/analysis/TableDef.java | 6 +- .../org/apache/impala/analysis/ToSqlUtils.java | 14 +- .../org/apache/impala/catalog/KuduTable.java | 81 +++---- .../impala/service/KuduCatalogOpExecutor.java | 36 ++-- .../apache/impala/analysis/AnalyzeDDLTest.java | 164 +++++++------- .../impala/analysis/AuthorizationTest.java | 2 +- .../org/apache/impala/analysis/ParserTest.java | 58 ++--- testdata/bin/generate-schema-statements.py | 2 +- .../functional/functional_schema_template.sql | 28 +-- testdata/datasets/tpcds/tpcds_kudu_template.sql | 48 ++--- testdata/datasets/tpch/tpch_kudu_template.sql | 16 +- testdata/datasets/tpch/tpch_schema_template.sql | 16 +- .../queries/PlannerTest/lineage.test | 4 +- .../queries/QueryTest/kudu-scan-node.test | 6 +- .../QueryTest/kudu-timeouts-catalogd.test | 2 +- .../queries/QueryTest/kudu_alter.test | 6 +- .../queries/QueryTest/kudu_create.test | 24 +-- .../queries/QueryTest/kudu_delete.test | 6 +- .../queries/QueryTest/kudu_describe.test | 2 +- .../queries/QueryTest/kudu_insert.test | 10 +- .../queries/QueryTest/kudu_partition_ddl.test | 28 +-- .../queries/QueryTest/kudu_stats.test | 2 +- .../queries/QueryTest/kudu_update.test | 2 +- .../queries/QueryTest/kudu_upsert.test | 4 +- tests/query_test/test_cancellation.py | 2 +- tests/query_test/test_kudu.py | 38 ++-- tests/shell/test_shell_commandline.py | 2 +- 37 files changed, 598 insertions(+), 597 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/common/thrift/CatalogObjects.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift index 10cb777..de89b51 100644 --- a/common/thrift/CatalogObjects.thrift +++ b/common/thrift/CatalogObjects.thrift @@ -357,8 +357,8 @@ struct TDataSourceTable { 2: required string init_string } -// Parameters needed for hash distribution -struct TDistributeByHashParam { +// Parameters needed for hash partitioning +struct TKuduPartitionByHashParam { 1: required list<string> columns 2: required i32 num_buckets } @@ -370,16 +370,16 @@ struct TRangePartition { 4: optional bool is_upper_bound_inclusive } -// A range distribution is identified by a list of columns and a list of range partitions. 
-struct TDistributeByRangeParam { +// A range partitioning is identified by a list of columns and a list of range partitions. +struct TKuduPartitionByRangeParam { 1: required list<string> columns 2: optional list<TRangePartition> range_partitions } -// Parameters for the DISTRIBUTE BY clause. -struct TDistributeParam { - 1: optional TDistributeByHashParam by_hash_param; - 2: optional TDistributeByRangeParam by_range_param; +// Parameters for the PARTITION BY clause. +struct TKuduPartitionParam { + 1: optional TKuduPartitionByHashParam by_hash_param; + 2: optional TKuduPartitionByRangeParam by_range_param; } // Represents a Kudu table @@ -392,8 +392,8 @@ struct TKuduTable { // Name of the key columns 3: required list<string> key_columns - // Distribution schemes - 4: required list<TDistributeParam> distribute_by + // Partitioning + 4: required list<TKuduPartitionParam> partition_by } // Represents a table or view. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/common/thrift/JniCatalog.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift index 8bb07e6..d224658 100644 --- a/common/thrift/JniCatalog.thrift +++ b/common/thrift/JniCatalog.thrift @@ -415,9 +415,9 @@ struct TCreateTableParams { // If set, the table will be cached after creation with details specified in cache_op. 13: optional THdfsCachingOp cache_op - // If set, the table is automatically distributed according to this parameter. + // If set, the table is automatically partitioned according to this parameter. // Kudu-only. - 14: optional list<CatalogObjects.TDistributeParam> distribute_by + 14: optional list<CatalogObjects.TKuduPartitionParam> partition_by // Primary key column names (Kudu-only) 15: optional list<string> primary_key_column_names; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/cup/sql-parser.cup ---------------------------------------------------------------------- diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup index a375c75..e09993b 100644 --- a/fe/src/main/cup/sql-parser.cup +++ b/fe/src/main/cup/sql-parser.cup @@ -401,21 +401,21 @@ nonterminal CreateTableAsSelectStmt create_tbl_as_select_stmt; nonterminal CreateTableLikeStmt create_tbl_like_stmt; nonterminal CreateTableStmt create_tbl_stmt; nonterminal TableDef tbl_def_without_col_defs, tbl_def_with_col_defs; -nonterminal TableDataLayout opt_tbl_data_layout, distributed_data_layout; +nonterminal TableDataLayout opt_tbl_data_layout, partitioned_data_layout; nonterminal TableDef.Options tbl_options; nonterminal CreateViewStmt create_view_stmt; nonterminal CreateDataSrcStmt create_data_src_stmt; nonterminal DropDataSrcStmt drop_data_src_stmt; nonterminal ShowDataSrcsStmt show_data_srcs_stmt; nonterminal StructField struct_field_def; -nonterminal DistributeParam distribute_hash_param; +nonterminal KuduPartitionParam hash_partition_param; nonterminal List<RangePartition> range_params_list; nonterminal RangePartition range_param; nonterminal Pair<Expr, Boolean> opt_lower_range_val, opt_upper_range_val; -nonterminal ArrayList<DistributeParam> distribute_hash_param_list; -nonterminal ArrayList<DistributeParam> distribute_param_list; -nonterminal DistributeParam distribute_range_param; +nonterminal ArrayList<KuduPartitionParam> hash_partition_param_list; +nonterminal ArrayList<KuduPartitionParam> partition_param_list; +nonterminal KuduPartitionParam range_partition_param; nonterminal 
ColumnDef column_def, view_column_def; nonterminal ArrayList<ColumnDef> column_def_list, partition_column_defs, view_column_def_list, view_column_defs; @@ -1013,12 +1013,12 @@ create_tbl_as_select_stmt ::= // An optional clause cannot be used directly below because it would conflict with // the first rule in "create_tbl_stmt". primary_keys:primary_keys - distributed_data_layout:distribute_params + partitioned_data_layout:partition_params tbl_options:options KW_AS query_stmt:select_stmt {: tbl_def.getPrimaryKeyColumnNames().addAll(primary_keys); - tbl_def.getDistributeParams().addAll(distribute_params.getDistributeParams()); + tbl_def.getKuduPartitionParams().addAll(partition_params.getKuduPartitionParams()); tbl_def.setOptions(options); RESULT = new CreateTableAsSelectStmt(new CreateTableStmt(tbl_def), select_stmt, null); :} @@ -1057,7 +1057,7 @@ create_tbl_stmt ::= tbl_options:options {: tbl_def.getPartitionColumnDefs().addAll(data_layout.getPartitionColumnDefs()); - tbl_def.getDistributeParams().addAll(data_layout.getDistributeParams()); + tbl_def.getKuduPartitionParams().addAll(data_layout.getKuduPartitionParams()); tbl_def.setOptions(options); RESULT = new CreateTableStmt(tbl_def); :} @@ -1082,7 +1082,7 @@ create_tbl_stmt ::= tbl_options:options {: tbl_def.getPartitionColumnDefs().addAll(data_layout.getPartitionColumnDefs()); - tbl_def.getDistributeParams().addAll(data_layout.getDistributeParams()); + tbl_def.getKuduPartitionParams().addAll(data_layout.getKuduPartitionParams()); tbl_def.setOptions(options); RESULT = new CreateTableLikeFileStmt(new CreateTableStmt(tbl_def), schema_file_format, new HdfsUri(schema_location)); @@ -1148,13 +1148,13 @@ tbl_options ::= opt_tbl_data_layout ::= partition_column_defs:partition_column_defs {: RESULT = TableDataLayout.createPartitionedLayout(partition_column_defs); :} - | distributed_data_layout:data_layout + | partitioned_data_layout:data_layout {: RESULT = data_layout; :} ; -distributed_data_layout ::= - distribute_param_list:distribute_params - {: RESULT = TableDataLayout.createDistributedLayout(distribute_params); :} +partitioned_data_layout ::= + partition_param_list:partition_params + {: RESULT = TableDataLayout.createKuduPartitionedLayout(partition_params); :} | /* empty */ {: RESULT = TableDataLayout.createEmptyLayout(); :} ; @@ -1164,25 +1164,25 @@ partition_column_defs ::= {: RESULT = col_defs; :} ; -// The DISTRIBUTE clause contains any number of HASH() clauses followed by exactly zero +// The PARTITION BY clause contains any number of HASH() clauses followed by exactly zero // or one RANGE clauses -distribute_param_list ::= - KW_DISTRIBUTE KW_BY distribute_hash_param_list:list +partition_param_list ::= + KW_PARTITION KW_BY hash_partition_param_list:list {: RESULT = list; :} - | KW_DISTRIBUTE KW_BY distribute_range_param:rng + | KW_PARTITION KW_BY range_partition_param:rng {: RESULT = Lists.newArrayList(rng); :} - | KW_DISTRIBUTE KW_BY distribute_hash_param_list:list COMMA distribute_range_param:rng + | KW_PARTITION KW_BY hash_partition_param_list:list COMMA range_partition_param:rng {: list.add(rng); RESULT = list; :} ; -// A list of HASH distribution clauses used for flexible partitioning -distribute_hash_param_list ::= - distribute_hash_param:dc +// A list of HASH partitioning clauses used for flexible partitioning +hash_partition_param_list ::= + hash_partition_param:dc {: RESULT = Lists.newArrayList(dc); :} - | distribute_hash_param_list:list COMMA distribute_hash_param:d + | hash_partition_param_list:list COMMA 
hash_partition_param:d {: list.add(d); RESULT = list; @@ -1190,26 +1190,26 @@ distribute_hash_param_list ::= ; // The column list for a HASH clause is optional. -distribute_hash_param ::= +hash_partition_param ::= KW_HASH LPAREN ident_list:cols RPAREN KW_INTO INTEGER_LITERAL:buckets KW_BUCKETS - {: RESULT = DistributeParam.createHashParam(cols, buckets.intValue()); :} + {: RESULT = KuduPartitionParam.createHashParam(cols, buckets.intValue()); :} | KW_HASH KW_INTO INTEGER_LITERAL:buckets KW_BUCKETS {: - RESULT = DistributeParam.createHashParam(Lists.<String>newArrayList(), + RESULT = KuduPartitionParam.createHashParam(Lists.<String>newArrayList(), buckets.intValue()); :} ; // The column list for a RANGE clause is optional. -distribute_range_param ::= +range_partition_param ::= KW_RANGE LPAREN ident_list:cols RPAREN LPAREN range_params_list:ranges RPAREN {: - RESULT = DistributeParam.createRangeParam(cols, ranges); + RESULT = KuduPartitionParam.createRangeParam(cols, ranges); :} | KW_RANGE LPAREN range_params_list:ranges RPAREN {: - RESULT = DistributeParam.createRangeParam(Collections.<String>emptyList(), ranges); + RESULT = KuduPartitionParam.createRangeParam(Collections.<String>emptyList(), ranges); :} ; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/java/org/apache/impala/analysis/AlterTableAddDropRangePartitionStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableAddDropRangePartitionStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableAddDropRangePartitionStmt.java index b1618e0..aee07f7 100644 --- a/fe/src/main/java/org/apache/impala/analysis/AlterTableAddDropRangePartitionStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableAddDropRangePartitionStmt.java @@ -93,10 +93,10 @@ public class AlterTableAddDropRangePartitionStmt extends AlterTableStmt { "partitions: RANGE %s", table.getFullName(), rangePartitionSpec_.toSql())); } KuduTable kuduTable = (KuduTable) table; - List<String> colNames = kuduTable.getRangeDistributionColNames(); + List<String> colNames = kuduTable.getRangePartitioningColNames(); if (colNames.isEmpty()) { throw new AnalysisException(String.format("Cannot add/drop partition %s: " + - "Kudu table %s doesn't have a range-based distribution.", + "Kudu table %s doesn't have a range-based partitioning.", rangePartitionSpec_.toSql(), kuduTable.getName())); } List<ColumnDef> rangeColDefs = Lists.newArrayListWithCapacity(colNames.size()); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java index cd7b1c8..eb492c6 100644 --- a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java @@ -193,7 +193,8 @@ public class CreateTableAsSelectStmt extends StatementBase { Table tmpTable = null; if (KuduTable.isKuduTable(msTbl)) { tmpTable = KuduTable.createCtasTarget(db, msTbl, createStmt_.getColumnDefs(), - createStmt_.getTblPrimaryKeyColumnNames(), createStmt_.getDistributeParams()); + createStmt_.getTblPrimaryKeyColumnNames(), + createStmt_.getKuduPartitionParams()); } else { // TODO: Creating a tmp table using load() 
is confusing. // Refactor it to use a 'createCtasTarget()' function similar to Kudu table. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java index f2db81b..1139005 100644 --- a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java @@ -88,8 +88,8 @@ public class CreateTableStmt extends StatementBase { public List<ColumnDef> getPartitionColumnDefs() { return tableDef_.getPartitionColumnDefs(); } - public List<DistributeParam> getDistributeParams() { - return tableDef_.getDistributeParams(); + public List<KuduPartitionParam> getKuduPartitionParams() { + return tableDef_.getKuduPartitionParams(); } public String getComment() { return tableDef_.getComment(); } Map<String, String> getTblProperties() { return tableDef_.getTblProperties(); } @@ -146,8 +146,8 @@ public class CreateTableStmt extends StatementBase { params.setIf_not_exists(getIfNotExists()); params.setTable_properties(getTblProperties()); params.setSerde_properties(getSerdeProperties()); - for (DistributeParam d: getDistributeParams()) { - params.addToDistribute_by(d.toThrift()); + for (KuduPartitionParam d: getKuduPartitionParams()) { + params.addToPartition_by(d.toThrift()); } for (ColumnDef pkColDef: getPrimaryKeyColumnDefs()) { params.addToPrimary_key_column_names(pkColDef.getColName()); @@ -188,8 +188,8 @@ public class CreateTableStmt extends StatementBase { getTblProperties().get(KuduTable.KEY_STORAGE_HANDLER))) { throw new AnalysisException(KUDU_STORAGE_HANDLER_ERROR_MESSAGE); } - AnalysisUtils.throwIfNotEmpty(getDistributeParams(), - "Only Kudu tables can use the DISTRIBUTE BY clause."); + AnalysisUtils.throwIfNotEmpty(getKuduPartitionParams(), + "Only Kudu tables can use the PARTITION BY clause."); if (hasPrimaryKey()) { throw new AnalysisException("Only Kudu tables can specify a PRIMARY KEY."); } @@ -263,8 +263,8 @@ public class CreateTableStmt extends StatementBase { KuduTable.KEY_TABLET_REPLICAS)); AnalysisUtils.throwIfNotEmpty(getColumnDefs(), "Columns cannot be specified with an external Kudu table."); - AnalysisUtils.throwIfNotEmpty(getDistributeParams(), - "DISTRIBUTE BY cannot be used with an external Kudu table."); + AnalysisUtils.throwIfNotEmpty(getKuduPartitionParams(), + "PARTITION BY cannot be used with an external Kudu table."); } /** @@ -305,29 +305,29 @@ public class CreateTableStmt extends StatementBase { } } - if (!getDistributeParams().isEmpty()) { - analyzeDistributeParams(analyzer); + if (!getKuduPartitionParams().isEmpty()) { + analyzeKuduPartitionParams(analyzer); } else { - throw new AnalysisException("Table distribution must be specified for " + + throw new AnalysisException("Table partitioning must be specified for " + "managed Kudu tables."); } } /** - * Analyzes the distribution schemes specified in the CREATE TABLE statement. + * Analyzes the partitioning schemes specified in the CREATE TABLE statement. 
*/ - private void analyzeDistributeParams(Analyzer analyzer) throws AnalysisException { + private void analyzeKuduPartitionParams(Analyzer analyzer) throws AnalysisException { Preconditions.checkState(getFileFormat() == THdfsFileFormat.KUDU); Map<String, ColumnDef> pkColDefsByName = ColumnDef.mapByColumnNames(getPrimaryKeyColumnDefs()); - for (DistributeParam distributeParam: getDistributeParams()) { - // If no column names were specified in this distribution scheme, use all the + for (KuduPartitionParam partitionParam: getKuduPartitionParams()) { + // If no column names were specified in this partitioning scheme, use all the // primary key columns. - if (!distributeParam.hasColumnNames()) { - distributeParam.setColumnNames(pkColDefsByName.keySet()); + if (!partitionParam.hasColumnNames()) { + partitionParam.setColumnNames(pkColDefsByName.keySet()); } - distributeParam.setPkColumnDefMap(pkColDefsByName); - distributeParam.analyze(analyzer); + partitionParam.setPkColumnDefMap(pkColDefsByName); + partitionParam.analyze(analyzer); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/java/org/apache/impala/analysis/DistributeParam.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/DistributeParam.java b/fe/src/main/java/org/apache/impala/analysis/DistributeParam.java deleted file mode 100644 index 0eb1329..0000000 --- a/fe/src/main/java/org/apache/impala/analysis/DistributeParam.java +++ /dev/null @@ -1,211 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.impala.analysis; - -import java.util.Collection; -import java.util.List; -import java.util.Map; - -import org.apache.impala.common.AnalysisException; -import org.apache.impala.thrift.TDistributeByHashParam; -import org.apache.impala.thrift.TDistributeByRangeParam; -import org.apache.impala.thrift.TDistributeParam; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - -/** - * Represents the distribution of a Kudu table as defined in the DISTRIBUTE BY - * clause of a CREATE TABLE statement. The distribution can be hash-based or - * range-based or both. See RangePartition for details on the supported range partitions. 
- * - * Examples: - * - Hash-based: - * DISTRIBUTE BY HASH(id) INTO 10 BUCKETS - * - Single column range-based: - * DISTRIBUTE BY RANGE(age) - * ( - * PARTITION VALUES < 10, - * PARTITION 10 <= VALUES < 20, - * PARTITION 20 <= VALUES < 30, - * PARTITION VALUE = 100 - * ) - * - Combination of hash and range based: - * DISTRIBUTE BY HASH (id) INTO 3 BUCKETS, - * RANGE (age) - * ( - * PARTITION 10 <= VALUES < 20, - * PARTITION VALUE = 100 - * ) - * - Multi-column range based: - * DISTRIBUTE BY RANGE (year, quarter) - * ( - * PARTITION VALUE = (2001, 1), - * PARTITION VALUE = (2001, 2), - * PARTITION VALUE = (2002, 1) - * ) - * - */ -public class DistributeParam implements ParseNode { - - /** - * Creates a hash-based DistributeParam. - */ - public static DistributeParam createHashParam(List<String> cols, int buckets) { - return new DistributeParam(Type.HASH, cols, buckets, null); - } - - /** - * Creates a range-based DistributeParam. - */ - public static DistributeParam createRangeParam(List<String> cols, - List<RangePartition> rangePartitions) { - return new DistributeParam(Type.RANGE, cols, NO_BUCKETS, rangePartitions); - } - - private static final int NO_BUCKETS = -1; - - /** - * The distribution type. - */ - public enum Type { - HASH, RANGE - } - - // Columns of this distribution. If no columns are specified, all - // the primary key columns of the associated table are used. - private final List<String> colNames_ = Lists.newArrayList(); - - // Map of primary key column names to the associated column definitions. Must be set - // before the call to analyze(). - private Map<String, ColumnDef> pkColumnDefByName_; - - // Distribution scheme type - private final Type type_; - - // Only relevant for hash-based distribution, -1 otherwise - private final int numBuckets_; - - // List of range partitions specified in a range-based distribution. - private List<RangePartition> rangePartitions_; - - private DistributeParam(Type t, List<String> colNames, int buckets, - List<RangePartition> partitions) { - type_ = t; - for (String name: colNames) colNames_.add(name.toLowerCase()); - rangePartitions_ = partitions; - numBuckets_ = buckets; - } - - @Override - public void analyze(Analyzer analyzer) throws AnalysisException { - Preconditions.checkState(!colNames_.isEmpty()); - Preconditions.checkNotNull(pkColumnDefByName_); - Preconditions.checkState(!pkColumnDefByName_.isEmpty()); - // Validate that the columns specified in this distribution are primary key columns. - for (String colName: colNames_) { - if (!pkColumnDefByName_.containsKey(colName)) { - throw new AnalysisException(String.format("Column '%s' in '%s' is not a key " + - "column. Only key columns can be used in DISTRIBUTE BY.", colName, toSql())); - } - } - if (type_ == Type.RANGE) analyzeRangeParam(analyzer); - } - - /** - * Analyzes a range-based distribution. This function does not check for overlapping - * range partitions; these checks are performed by Kudu and an error is reported back - * to the user. 
- */ - public void analyzeRangeParam(Analyzer analyzer) throws AnalysisException { - List<ColumnDef> pkColDefs = Lists.newArrayListWithCapacity(colNames_.size()); - for (String colName: colNames_) pkColDefs.add(pkColumnDefByName_.get(colName)); - for (RangePartition rangePartition: rangePartitions_) { - rangePartition.analyze(analyzer, pkColDefs); - } - } - - @Override - public String toSql() { - StringBuilder builder = new StringBuilder(type_.toString()); - if (!colNames_.isEmpty()) { - builder.append(" ("); - Joiner.on(", ").appendTo(builder, colNames_).append(")"); - } - if (type_ == Type.HASH) { - builder.append(" INTO "); - Preconditions.checkState(numBuckets_ != NO_BUCKETS); - builder.append(numBuckets_).append(" BUCKETS"); - } else { - builder.append(" ("); - if (rangePartitions_ != null) { - List<String> partsSql = Lists.newArrayList(); - for (RangePartition rangePartition: rangePartitions_) { - partsSql.add(rangePartition.toSql()); - } - builder.append(Joiner.on(", ").join(partsSql)); - } else { - builder.append("..."); - } - builder.append(")"); - } - return builder.toString(); - } - - @Override - public String toString() { return toSql(); } - - public TDistributeParam toThrift() { - TDistributeParam result = new TDistributeParam(); - // TODO: Add a validate() function to ensure the validity of distribute params. - if (type_ == Type.HASH) { - TDistributeByHashParam hash = new TDistributeByHashParam(); - Preconditions.checkState(numBuckets_ != NO_BUCKETS); - hash.setNum_buckets(numBuckets_); - hash.setColumns(colNames_); - result.setBy_hash_param(hash); - } else { - Preconditions.checkState(type_ == Type.RANGE); - TDistributeByRangeParam rangeParam = new TDistributeByRangeParam(); - rangeParam.setColumns(colNames_); - if (rangePartitions_ == null) { - result.setBy_range_param(rangeParam); - return result; - } - for (RangePartition rangePartition: rangePartitions_) { - rangeParam.addToRange_partitions(rangePartition.toThrift()); - } - result.setBy_range_param(rangeParam); - } - return result; - } - - void setPkColumnDefMap(Map<String, ColumnDef> pkColumnDefByName) { - pkColumnDefByName_ = pkColumnDefByName; - } - - boolean hasColumnNames() { return !colNames_.isEmpty(); } - public List<String> getColumnNames() { return ImmutableList.copyOf(colNames_); } - void setColumnNames(Collection<String> colNames) { - Preconditions.checkState(colNames_.isEmpty()); - colNames_.addAll(colNames); - } - - public Type getType() { return type_; } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/java/org/apache/impala/analysis/KuduPartitionParam.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/KuduPartitionParam.java b/fe/src/main/java/org/apache/impala/analysis/KuduPartitionParam.java new file mode 100644 index 0000000..3f69fae --- /dev/null +++ b/fe/src/main/java/org/apache/impala/analysis/KuduPartitionParam.java @@ -0,0 +1,211 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.analysis; + +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.apache.impala.common.AnalysisException; +import org.apache.impala.thrift.TKuduPartitionByHashParam; +import org.apache.impala.thrift.TKuduPartitionByRangeParam; +import org.apache.impala.thrift.TKuduPartitionParam; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +/** + * Represents the partitioning of a Kudu table as defined in the PARTITION BY + * clause of a CREATE TABLE statement. The partitioning can be hash-based or + * range-based or both. See RangePartition for details on the supported range partitions. + * + * Examples: + * - Hash-based: + * PARTITION BY HASH(id) INTO 10 BUCKETS + * - Single column range-based: + * PARTITION BY RANGE(age) + * ( + * PARTITION VALUES < 10, + * PARTITION 10 <= VALUES < 20, + * PARTITION 20 <= VALUES < 30, + * PARTITION VALUE = 100 + * ) + * - Combination of hash and range based: + * PARTITION BY HASH (id) INTO 3 BUCKETS, + * RANGE (age) + * ( + * PARTITION 10 <= VALUES < 20, + * PARTITION VALUE = 100 + * ) + * - Multi-column range based: + * PARTITION BY RANGE (year, quarter) + * ( + * PARTITION VALUE = (2001, 1), + * PARTITION VALUE = (2001, 2), + * PARTITION VALUE = (2002, 1) + * ) + * + */ +public class KuduPartitionParam implements ParseNode { + + /** + * Creates a hash-based KuduPartitionParam. + */ + public static KuduPartitionParam createHashParam(List<String> cols, int buckets) { + return new KuduPartitionParam(Type.HASH, cols, buckets, null); + } + + /** + * Creates a range-based KuduPartitionParam. + */ + public static KuduPartitionParam createRangeParam(List<String> cols, + List<RangePartition> rangePartitions) { + return new KuduPartitionParam(Type.RANGE, cols, NO_BUCKETS, rangePartitions); + } + + private static final int NO_BUCKETS = -1; + + /** + * The partitioning type. + */ + public enum Type { + HASH, RANGE + } + + // Columns of this partitioning. If no columns are specified, all + // the primary key columns of the associated table are used. + private final List<String> colNames_ = Lists.newArrayList(); + + // Map of primary key column names to the associated column definitions. Must be set + // before the call to analyze(). + private Map<String, ColumnDef> pkColumnDefByName_; + + // partitioning scheme type + private final Type type_; + + // Only relevant for hash-based partitioning, -1 otherwise + private final int numBuckets_; + + // List of range partitions specified in a range-based partitioning. 
+  private List<RangePartition> rangePartitions_;
+
+  private KuduPartitionParam(Type t, List<String> colNames, int buckets,
+      List<RangePartition> partitions) {
+    type_ = t;
+    for (String name: colNames) colNames_.add(name.toLowerCase());
+    rangePartitions_ = partitions;
+    numBuckets_ = buckets;
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    Preconditions.checkState(!colNames_.isEmpty());
+    Preconditions.checkNotNull(pkColumnDefByName_);
+    Preconditions.checkState(!pkColumnDefByName_.isEmpty());
+    // Validate that the columns specified in this partitioning are primary key columns.
+    for (String colName: colNames_) {
+      if (!pkColumnDefByName_.containsKey(colName)) {
+        throw new AnalysisException(String.format("Column '%s' in '%s' is not a key " +
+            "column. Only key columns can be used in PARTITION BY.", colName, toSql()));
+      }
+    }
+    if (type_ == Type.RANGE) analyzeRangeParam(analyzer);
+  }
+
+  /**
+   * Analyzes a range-based partitioning. This function does not check for overlapping
+   * range partitions; these checks are performed by Kudu and an error is reported back
+   * to the user.
+   */
+  public void analyzeRangeParam(Analyzer analyzer) throws AnalysisException {
+    List<ColumnDef> pkColDefs = Lists.newArrayListWithCapacity(colNames_.size());
+    for (String colName: colNames_) pkColDefs.add(pkColumnDefByName_.get(colName));
+    for (RangePartition rangePartition: rangePartitions_) {
+      rangePartition.analyze(analyzer, pkColDefs);
+    }
+  }
+
+  @Override
+  public String toSql() {
+    StringBuilder builder = new StringBuilder(type_.toString());
+    if (!colNames_.isEmpty()) {
+      builder.append(" (");
+      Joiner.on(", ").appendTo(builder, colNames_).append(")");
+    }
+    if (type_ == Type.HASH) {
+      builder.append(" INTO ");
+      Preconditions.checkState(numBuckets_ != NO_BUCKETS);
+      builder.append(numBuckets_).append(" BUCKETS");
+    } else {
+      builder.append(" (");
+      if (rangePartitions_ != null) {
+        List<String> partsSql = Lists.newArrayList();
+        for (RangePartition rangePartition: rangePartitions_) {
+          partsSql.add(rangePartition.toSql());
+        }
+        builder.append(Joiner.on(", ").join(partsSql));
+      } else {
+        builder.append("...");
+      }
+      builder.append(")");
+    }
+    return builder.toString();
+  }
+
+  @Override
+  public String toString() { return toSql(); }
+
+  public TKuduPartitionParam toThrift() {
+    TKuduPartitionParam result = new TKuduPartitionParam();
+    // TODO: Add a validate() function to ensure the validity of partition params.
+ if (type_ == Type.HASH) { + TKuduPartitionByHashParam hash = new TKuduPartitionByHashParam(); + Preconditions.checkState(numBuckets_ != NO_BUCKETS); + hash.setNum_buckets(numBuckets_); + hash.setColumns(colNames_); + result.setBy_hash_param(hash); + } else { + Preconditions.checkState(type_ == Type.RANGE); + TKuduPartitionByRangeParam rangeParam = new TKuduPartitionByRangeParam(); + rangeParam.setColumns(colNames_); + if (rangePartitions_ == null) { + result.setBy_range_param(rangeParam); + return result; + } + for (RangePartition rangePartition: rangePartitions_) { + rangeParam.addToRange_partitions(rangePartition.toThrift()); + } + result.setBy_range_param(rangeParam); + } + return result; + } + + void setPkColumnDefMap(Map<String, ColumnDef> pkColumnDefByName) { + pkColumnDefByName_ = pkColumnDefByName; + } + + boolean hasColumnNames() { return !colNames_.isEmpty(); } + public List<String> getColumnNames() { return ImmutableList.copyOf(colNames_); } + void setColumnNames(Collection<String> colNames) { + Preconditions.checkState(colNames_.isEmpty()); + colNames_.addAll(colNames); + } + + public Type getType() { return type_; } +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/java/org/apache/impala/analysis/RangePartition.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/RangePartition.java b/fe/src/main/java/org/apache/impala/analysis/RangePartition.java index 3e41bc4..e2640a8 100644 --- a/fe/src/main/java/org/apache/impala/analysis/RangePartition.java +++ b/fe/src/main/java/org/apache/impala/analysis/RangePartition.java @@ -110,26 +110,26 @@ public class RangePartition implements ParseNode { throw new IllegalStateException("Not implemented"); } - public void analyze(Analyzer analyzer, List<ColumnDef> distributionColDefs) + public void analyze(Analyzer analyzer, List<ColumnDef> partColDefs) throws AnalysisException { - analyzeBoundaryValues(lowerBound_, distributionColDefs, analyzer); + analyzeBoundaryValues(lowerBound_, partColDefs, analyzer); if (!isSingletonRange_) { - analyzeBoundaryValues(upperBound_, distributionColDefs, analyzer); + analyzeBoundaryValues(upperBound_, partColDefs, analyzer); } } private void analyzeBoundaryValues(List<Expr> boundaryValues, - List<ColumnDef> distributionColDefs, Analyzer analyzer) throws AnalysisException { + List<ColumnDef> partColDefs, Analyzer analyzer) throws AnalysisException { if (!boundaryValues.isEmpty() - && boundaryValues.size() != distributionColDefs.size()) { + && boundaryValues.size() != partColDefs.size()) { throw new AnalysisException(String.format("Number of specified range " + - "partition values is different than the number of distribution " + + "partition values is different than the number of partitioning " + "columns: (%d vs %d). 
Range partition: '%s'", boundaryValues.size(), - distributionColDefs.size(), toSql())); + partColDefs.size(), toSql())); } for (int i = 0; i < boundaryValues.size(); ++i) { LiteralExpr literal = analyzeBoundaryValue(boundaryValues.get(i), - distributionColDefs.get(i), analyzer); + partColDefs.get(i), analyzer); Preconditions.checkNotNull(literal); boundaryValues.set(i, literal); } @@ -162,7 +162,7 @@ public class RangePartition implements ParseNode { if (!org.apache.impala.catalog.Type.isImplicitlyCastable(literalType, colType, true)) { throw new AnalysisException(String.format("Range partition value %s " + - "(type: %s) is not type compatible with distribution column '%s' (type: %s).", + "(type: %s) is not type compatible with partitioning column '%s' (type: %s).", literal.toSql(), literalType, pkColumn.getColName(), colType.toSql())); } if (!literalType.equals(colType)) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java b/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java index 4d3ed80..aef5732 100644 --- a/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java +++ b/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java @@ -22,35 +22,34 @@ import com.google.common.collect.Lists; import java.util.List; /** - * Represents the PARTITION BY and DISTRIBUTED BY clauses of a DDL statement. - * TODO: Reconsider this class when we add support for new range partitioning syntax (see - * IMPALA-3724). + * Represents the PARTITION BY and PARTITIONED BY clauses of a DDL statement. */ class TableDataLayout { private final List<ColumnDef> partitionColDefs_; - private final List<DistributeParam> distributeParams_; + private final List<KuduPartitionParam> kuduPartitionParams_; private TableDataLayout(List<ColumnDef> partitionColumnDefs, - List<DistributeParam> distributeParams) { + List<KuduPartitionParam> partitionParams) { partitionColDefs_ = partitionColumnDefs; - distributeParams_ = distributeParams; + kuduPartitionParams_ = partitionParams; } static TableDataLayout createPartitionedLayout(List<ColumnDef> partitionColumnDefs) { return new TableDataLayout(partitionColumnDefs, - Lists.<DistributeParam>newArrayList()); + Lists.<KuduPartitionParam>newArrayList()); } - static TableDataLayout createDistributedLayout(List<DistributeParam> distributeParams) { - return new TableDataLayout(Lists.<ColumnDef>newArrayList(), distributeParams); + static TableDataLayout createKuduPartitionedLayout( + List<KuduPartitionParam> partitionParams) { + return new TableDataLayout(Lists.<ColumnDef>newArrayList(), partitionParams); } static TableDataLayout createEmptyLayout() { return new TableDataLayout(Lists.<ColumnDef>newArrayList(), - Lists.<DistributeParam>newArrayList()); + Lists.<KuduPartitionParam>newArrayList()); } List<ColumnDef> getPartitionColumnDefs() { return partitionColDefs_; } - List<DistributeParam> getDistributeParams() { return distributeParams_; } + List<KuduPartitionParam> getKuduPartitionParams() { return kuduPartitionParams_; } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/java/org/apache/impala/analysis/TableDef.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/TableDef.java 
b/fe/src/main/java/org/apache/impala/analysis/TableDef.java index 1c16954..a40b4d2 100644 --- a/fe/src/main/java/org/apache/impala/analysis/TableDef.java +++ b/fe/src/main/java/org/apache/impala/analysis/TableDef.java @@ -74,7 +74,7 @@ class TableDef { // If true, no errors are thrown if the table already exists. private final boolean ifNotExists_; - // Partitioned/distribute by parameters. + // Partitioning parameters. private final TableDataLayout dataLayout_; // True if analyze() has been called. @@ -152,8 +152,8 @@ class TableDef { List<ColumnDef> getPrimaryKeyColumnDefs() { return primaryKeyColDefs_; } boolean isExternal() { return isExternal_; } boolean getIfNotExists() { return ifNotExists_; } - List<DistributeParam> getDistributeParams() { - return dataLayout_.getDistributeParams(); + List<KuduPartitionParam> getKuduPartitionParams() { + return dataLayout_.getKuduPartitionParams(); } void setOptions(Options options) { Preconditions.checkNotNull(options); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java index 4cd095c..be4ab6f 100644 --- a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java +++ b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java @@ -197,7 +197,7 @@ public class ToSqlUtils { String storageHandlerClassName = table.getStorageHandlerClassName(); List<String> primaryKeySql = Lists.newArrayList(); - String kuduDistributeByParams = null; + String kuduPartitionByParams = null; if (table instanceof KuduTable) { KuduTable kuduTable = (KuduTable) table; // Kudu tables don't use LOCATION syntax @@ -219,10 +219,10 @@ public class ToSqlUtils { primaryKeySql.addAll(kuduTable.getPrimaryKeyColumnNames()); List<String> paramsSql = Lists.newArrayList(); - for (DistributeParam param: kuduTable.getDistributeBy()) { + for (KuduPartitionParam param: kuduTable.getPartitionBy()) { paramsSql.add(param.toSql()); } - kuduDistributeByParams = Joiner.on(", ").join(paramsSql); + kuduPartitionByParams = Joiner.on(", ").join(paramsSql); } else { // We shouldn't output the columns for external tables colsSql = null; @@ -230,7 +230,7 @@ public class ToSqlUtils { } HdfsUri tableLocation = location == null ? 
null : new HdfsUri(location); return getCreateTableSql(table.getDb().getName(), table.getName(), comment, colsSql, - partitionColsSql, primaryKeySql, kuduDistributeByParams, properties, + partitionColsSql, primaryKeySql, kuduPartitionByParams, properties, serdeParameters, isExternal, false, rowFormat, format, compression, storageHandlerClassName, tableLocation); } @@ -242,7 +242,7 @@ public class ToSqlUtils { */ public static String getCreateTableSql(String dbName, String tableName, String tableComment, List<String> columnsSql, List<String> partitionColumnsSql, - List<String> primaryKeysSql, String kuduDistributeByParams, + List<String> primaryKeysSql, String kuduPartitionByParams, Map<String, String> tblProperties, Map<String, String> serdeParameters, boolean isExternal, boolean ifNotExists, RowFormat rowFormat, HdfsFileFormat fileFormat, HdfsCompression compression, String storageHandlerClass, @@ -271,8 +271,8 @@ public class ToSqlUtils { Joiner.on(", \n ").join(partitionColumnsSql))); } - if (kuduDistributeByParams != null) { - sb.append("DISTRIBUTE BY " + kuduDistributeByParams + "\n"); + if (kuduPartitionByParams != null) { + sb.append("PARTITION BY " + kuduPartitionByParams + "\n"); } if (rowFormat != null && !rowFormat.isDefault()) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java index a7f72c3..9bbcbd5 100644 --- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java @@ -29,15 +29,15 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.impala.analysis.ColumnDef; -import org.apache.impala.analysis.DistributeParam; +import org.apache.impala.analysis.KuduPartitionParam; import org.apache.impala.common.ImpalaRuntimeException; import org.apache.impala.service.BackendConfig; import org.apache.impala.service.CatalogOpExecutor; import org.apache.impala.thrift.TCatalogObjectType; import org.apache.impala.thrift.TColumn; -import org.apache.impala.thrift.TDistributeByHashParam; -import org.apache.impala.thrift.TDistributeByRangeParam; -import org.apache.impala.thrift.TDistributeParam; +import org.apache.impala.thrift.TKuduPartitionByHashParam; +import org.apache.impala.thrift.TKuduPartitionByRangeParam; +import org.apache.impala.thrift.TKuduPartitionParam; import org.apache.impala.thrift.TKuduTable; import org.apache.impala.thrift.TResultSet; import org.apache.impala.thrift.TResultSetMetadata; @@ -110,9 +110,9 @@ public class KuduTable extends Table { // Primary key column names. private final List<String> primaryKeyColumnNames_ = Lists.newArrayList(); - // Distribution schemes of this Kudu table. Both range and hash-based distributions are + // Partitioning schemes of this Kudu table. Both range and hash-based partitioning are // supported. - private final List<DistributeParam> distributeBy_ = Lists.newArrayList(); + private final List<KuduPartitionParam> partitionBy_ = Lists.newArrayList(); // Schema of the underlying Kudu table. 
private org.apache.kudu.Schema kuduSchema_; @@ -148,36 +148,36 @@ public class KuduTable extends Table { return ImmutableList.copyOf(primaryKeyColumnNames_); } - public List<DistributeParam> getDistributeBy() { - return ImmutableList.copyOf(distributeBy_); + public List<KuduPartitionParam> getPartitionBy() { + return ImmutableList.copyOf(partitionBy_); } /** - * Returns the range-based distribution of this table if it exists, null otherwise. + * Returns the range-based partitioning of this table if it exists, null otherwise. */ - private DistributeParam getRangeDistribution() { - for (DistributeParam distributeParam: distributeBy_) { - if (distributeParam.getType() == DistributeParam.Type.RANGE) { - return distributeParam; + private KuduPartitionParam getRangePartitioning() { + for (KuduPartitionParam partitionParam: partitionBy_) { + if (partitionParam.getType() == KuduPartitionParam.Type.RANGE) { + return partitionParam; } } return null; } /** - * Returns the column names of the table's range-based distribution or an empty - * list if the table doesn't have a range-based distribution. + * Returns the column names of the table's range-based partitioning or an empty + * list if the table doesn't have a range-based partitioning. */ - public List<String> getRangeDistributionColNames() { - DistributeParam rangeDistribution = getRangeDistribution(); - if (rangeDistribution == null) return Collections.<String>emptyList(); - return rangeDistribution.getColumnNames(); + public List<String> getRangePartitioningColNames() { + KuduPartitionParam rangePartitioning = getRangePartitioning(); + if (rangePartitioning == null) return Collections.<String>emptyList(); + return rangePartitioning.getColumnNames(); } /** * Loads the metadata of a Kudu table. * - * Schema and distribution schemes are loaded directly from Kudu whereas column stats + * Schema and partitioning schemes are loaded directly from Kudu whereas column stats * are loaded from HMS. The function also updates the table schema in HMS in order to * propagate alterations made to the Kudu table to HMS. 
*/ @@ -209,7 +209,7 @@ public class KuduTable extends Table { // Load metadata from Kudu and HMS try { loadSchema(kuduTable); - loadDistributeByParams(kuduTable); + loadPartitionByParams(kuduTable); loadAllColumnStats(msClient); } catch (ImpalaRuntimeException e) { throw new TableLoadingException("Error loading metadata for Kudu table " + @@ -255,19 +255,19 @@ public class KuduTable extends Table { } } - private void loadDistributeByParams(org.apache.kudu.client.KuduTable kuduTable) { + private void loadPartitionByParams(org.apache.kudu.client.KuduTable kuduTable) { Preconditions.checkNotNull(kuduTable); Schema tableSchema = kuduTable.getSchema(); PartitionSchema partitionSchema = kuduTable.getPartitionSchema(); Preconditions.checkState(!colsByPos_.isEmpty()); - distributeBy_.clear(); + partitionBy_.clear(); for (HashBucketSchema hashBucketSchema: partitionSchema.getHashBucketSchemas()) { List<String> columnNames = Lists.newArrayList(); for (int colId: hashBucketSchema.getColumnIds()) { columnNames.add(getColumnNameById(tableSchema, colId)); } - distributeBy_.add( - DistributeParam.createHashParam(columnNames, hashBucketSchema.getNumBuckets())); + partitionBy_.add(KuduPartitionParam.createHashParam(columnNames, + hashBucketSchema.getNumBuckets())); } RangeSchema rangeSchema = partitionSchema.getRangeSchema(); List<Integer> columnIds = rangeSchema.getColumns(); @@ -277,7 +277,7 @@ public class KuduTable extends Table { // We don't populate the split values because Kudu's API doesn't currently support // retrieving the split values for range partitions. // TODO: File a Kudu JIRA. - distributeBy_.add(DistributeParam.createRangeParam(columnNames, null)); + partitionBy_.add(KuduPartitionParam.createRangeParam(columnNames, null)); } /** @@ -297,14 +297,14 @@ public class KuduTable extends Table { */ public static KuduTable createCtasTarget(Db db, org.apache.hadoop.hive.metastore.api.Table msTbl, List<ColumnDef> columnDefs, - List<String> primaryKeyColumnNames, List<DistributeParam> distributeParams) { + List<String> primaryKeyColumnNames, List<KuduPartitionParam> partitionParams) { KuduTable tmpTable = new KuduTable(msTbl, db, msTbl.getTableName(), msTbl.getOwner()); int pos = 0; for (ColumnDef colDef: columnDefs) { tmpTable.addColumn(new Column(colDef.getColName(), colDef.getType(), pos++)); } tmpTable.primaryKeyColumnNames_.addAll(primaryKeyColumnNames); - tmpTable.distributeBy_.addAll(distributeParams); + tmpTable.partitionBy_.addAll(partitionParams); return tmpTable; } @@ -324,27 +324,28 @@ public class KuduTable extends Table { kuduMasters_ = Joiner.on(',').join(tkudu.getMaster_addresses()); primaryKeyColumnNames_.clear(); primaryKeyColumnNames_.addAll(tkudu.getKey_columns()); - loadDistributeByParamsFromThrift(tkudu.getDistribute_by()); + loadPartitionByParamsFromThrift(tkudu.getPartition_by()); } - private void loadDistributeByParamsFromThrift(List<TDistributeParam> params) { - distributeBy_.clear(); - for (TDistributeParam param: params) { + private void loadPartitionByParamsFromThrift(List<TKuduPartitionParam> params) { + partitionBy_.clear(); + for (TKuduPartitionParam param: params) { if (param.isSetBy_hash_param()) { - TDistributeByHashParam hashParam = param.getBy_hash_param(); - distributeBy_.add(DistributeParam.createHashParam( + TKuduPartitionByHashParam hashParam = param.getBy_hash_param(); + partitionBy_.add(KuduPartitionParam.createHashParam( hashParam.getColumns(), hashParam.getNum_buckets())); } else { Preconditions.checkState(param.isSetBy_range_param()); - 
TDistributeByRangeParam rangeParam = param.getBy_range_param(); - distributeBy_.add(DistributeParam.createRangeParam(rangeParam.getColumns(), + TKuduPartitionByRangeParam rangeParam = param.getBy_range_param(); + partitionBy_.add(KuduPartitionParam.createRangeParam(rangeParam.getColumns(), null)); } } } @Override - public TTableDescriptor toThriftDescriptor(int tableId, Set<Long> referencedPartitions) { + public TTableDescriptor toThriftDescriptor(int tableId, + Set<Long> referencedPartitions) { TTableDescriptor desc = new TTableDescriptor(tableId, TTableType.KUDU_TABLE, getTColumnDescriptors(), numClusteringCols_, kuduTableName_, db_.getName()); desc.setKuduTable(getTKuduTable()); @@ -356,9 +357,9 @@ public class KuduTable extends Table { tbl.setKey_columns(Preconditions.checkNotNull(primaryKeyColumnNames_)); tbl.setMaster_addresses(Lists.newArrayList(kuduMasters_.split(","))); tbl.setTable_name(kuduTableName_); - Preconditions.checkNotNull(distributeBy_); - for (DistributeParam distributeParam: distributeBy_) { - tbl.addToDistribute_by(distributeParam.toThrift()); + Preconditions.checkNotNull(partitionBy_); + for (KuduPartitionParam partitionParam: partitionBy_) { + tbl.addToPartition_by(partitionParam.toThrift()); } return tbl; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java index 0f8f8fd..b0616c4 100644 --- a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java @@ -34,7 +34,7 @@ import org.apache.impala.common.Pair; import org.apache.impala.thrift.TAlterTableAddDropRangePartitionParams; import org.apache.impala.thrift.TColumn; import org.apache.impala.thrift.TCreateTableParams; -import org.apache.impala.thrift.TDistributeParam; +import org.apache.impala.thrift.TKuduPartitionParam; import org.apache.impala.thrift.TRangePartition; import org.apache.impala.thrift.TRangePartitionOperationType; import org.apache.impala.util.KuduUtil; @@ -138,22 +138,22 @@ public class KuduCatalogOpExecutor { org.apache.hadoop.hive.metastore.api.Table msTbl, TCreateTableParams params, Schema schema) throws ImpalaRuntimeException { CreateTableOptions tableOpts = new CreateTableOptions(); - // Set the distribution schemes - List<TDistributeParam> distributeParams = params.getDistribute_by(); - if (distributeParams != null) { + // Set the partitioning schemes + List<TKuduPartitionParam> partitionParams = params.getPartition_by(); + if (partitionParams != null) { boolean hasRangePartitioning = false; - for (TDistributeParam distParam: distributeParams) { - if (distParam.isSetBy_hash_param()) { - Preconditions.checkState(!distParam.isSetBy_range_param()); - tableOpts.addHashPartitions(distParam.getBy_hash_param().getColumns(), - distParam.getBy_hash_param().getNum_buckets()); + for (TKuduPartitionParam partParam: partitionParams) { + if (partParam.isSetBy_hash_param()) { + Preconditions.checkState(!partParam.isSetBy_range_param()); + tableOpts.addHashPartitions(partParam.getBy_hash_param().getColumns(), + partParam.getBy_hash_param().getNum_buckets()); } else { - Preconditions.checkState(distParam.isSetBy_range_param()); + Preconditions.checkState(partParam.isSetBy_range_param()); hasRangePartitioning = 
true; - List<String> rangePartitionColumns = distParam.getBy_range_param().getColumns(); + List<String> rangePartitionColumns = partParam.getBy_range_param().getColumns(); tableOpts.setRangePartitionColumns(rangePartitionColumns); for (TRangePartition rangePartition: - distParam.getBy_range_param().getRange_partitions()) { + partParam.getBy_range_param().getRange_partitions()) { List<Pair<PartialRow, RangePartitionBound>> rangeBounds = getRangePartitionBounds(rangePartition, schema, rangePartitionColumns); Preconditions.checkState(rangeBounds.size() == 2); @@ -164,7 +164,7 @@ public class KuduCatalogOpExecutor { } } } - // If no range-based distribution is specified in a CREATE TABLE statement, Kudu + // If no range-based partitioning is specified in a CREATE TABLE statement, Kudu // generates one by default that includes all the primary key columns. To prevent // this from happening, explicitly set the range partition columns to be // an empty list. @@ -333,7 +333,7 @@ public class KuduCatalogOpExecutor { private static List<Pair<PartialRow, RangePartitionBound>> getRangePartitionBounds( TRangePartition rangePartition, KuduTable tbl) throws ImpalaRuntimeException { return getRangePartitionBounds(rangePartition, tbl.getKuduSchema(), - tbl.getRangeDistributionColNames()); + tbl.getRangePartitioningColNames()); } /** @@ -342,20 +342,20 @@ public class KuduCatalogOpExecutor { */ private static List<Pair<PartialRow, RangePartitionBound>> getRangePartitionBounds( TRangePartition rangePartition, Schema schema, - List<String> rangeDistributionColNames) throws ImpalaRuntimeException { + List<String> rangePartitioningColNames) throws ImpalaRuntimeException { Preconditions.checkNotNull(schema); - Preconditions.checkState(!rangeDistributionColNames.isEmpty()); + Preconditions.checkState(!rangePartitioningColNames.isEmpty()); Preconditions.checkState(rangePartition.isSetLower_bound_values() || rangePartition.isSetUpper_bound_values()); List<Pair<PartialRow, RangePartitionBound>> rangeBounds = Lists.newArrayListWithCapacity(2); Pair<PartialRow, RangePartitionBound> lowerBound = - KuduUtil.buildRangePartitionBound(schema, rangeDistributionColNames, + KuduUtil.buildRangePartitionBound(schema, rangePartitioningColNames, rangePartition.getLower_bound_values(), rangePartition.isIs_lower_bound_inclusive()); rangeBounds.add(lowerBound); Pair<PartialRow, RangePartitionBound> upperBound = - KuduUtil.buildRangePartitionBound(schema, rangeDistributionColNames, + KuduUtil.buildRangePartitionBound(schema, rangePartitioningColNames, rangePartition.getUpper_bound_values(), rangePartition.isIs_upper_bound_inclusive()); rangeBounds.add(upperBound); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java index 4aeb96d..d806a2f 100644 --- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java @@ -1440,16 +1440,16 @@ public class AnalyzeDDLTest extends FrontendTestBase { "Partition column name mismatch: tinyint_col != int_col"); // CTAS into managed Kudu tables - AnalyzesOk("create table t primary key (id) distribute by hash (id) into 3 buckets" + + AnalyzesOk("create table t primary key (id) partition by hash (id) into 3 buckets" + " stored as kudu as 
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index 4aeb96d..d806a2f 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -1440,16 +1440,16 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         "Partition column name mismatch: tinyint_col != int_col");
 
     // CTAS into managed Kudu tables
-    AnalyzesOk("create table t primary key (id) distribute by hash (id) into 3 buckets" +
+    AnalyzesOk("create table t primary key (id) partition by hash (id) into 3 buckets" +
         " stored as kudu as select id, bool_col, tinyint_col, smallint_col, int_col, " +
         "bigint_col, float_col, double_col, date_string_col, string_col " +
         "from functional.alltypestiny");
-    AnalyzesOk("create table t primary key (id) distribute by range (id) " +
+    AnalyzesOk("create table t primary key (id) partition by range (id) " +
         "(partition values < 10, partition 20 <= values < 30, partition value = 50) " +
         "stored as kudu as select id, bool_col, tinyint_col, smallint_col, int_col, " +
         "bigint_col, float_col, double_col, date_string_col, string_col " +
         "from functional.alltypestiny");
-    AnalyzesOk("create table t primary key (id) distribute by hash (id) into 3 buckets, "+
+    AnalyzesOk("create table t primary key (id) partition by hash (id) into 3 buckets, "+
         "range (id) (partition values < 10, partition 10 <= values < 20, " +
         "partition value = 30) stored as kudu as select id, bool_col, tinyint_col, " +
         "smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, " +
@@ -1461,27 +1461,27 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         "external Kudu tables.");
 
     // CTAS into Kudu tables with unsupported types
-    AnalysisError("create table t primary key (id) distribute by hash into 3 buckets" +
+    AnalysisError("create table t primary key (id) partition by hash into 3 buckets" +
         " stored as kudu as select id, timestamp_col from functional.alltypestiny",
         "Cannot create table 't': Type TIMESTAMP is not supported in Kudu");
-    AnalysisError("create table t primary key (cs) distribute by hash into 3 buckets" +
+    AnalysisError("create table t primary key (cs) partition by hash into 3 buckets" +
         " stored as kudu as select cs from functional.chars_tiny",
         "Cannot create table 't': Type CHAR(5) is not supported in Kudu");
-    AnalysisError("create table t primary key (vc) distribute by hash into 3 buckets" +
+    AnalysisError("create table t primary key (vc) partition by hash into 3 buckets" +
         " stored as kudu as select vc from functional.chars_tiny",
         "Cannot create table 't': Type VARCHAR(32) is not supported in Kudu");
-    AnalysisError("create table t primary key (id) distribute by hash into 3 buckets" +
+    AnalysisError("create table t primary key (id) partition by hash into 3 buckets" +
         " stored as kudu as select c1 as id from functional.decimal_tiny",
         "Cannot create table 't': Type DECIMAL(10,4) is not supported in Kudu");
-    AnalysisError("create table t primary key (id) distribute by hash into 3 buckets" +
+    AnalysisError("create table t primary key (id) partition by hash into 3 buckets" +
         " stored as kudu as select id, s from functional.complextypes_fileformat",
         "Expr 's' in select list returns a complex type 'STRUCT<f1:STRING,f2:INT>'.\n" +
         "Only scalar types are allowed in the select list.");
-    AnalysisError("create table t primary key (id) distribute by hash into 3 buckets" +
+    AnalysisError("create table t primary key (id) partition by hash into 3 buckets" +
         " stored as kudu as select id, m from functional.complextypes_fileformat",
         "Expr 'm' in select list returns a complex type 'MAP<STRING,BIGINT>'.\n" +
         "Only scalar types are allowed in the select list.");
-    AnalysisError("create table t primary key (id) distribute by hash into 3 buckets" +
+    AnalysisError("create table t primary key (id) partition by hash into 3 buckets" +
         " stored as kudu as select id, a from functional.complextypes_fileformat",
         "Expr 'a' in select list returns a complex type 'ARRAY<INT>'.\n" +
         "Only scalar types are allowed in the select list.");
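
The rewrites in this test file are mechanical: only the clause keyword changes
and the rest of each statement is untouched. As a before/after sketch of the
first CTAS case above (the DISTRIBUTE BY form is the pre-patch syntax this
change drops from the grammar; the select list is abbreviated here):

    -- Before this patch:
    CREATE TABLE t PRIMARY KEY (id) DISTRIBUTE BY HASH (id) INTO 3 BUCKETS
    STORED AS KUDU AS SELECT id, int_col FROM functional.alltypestiny;

    -- After this patch:
    CREATE TABLE t PRIMARY KEY (id) PARTITION BY HASH (id) INTO 3 BUCKETS
    STORED AS KUDU AS SELECT id, int_col FROM functional.alltypestiny;
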
@@ -1912,85 +1912,85 @@ public class AnalyzeDDLTest extends FrontendTestBase {
 
   @Test
   public void TestCreateManagedKuduTable() {
     TestUtils.assumeKuduIsSupported();
-    // Test primary keys and distribute by clauses
-    AnalyzesOk("create table tab (x int primary key) distribute by hash(x) " +
+    // Test primary keys and partition by clauses
+    AnalyzesOk("create table tab (x int primary key) partition by hash(x) " +
         "into 8 buckets stored as kudu");
-    AnalyzesOk("create table tab (x int, primary key(x)) distribute by hash(x) " +
+    AnalyzesOk("create table tab (x int, primary key(x)) partition by hash(x) " +
         "into 8 buckets stored as kudu");
     AnalyzesOk("create table tab (x int, y int, primary key (x, y)) " +
-        "distribute by hash(x, y) into 8 buckets stored as kudu");
+        "partition by hash(x, y) into 8 buckets stored as kudu");
     AnalyzesOk("create table tab (x int, y int, primary key (x)) " +
-        "distribute by hash(x) into 8 buckets stored as kudu");
+        "partition by hash(x) into 8 buckets stored as kudu");
     AnalyzesOk("create table tab (x int, y int, primary key(x, y)) " +
-        "distribute by hash(y) into 8 buckets stored as kudu");
-    AnalyzesOk("create table tab (x int, y string, primary key (x)) distribute by " +
+        "partition by hash(y) into 8 buckets stored as kudu");
+    AnalyzesOk("create table tab (x int, y string, primary key (x)) partition by " +
         "hash (x) into 3 buckets, range (x) (partition values < 1, partition " +
         "1 <= values < 10, partition 10 <= values < 20, partition value = 30) " +
         "stored as kudu");
-    AnalyzesOk("create table tab (x int, y int, primary key (x, y)) distribute by " +
+    AnalyzesOk("create table tab (x int, y int, primary key (x, y)) partition by " +
         "range (x, y) (partition value = (2001, 1), partition value = (2002, 1), " +
         "partition value = (2003, 2)) stored as kudu");
     // Non-literal boundary values in range partitions
-    AnalyzesOk("create table tab (x int, y int, primary key (x)) distribute by " +
+    AnalyzesOk("create table tab (x int, y int, primary key (x)) partition by " +
         "range (x) (partition values < 1 + 1, partition (1+3) + 2 < values < 10, " +
         "partition factorial(4) < values < factorial(5), " +
         "partition value = factorial(6)) stored as kudu");
-    AnalyzesOk("create table tab (x int, y int, primary key(x, y)) distribute by " +
+    AnalyzesOk("create table tab (x int, y int, primary key(x, y)) partition by " +
         "range(x, y) (partition value = (1+1, 2+2), partition value = ((1+1+1)+1, 10), " +
         "partition value = (cast (30 as int), factorial(5))) stored as kudu");
-    AnalysisError("create table tab (x int primary key) distribute by range (x) " +
+    AnalysisError("create table tab (x int primary key) partition by range (x) " +
         "(partition values < x + 1) stored as kudu", "Only constant values are allowed " +
         "for range-partition bounds: x + 1");
-    AnalysisError("create table tab (x int primary key) distribute by range (x) " +
+    AnalysisError("create table tab (x int primary key) partition by range (x) " +
         "(partition values <= isnull(null, null)) stored as kudu", "Range partition " +
         "values cannot be NULL. Range partition: 'PARTITION VALUES <= " +
         "isnull(NULL, NULL)'");
-    AnalysisError("create table tab (x int primary key) distribute by range (x) " +
+    AnalysisError("create table tab (x int primary key) partition by range (x) " +
         "(partition values <= (select count(*) from functional.alltypestiny)) " +
         "stored as kudu", "Only constant values are allowed for range-partition " +
         "bounds: (SELECT count(*) FROM functional.alltypestiny)");
     // Multilevel partitioning. Data is split into 3 buckets based on 'x' and each
     // bucket is partitioned into 4 tablets based on the range partitions of 'y'.
     AnalyzesOk("create table tab (x int, y string, primary key(x, y)) " +
-        "distribute by hash(x) into 3 buckets, range(y) " +
+        "partition by hash(x) into 3 buckets, range(y) " +
         "(partition values < 'aa', partition 'aa' <= values < 'bb', " +
         "partition 'bb' <= values < 'cc', partition 'cc' <= values) " +
         "stored as kudu");
     // Key column in upper case
     AnalyzesOk("create table tab (x int, y int, primary key (X)) " +
-        "distribute by hash (x) into 8 buckets stored as kudu");
+        "partition by hash (x) into 8 buckets stored as kudu");
     // Flexible Partitioning
     AnalyzesOk("create table tab (a int, b int, c int, d int, primary key (a, b, c))" +
-        "distribute by hash (a, b) into 8 buckets, hash(c) into 2 buckets stored as " +
+        "partition by hash (a, b) into 8 buckets, hash(c) into 2 buckets stored as " +
         "kudu");
-    // No columns specified in the DISTRIBUTE BY HASH clause
+    // No columns specified in the PARTITION BY HASH clause
     AnalyzesOk("create table tab (a int primary key, b int, c int, d int) " +
-        "distribute by hash into 8 buckets stored as kudu");
+        "partition by hash into 8 buckets stored as kudu");
     // Distribute range data types are picked up during analysis and forwarded to Kudu.
     // Column names in distribute params should also be case-insensitive.
     AnalyzesOk("create table tab (a int, b int, c int, d int, primary key(a, b, c, d))" +
-        "distribute by hash (a, B, c) into 8 buckets, " +
+        "partition by hash (a, B, c) into 8 buckets, " +
         "range (A) (partition values < 1, partition 1 <= values < 2, " +
         "partition 2 <= values < 3, partition 3 <= values < 4, partition 4 <= values) " +
         "stored as kudu");
-    // Allowing range distribution on a subset of the primary keys
+    // Allowing range partitioning on a subset of the primary keys
     AnalyzesOk("create table tab (id int, name string, valf float, vali bigint, " +
-        "primary key (id, name)) distribute by range (name) " +
+        "primary key (id, name)) partition by range (name) " +
         "(partition 'aa' < values <= 'bb') stored as kudu");
     // Null values in range partition values
     AnalysisError("create table tab (id int, name string, primary key(id, name)) " +
-        "distribute by hash (id) into 3 buckets, range (name) " +
+        "partition by hash (id) into 3 buckets, range (name) " +
         "(partition value = null, partition value = 1) stored as kudu",
         "Range partition values cannot be NULL. Range partition: 'PARTITION " +
         "VALUE = NULL'");
     // Primary key specified in tblproperties
-    AnalysisError(String.format("create table tab (x int) distribute by hash (x) " +
+    AnalysisError(String.format("create table tab (x int) partition by hash (x) " +
         "into 8 buckets stored as kudu tblproperties ('%s' = 'x')",
         KuduTable.KEY_KEY_COLUMNS), "PRIMARY KEY must be used instead of the table " +
         "property");
     // Primary key column that doesn't exist
     AnalysisError("create table tab (x int, y int, primary key (z)) " +
-        "distribute by hash (x) into 8 buckets stored as kudu",
+        "partition by hash (x) into 8 buckets stored as kudu",
         "PRIMARY KEY column 'z' does not exist in the table");
     // Invalid composite primary key
     AnalysisError("create table tab (x int primary key, primary key(x)) stored " +
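
As the multilevel-partitioning comment in the hunk above notes, hash and range
clauses compose multiplicatively: 3 hash buckets x 4 range partitions = 12
tablets in total. The same test statement, reflowed as standalone DDL for
readability (no new syntax is introduced here):

    CREATE TABLE tab (x INT, y STRING, PRIMARY KEY (x, y))
    PARTITION BY HASH (x) INTO 3 BUCKETS,
                 RANGE (y) (PARTITION VALUES < 'aa',
                            PARTITION 'aa' <= VALUES < 'bb',
                            PARTITION 'bb' <= VALUES < 'cc',
                            PARTITION 'cc' <= VALUES)
    STORED AS KUDU;
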
@@ -2002,65 +2002,65 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         "be specified using the PRIMARY KEY (col1, col2, ...) syntax at the end " +
         "of the column definition.");
     // Specifying the same primary key column multiple times
-    AnalysisError("create table tab (x int, primary key (x, x)) distribute by hash (x) " +
+    AnalysisError("create table tab (x int, primary key (x, x)) partition by hash (x) " +
         "into 8 buckets stored as kudu",
         "Column 'x' is listed multiple times as a PRIMARY KEY.");
     // Number of range partition boundary values should be equal to the number of range
     // columns.
     AnalysisError("create table tab (a int, b int, c int, d int, primary key(a, b, c)) " +
-        "distribute by range(a) (partition value = (1, 2), " +
+        "partition by range(a) (partition value = (1, 2), " +
         "partition value = 3, partition value = 4) stored as kudu",
         "Number of specified range partition values is different than the number of " +
-        "distribution columns: (2 vs 1). Range partition: 'PARTITION VALUE = (1,2)'");
+        "partitioning columns: (2 vs 1). Range partition: 'PARTITION VALUE = (1,2)'");
     // Key ranges must match the column types.
     AnalysisError("create table tab (a int, b int, c int, d int, primary key(a, b, c)) " +
-        "distribute by hash (a, b, c) into 8 buckets, range (a) " +
+        "partition by hash (a, b, c) into 8 buckets, range (a) " +
         "(partition value = 1, partition value = 'abc', partition 3 <= values) " +
         "stored as kudu", "Range partition value 'abc' (type: STRING) is not type " +
-        "compatible with distribution column 'a' (type: INT).");
-    AnalysisError("create table tab (a tinyint primary key) distribute by range (a) " +
+        "compatible with partitioning column 'a' (type: INT).");
+    AnalysisError("create table tab (a tinyint primary key) partition by range (a) " +
         "(partition value = 128) stored as kudu", "Range partition value 128 " +
-        "(type: SMALLINT) is not type compatible with distribution column 'a' " +
+        "(type: SMALLINT) is not type compatible with partitioning column 'a' " +
         "(type: TINYINT)");
-    AnalysisError("create table tab (a smallint primary key) distribute by range (a) " +
+    AnalysisError("create table tab (a smallint primary key) partition by range (a) " +
         "(partition value = 32768) stored as kudu", "Range partition value 32768 " +
-        "(type: INT) is not type compatible with distribution column 'a' " +
+        "(type: INT) is not type compatible with partitioning column 'a' " +
         "(type: SMALLINT)");
-    AnalysisError("create table tab (a int primary key) distribute by range (a) " +
+    AnalysisError("create table tab (a int primary key) partition by range (a) " +
         "(partition value = 2147483648) stored as kudu", "Range partition value " +
-        "2147483648 (type: BIGINT) is not type compatible with distribution column 'a' " +
+        "2147483648 (type: BIGINT) is not type compatible with partitioning column 'a' " +
         "(type: INT)");
-    AnalysisError("create table tab (a bigint primary key) distribute by range (a) " +
+    AnalysisError("create table tab (a bigint primary key) partition by range (a) " +
         "(partition value = 9223372036854775808) stored as kudu", "Range partition " +
         "value 9223372036854775808 (type: DECIMAL(19,0)) is not type compatible with " +
-        "distribution column 'a' (type: BIGINT)");
+        "partitioning column 'a' (type: BIGINT)");
- AnalyzesOk("create table tab (a int primary key) distribute by range (a) " + + AnalyzesOk("create table tab (a int primary key) partition by range (a) " + "(partition value = false, partition value = true) stored as kudu"); - // Non-key column used in DISTRIBUTE BY + // Non-key column used in PARTITION BY AnalysisError("create table tab (a int, b string, c bigint, primary key (a)) " + - "distribute by range (b) (partition value = 'abc') stored as kudu", + "partition by range (b) (partition value = 'abc') stored as kudu", "Column 'b' in 'RANGE (b) (PARTITION VALUE = 'abc')' is not a key column. " + - "Only key columns can be used in DISTRIBUTE BY."); + "Only key columns can be used in PARTITION BY."); // No float range partition values AnalysisError("create table tab (a int, b int, c int, d int, primary key (a, b, c))" + - "distribute by hash (a, b, c) into 8 buckets, " + + "partition by hash (a, b, c) into 8 buckets, " + "range (a) (partition value = 1.2, partition value = 2) stored as kudu", "Range partition value 1.2 (type: DECIMAL(2,1)) is not type compatible with " + - "distribution column 'a' (type: INT)."); - // Non-existing column used in DISTRIBUTE BY + "partitioning column 'a' (type: INT)."); + // Non-existing column used in PARTITION BY AnalysisError("create table tab (a int, b int, primary key (a, b)) " + - "distribute by range(unknown_column) (partition value = 'abc') stored as kudu", + "partition by range(unknown_column) (partition value = 'abc') stored as kudu", "Column 'unknown_column' in 'RANGE (unknown_column) (PARTITION VALUE = 'abc')' " + - "is not a key column. Only key columns can be used in DISTRIBUTE BY"); + "is not a key column. Only key columns can be used in PARTITION BY"); // Kudu table name is specified in tblproperties - AnalyzesOk("create table tab (x int primary key) distribute by hash (x) " + + AnalyzesOk("create table tab (x int primary key) partition by hash (x) " + "into 8 buckets stored as kudu tblproperties ('kudu.table_name'='tab_1'," + "'kudu.num_tablet_replicas'='1'," + "'kudu.master_addresses' = '127.0.0.1:8080, 127.0.0.1:8081')"); // No port is specified in kudu master address AnalyzesOk("create table tdata_no_port (id int primary key, name string, " + - "valf float, vali bigint) distribute by range(id) (partition values <= 10, " + + "valf float, vali bigint) partition by range(id) (partition values <= 10, " + "partition 10 < values <= 30, partition 30 < values) " + "stored as kudu tblproperties('kudu.master_addresses'='127.0.0.1')"); // Not using the STORED AS KUDU syntax to specify a Kudu table @@ -2078,12 +2078,12 @@ public class AnalyzeDDLTest extends FrontendTestBase { AnalysisError("create table tab (x int primary key) stored as kudu cached in " + "'testPool'", "A Kudu table cannot be cached in HDFS."); // LOCATION cannot be used with Kudu tables - AnalysisError("create table tab (a int primary key) distribute by hash (a) " + + AnalysisError("create table tab (a int primary key) partition by hash (a) " + "into 3 buckets stored as kudu location '/test-warehouse/'", "LOCATION cannot be specified for a Kudu table."); - // DISTRIBUTE BY is required for managed tables. + // PARTITION BY is required for managed tables. 
AnalysisError("create table tab (a int, primary key (a)) stored as kudu", - "Table distribution must be specified for managed Kudu tables."); + "Table partitioning must be specified for managed Kudu tables."); AnalysisError("create table tab (a int) stored as kudu", "A primary key is required for a Kudu table."); // Using ROW FORMAT with a Kudu table @@ -2105,12 +2105,12 @@ public class AnalyzeDDLTest extends FrontendTestBase { // Unsupported type is PK and partition col String stmt = String.format("create table tab (x %s primary key) " + - "distribute by hash(x) into 3 buckets stored as kudu", t); + "partition by hash(x) into 3 buckets stored as kudu", t); AnalysisError(stmt, expectedError); // Unsupported type is not PK/partition col stmt = String.format("create table tab (x int primary key, y %s) " + - "distribute by hash(x) into 3 buckets stored as kudu", t); + "partition by hash(x) into 3 buckets stored as kudu", t); AnalysisError(stmt, expectedError); } @@ -2125,7 +2125,7 @@ public class AnalyzeDDLTest extends FrontendTestBase { for (String block: blockSize) { AnalyzesOk(String.format("create table tab (x int primary key " + "not null encoding %s compression %s %s %s, y int encoding %s " + - "compression %s %s %s %s) distribute by hash (x) " + + "compression %s %s %s %s) partition by hash (x) " + "into 3 buckets stored as kudu", enc, comp, def, block, enc, comp, def, nul, block)); } @@ -2136,23 +2136,23 @@ public class AnalyzeDDLTest extends FrontendTestBase { // Primary key specified using the PRIMARY KEY clause AnalyzesOk("create table tab (x int not null encoding plain_encoding " + "compression snappy block_size 1, y int null encoding rle compression lz4 " + - "default 1, primary key(x)) distribute by hash (x) into 3 buckets " + + "default 1, primary key(x)) partition by hash (x) into 3 buckets " + "stored as kudu"); // Primary keys can't be null AnalysisError("create table tab (x int primary key null, y int not null) " + - "distribute by hash (x) into 3 buckets stored as kudu", "Primary key columns " + + "partition by hash (x) into 3 buckets stored as kudu", "Primary key columns " + "cannot be nullable: x INT PRIMARY KEY NULL"); AnalysisError("create table tab (x int not null, y int null, primary key (x, y)) " + - "distribute by hash (x) into 3 buckets stored as kudu", "Primary key columns " + + "partition by hash (x) into 3 buckets stored as kudu", "Primary key columns " + "cannot be nullable: y INT NULL"); // Unsupported encoding value AnalysisError("create table tab (x int primary key, y int encoding invalid_enc) " + - "distribute by hash (x) into 3 buckets stored as kudu", "Unsupported encoding " + + "partition by hash (x) into 3 buckets stored as kudu", "Unsupported encoding " + "value 'INVALID_ENC'. Supported encoding values are: " + Joiner.on(", ").join(Encoding.values())); // Unsupported compression algorithm AnalysisError("create table tab (x int primary key, y int compression " + - "invalid_comp) distribute by hash (x) into 3 buckets stored as kudu", + "invalid_comp) partition by hash (x) into 3 buckets stored as kudu", "Unsupported compression algorithm 'INVALID_COMP'. 
@@ -2136,23 +2136,23 @@ public class AnalyzeDDLTest extends FrontendTestBase {
     // Primary key specified using the PRIMARY KEY clause
     AnalyzesOk("create table tab (x int not null encoding plain_encoding " +
         "compression snappy block_size 1, y int null encoding rle compression lz4 " +
-        "default 1, primary key(x)) distribute by hash (x) into 3 buckets " +
+        "default 1, primary key(x)) partition by hash (x) into 3 buckets " +
         "stored as kudu");
     // Primary keys can't be null
     AnalysisError("create table tab (x int primary key null, y int not null) " +
-        "distribute by hash (x) into 3 buckets stored as kudu", "Primary key columns " +
+        "partition by hash (x) into 3 buckets stored as kudu", "Primary key columns " +
         "cannot be nullable: x INT PRIMARY KEY NULL");
     AnalysisError("create table tab (x int not null, y int null, primary key (x, y)) " +
-        "distribute by hash (x) into 3 buckets stored as kudu", "Primary key columns " +
+        "partition by hash (x) into 3 buckets stored as kudu", "Primary key columns " +
         "cannot be nullable: y INT NULL");
     // Unsupported encoding value
     AnalysisError("create table tab (x int primary key, y int encoding invalid_enc) " +
-        "distribute by hash (x) into 3 buckets stored as kudu", "Unsupported encoding " +
+        "partition by hash (x) into 3 buckets stored as kudu", "Unsupported encoding " +
         "value 'INVALID_ENC'. Supported encoding values are: " +
         Joiner.on(", ").join(Encoding.values()));
     // Unsupported compression algorithm
     AnalysisError("create table tab (x int primary key, y int compression " +
-        "invalid_comp) distribute by hash (x) into 3 buckets stored as kudu",
+        "invalid_comp) partition by hash (x) into 3 buckets stored as kudu",
         "Unsupported compression algorithm 'INVALID_COMP'. Supported compression " +
         "algorithms are: " + Joiner.on(", ").join(CompressionAlgorithm.values()));
     // Default values
@@ -2160,38 +2160,38 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         "i3 int default 100, i4 bigint default 1000, vals string default 'test', " +
         "valf float default cast(1.2 as float), vald double default " +
         "cast(3.1452 as double), valb boolean default true, " +
-        "primary key (i1, i2, i3, i4, vals)) distribute by hash (i1) into 3 " +
+        "primary key (i1, i2, i3, i4, vals)) partition by hash (i1) into 3 " +
         "buckets stored as kudu");
     AnalyzesOk("create table tab (i int primary key default 1+1+1) " +
-        "distribute by hash (i) into 3 buckets stored as kudu");
+        "partition by hash (i) into 3 buckets stored as kudu");
     AnalyzesOk("create table tab (i int primary key default factorial(5)) " +
-        "distribute by hash (i) into 3 buckets stored as kudu");
+        "partition by hash (i) into 3 buckets stored as kudu");
     AnalyzesOk("create table tab (i int primary key, x int null default " +
-        "isnull(null, null)) distribute by hash (i) into 3 buckets stored as kudu");
+        "isnull(null, null)) partition by hash (i) into 3 buckets stored as kudu");
     // Invalid default values
     AnalysisError("create table tab (i int primary key default 'string_val') " +
-        "distribute by hash (i) into 3 buckets stored as kudu", "Default value " +
+        "partition by hash (i) into 3 buckets stored as kudu", "Default value " +
         "'string_val' (type: STRING) is not compatible with column 'i' (type: INT).");
     AnalysisError("create table tab (i int primary key, x int default 1.1) " +
-        "distribute by hash (i) into 3 buckets stored as kudu",
+        "partition by hash (i) into 3 buckets stored as kudu",
         "Default value 1.1 (type: DECIMAL(2,1)) is not compatible with column " +
         "'x' (type: INT).");
     AnalysisError("create table tab (i tinyint primary key default 128) " +
-        "distribute by hash (i) into 3 buckets stored as kudu", "Default value " +
+        "partition by hash (i) into 3 buckets stored as kudu", "Default value " +
         "128 (type: SMALLINT) is not compatible with column 'i' (type: TINYINT).");
     AnalysisError("create table tab (i int primary key default isnull(null, null)) " +
-        "distribute by hash (i) into 3 buckets stored as kudu", "Default value of " +
+        "partition by hash (i) into 3 buckets stored as kudu", "Default value of " +
         "NULL not allowed on non-nullable column: 'i'");
     AnalysisError("create table tab (i int primary key, x int not null " +
-        "default isnull(null, null)) distribute by hash (i) into 3 buckets " +
+        "default isnull(null, null)) partition by hash (i) into 3 buckets " +
         "stored as kudu", "Default value of NULL not allowed on non-nullable column: " +
         "'x'");
     // Invalid block_size values
     AnalysisError("create table tab (i int primary key block_size 1.1) " +
-        "distribute by hash (i) into 3 buckets stored as kudu", "Invalid value " +
+        "partition by hash (i) into 3 buckets stored as kudu", "Invalid value " +
         "for BLOCK_SIZE: 1.1. A positive INTEGER value is expected.");
     AnalysisError("create table tab (i int primary key block_size 'val') " +
-        "distribute by hash (i) into 3 buckets stored as kudu", "Invalid value " +
+        "partition by hash (i) into 3 buckets stored as kudu", "Invalid value " +
         "for BLOCK_SIZE: 'val'. A positive INTEGER value is expected.");
   }
AnalysisError("create table functional.new_table (i int) " + - "distribute by hash(i) into 3 buckets stored as avro", - "Only Kudu tables can use the DISTRIBUTE BY clause."); + "partition by hash(i) into 3 buckets stored as avro", + "Only Kudu tables can use the PARTITION BY clause."); AnalysisError("create table functional.new_table (i int primary key) " + "stored as avro", "Unsupported column options for file format 'AVRO': " + "'i INT PRIMARY KEY'"); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/test/java/org/apache/impala/analysis/AuthorizationTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/analysis/AuthorizationTest.java b/fe/src/test/java/org/apache/impala/analysis/AuthorizationTest.java index 431ac01..3d6bbec 100644 --- a/fe/src/test/java/org/apache/impala/analysis/AuthorizationTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/AuthorizationTest.java @@ -911,7 +911,7 @@ public class AuthorizationTest { // IMPALA-4000: ALL privileges on SERVER are not required to create managed tables. AuthzOk("create table tpch.kudu_tbl (i int, j int, primary key (i))" + - " DISTRIBUTE BY HASH (i) INTO 9 BUCKETS stored as kudu TBLPROPERTIES " + + " PARTITION BY HASH (i) INTO 9 BUCKETS stored as kudu TBLPROPERTIES " + "('kudu.master_addresses'='127.0.0.1')"); // User does not have permission to create table at the specified location..
