IMPALA-3726: Add support for Kudu-specific column options

This commit adds support for Kudu-specific column options in CREATE TABLE
statements. The syntax is:

CREATE TABLE tbl_name ([col_name type [PRIMARY KEY] [option [...]]] [, ...])

where option is:
    NULL
  | NOT NULL
  | ENCODING encoding_val
  | COMPRESSION compression_algorithm
  | DEFAULT expr
  | BLOCK_SIZE num
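For illustration, a statement using the new options might look like the following
(a hypothetical sketch: the table and column names are invented, the table-level
Kudu clauses are omitted, and the ENCODING/COMPRESSION values are assumed to come
from the Kudu client enums that analysis validates against):

    CREATE TABLE metrics (
      id BIGINT PRIMARY KEY ENCODING BIT_SHUFFLE COMPRESSION LZ4,
      host STRING NOT NULL DEFAULT 'unknown',
      val DOUBLE NULL BLOCK_SIZE 8388608 COMMENT 'sampled value'
    ) ...

Per the parser and analysis changes below, repeating an option on a column is
rejected ("Column option ... is specified multiple times"), and the Kudu-specific
options (everything except COMMENT) are rejected during analysis for any non-Kudu
file format ("Unsupported column options").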
The output of the SHOW CREATE TABLE statement was altered to include all the specified column options for Kudu tables. Change-Id: I727b9ae1b7b2387db752b58081398dd3f3449c02 Reviewed-on: http://gerrit.cloudera.org:8080/5026 Reviewed-by: Dimitris Tsirogiannis <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/3db5ced4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/3db5ced4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/3db5ced4 Branch: refs/heads/master Commit: 3db5ced4cee8967f28e337747efba902ce71cfee Parents: 60414f0 Author: Dimitris Tsirogiannis <[email protected]> Authored: Wed Nov 9 15:11:07 2016 -0800 Committer: Internal Jenkins <[email protected]> Committed: Fri Nov 18 11:41:01 2016 +0000 ---------------------------------------------------------------------- be/src/exec/kudu-table-sink.cc | 6 +- common/thrift/CatalogObjects.thrift | 20 +- fe/src/main/cup/sql-parser.cup | 240 +++++++++++++------ .../analysis/AlterTableAddReplaceColsStmt.java | 2 +- .../analysis/AlterTableChangeColStmt.java | 2 +- .../org/apache/impala/analysis/ColumnDef.java | 196 +++++++++++++-- .../analysis/CreateOrAlterViewStmtBase.java | 5 +- .../analysis/CreateTableAsSelectStmt.java | 9 +- .../analysis/CreateTableLikeFileStmt.java | 8 +- .../org/apache/impala/analysis/InsertStmt.java | 58 +++-- .../org/apache/impala/analysis/TableDef.java | 16 +- .../org/apache/impala/analysis/ToSqlUtils.java | 16 ++ .../java/org/apache/impala/catalog/Column.java | 10 +- .../org/apache/impala/catalog/KuduColumn.java | 86 ++++++- .../org/apache/impala/catalog/KuduTable.java | 13 +- .../java/org/apache/impala/catalog/Table.java | 17 +- .../java/org/apache/impala/planner/Planner.java | 2 + .../impala/service/KuduCatalogOpExecutor.java | 32 ++- .../apache/impala/util/AvroSchemaParser.java | 18 +- .../org/apache/impala/util/AvroSchemaUtils.java | 17 +- .../java/org/apache/impala/util/KuduUtil.java | 157 +++++++++++- fe/src/main/jflex/sql-scanner.flex | 4 + .../apache/impala/analysis/AnalyzeDDLTest.java | 96 +++++++- .../org/apache/impala/analysis/ParserTest.java | 69 +++++- .../functional/functional_schema_template.sql | 43 ++-- .../queries/QueryTest/kudu_delete.test | 9 +- .../queries/QueryTest/kudu_insert.test | 4 +- .../queries/QueryTest/kudu_update.test | 7 +- .../queries/QueryTest/kudu_upsert.test | 11 +- tests/query_test/test_kudu.py | 52 ++-- tests/shell/test_shell_commandline.py | 2 +- 31 files changed, 973 insertions(+), 254 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/be/src/exec/kudu-table-sink.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc index 5dd6336..a195e84 100644 --- a/be/src/exec/kudu-table-sink.cc +++ b/be/src/exec/kudu-table-sink.cc @@ -195,17 +195,13 @@ Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) { bool add_row = true; for (int j = 0; j < output_expr_ctxs_.size(); ++j) { - // For INSERT, output_expr_ctxs_ will contain all columns of the table in order. - // For UPDATE and UPSERT, output_expr_ctxs_ only contains the columns that the op + // output_expr_ctxs_ only contains the columns that the op // applies to, i.e. 
columns explicitly mentioned in the query, and // referenced_columns is then used to map to actual column positions. int col = kudu_table_sink_.referenced_columns.empty() ? j : kudu_table_sink_.referenced_columns[j]; void* value = output_expr_ctxs_[j]->GetValue(current_row); - - // If the value is NULL, we only need to explicitly set it for UPDATE and UPSERT. - // For INSERT, it can be ignored as unspecified cols will be implicitly set to NULL. if (value == NULL) { if (table_schema.Column(j).is_nullable()) { KUDU_RETURN_IF_ERROR(write->mutable_row()->SetNull(col), http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/common/thrift/CatalogObjects.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift index 9f0c6d4..10cb777 100644 --- a/common/thrift/CatalogObjects.thrift +++ b/common/thrift/CatalogObjects.thrift @@ -72,7 +72,18 @@ enum THdfsCompression { SNAPPY, SNAPPY_BLOCKED, LZO, - LZ4 + LZ4, + ZLIB +} + +enum TColumnEncoding { + AUTO, + PLAIN, + PREFIX, + GROUP_VARINT, + RLE, + DICTIONARY, + BIT_SHUFFLE } enum THdfsSeqCompressionMode { @@ -191,11 +202,14 @@ struct TColumn { 8: optional string column_qualifier 9: optional bool is_binary - // Indicates whether this is a Kudu column. If true implies all following Kudu specific - // fields are set. + // All the following are Kudu-specific column properties 10: optional bool is_kudu_column 11: optional bool is_key 12: optional bool is_nullable + 13: optional TColumnEncoding encoding + 14: optional THdfsCompression compression + 15: optional Exprs.TExpr default_value + 16: optional i32 block_size } // Represents a block in an HDFS file http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/cup/sql-parser.cup ---------------------------------------------------------------------- diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup index 1884036..2fc765d 100644 --- a/fe/src/main/cup/sql-parser.cup +++ b/fe/src/main/cup/sql-parser.cup @@ -18,15 +18,18 @@ package org.apache.impala.analysis; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import java.math.BigDecimal; import java.math.BigInteger; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Map; import java_cup.runtime.Symbol; import org.apache.impala.analysis.ColumnDef; +import org.apache.impala.analysis.ColumnDef.Option; import org.apache.impala.analysis.UnionStmt.Qualifier; import org.apache.impala.analysis.UnionStmt.UnionOperand; import org.apache.impala.analysis.RangePartition; @@ -240,16 +243,17 @@ parser code {: // ALL KEYWORDS ALSO NEED TO BE ADDED TO THE ident_or_kw PRODUCTION. 
terminal KW_ADD, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_ANALYTIC, KW_AND, KW_ANTI, KW_API_VERSION,
-  KW_ARRAY, KW_AS, KW_ASC, KW_AVRO, KW_BETWEEN, KW_BIGINT, KW_BINARY, KW_BOOLEAN,
-  KW_BUCKETS, KW_BY, KW_CACHED, KW_CASCADE, KW_CASE, KW_CAST, KW_CHANGE, KW_CHAR,
-  KW_CLASS, KW_CLOSE_FN, KW_COLUMN, KW_COLUMNS, KW_COMMENT, KW_COMPUTE, KW_CREATE,
-  KW_CROSS, KW_CURRENT, KW_DATA, KW_DATABASE, KW_DATABASES, KW_DATE, KW_DATETIME,
-  KW_DECIMAL, KW_DELETE, KW_DELIMITED, KW_DESC, KW_DESCRIBE, KW_DISTINCT, KW_DISTRIBUTE,
-  KW_DIV, KW_DOUBLE, KW_DROP, KW_ELSE, KW_END, KW_ESCAPED, KW_EXISTS, KW_EXPLAIN,
-  KW_EXTENDED, KW_EXTERNAL, KW_FALSE, KW_FIELDS, KW_FILEFORMAT, KW_FILES, KW_FINALIZE_FN,
-  KW_FIRST, KW_FLOAT, KW_FOLLOWING, KW_FOR, KW_FORMAT, KW_FORMATTED, KW_FROM, KW_FULL,
-  KW_FUNCTION, KW_FUNCTIONS, KW_GRANT, KW_GROUP, KW_HASH, KW_IGNORE, KW_HAVING, KW_IF,
-  KW_ILIKE, KW_IN, KW_INCREMENTAL, KW_INIT_FN, KW_INNER, KW_INPATH, KW_INSERT, KW_INT,
+  KW_ARRAY, KW_AS, KW_ASC, KW_AVRO, KW_BETWEEN, KW_BIGINT, KW_BINARY, KW_BLOCKSIZE,
+  KW_BOOLEAN, KW_BUCKETS, KW_BY, KW_CACHED, KW_CASCADE, KW_CASE, KW_CAST, KW_CHANGE,
+  KW_CHAR, KW_CLASS, KW_CLOSE_FN, KW_COLUMN, KW_COLUMNS, KW_COMMENT, KW_COMPRESSION,
+  KW_COMPUTE, KW_CREATE, KW_CROSS, KW_CURRENT, KW_DATA, KW_DATABASE, KW_DATABASES,
+  KW_DATE, KW_DATETIME, KW_DECIMAL, KW_DEFAULT, KW_DELETE, KW_DELIMITED, KW_DESC,
+  KW_DESCRIBE, KW_DISTINCT, KW_DISTRIBUTE, KW_DIV, KW_DOUBLE, KW_DROP, KW_ELSE,
+  KW_ENCODING, KW_END, KW_ESCAPED, KW_EXISTS, KW_EXPLAIN, KW_EXTENDED, KW_EXTERNAL,
+  KW_FALSE, KW_FIELDS, KW_FILEFORMAT, KW_FILES, KW_FINALIZE_FN, KW_FIRST, KW_FLOAT,
+  KW_FOLLOWING, KW_FOR, KW_FORMAT, KW_FORMATTED, KW_FROM, KW_FULL, KW_FUNCTION,
+  KW_FUNCTIONS, KW_GRANT, KW_GROUP, KW_HASH, KW_IGNORE, KW_HAVING, KW_IF, KW_ILIKE,
+  KW_IN, KW_INCREMENTAL, KW_INIT_FN, KW_INNER, KW_INPATH, KW_INSERT, KW_INT,
  KW_INTERMEDIATE, KW_INTERVAL, KW_INTO, KW_INVALIDATE, KW_IREGEXP, KW_IS, KW_JOIN,
  KW_KUDU, KW_LAST, KW_LEFT, KW_LIKE, KW_LIMIT, KW_LINES, KW_LOAD, KW_LOCATION, KW_MAP,
  KW_MERGE_FN, KW_METADATA, KW_NOT, KW_NULL, KW_NULLS, KW_OFFSET, KW_ON, KW_OR, KW_ORDER,
@@ -280,6 +284,10 @@ terminal String STRING_LITERAL;
 terminal String UNMATCHED_STRING_LITERAL;
 terminal String UNEXPECTED_CHAR;

+// IMPALA-3726 introduced the DEFAULT keyword which could break existing applications
+// that use the identifier "default" as database, column or table names. To avoid that,
+// the ident_or_default non-terminal is introduced and should be used instead of IDENT.
+nonterminal String ident_or_keyword, ident_or_default;
 nonterminal StatementBase stmt;
 // Single select statement.
nonterminal SelectStmt select_stmt; @@ -399,7 +407,6 @@ nonterminal CreateDataSrcStmt create_data_src_stmt; nonterminal DropDataSrcStmt drop_data_src_stmt; nonterminal ShowDataSrcsStmt show_data_srcs_stmt; nonterminal StructField struct_field_def; -nonterminal String ident_or_keyword; nonterminal DistributeParam distribute_hash_param; nonterminal List<RangePartition> range_params_list; nonterminal RangePartition range_param; @@ -415,7 +422,7 @@ nonterminal ArrayList<StructField> struct_field_def_list; // Options for DDL commands - CREATE/DROP/ALTER nonterminal HdfsCachingOp cache_op_val; nonterminal BigDecimal opt_cache_op_replication; -nonterminal String comment_val; +nonterminal String comment_val, opt_comment_val; nonterminal Boolean external_val; nonterminal Boolean purge_val; nonterminal String opt_init_string_val; @@ -444,6 +451,13 @@ nonterminal String opt_kw_column; nonterminal String opt_kw_table; nonterminal Boolean overwrite_val; nonterminal Boolean cascade_val; +nonterminal Boolean nullability_val; +nonterminal String encoding_val; +nonterminal String compression_val; +nonterminal Expr default_val; +nonterminal LiteralExpr block_size_val; +nonterminal Pair<Option, Object> column_option; +nonterminal Map<Option, Object> column_options_map; // For GRANT/REVOKE/AUTH DDL statements nonterminal ShowRolesStmt show_roles_stmt; @@ -487,6 +501,7 @@ nonterminal TFunctionCategory opt_function_category; precedence left KW_OR; precedence left KW_AND; precedence right KW_NOT, NOT; +precedence left KW_DEFAULT; precedence left KW_BETWEEN, KW_IN, KW_IS, KW_EXISTS; precedence left KW_LIKE, KW_RLIKE, KW_ILIKE, KW_REGEXP, KW_IREGEXP; precedence left EQUAL, NOTEQUAL, LESSTHAN, GREATERTHAN, KW_FROM, KW_DISTINCT; @@ -789,31 +804,33 @@ opt_kw_table ::= show_roles_stmt ::= KW_SHOW KW_ROLES {: RESULT = new ShowRolesStmt(false, null); :} - | KW_SHOW KW_ROLE KW_GRANT KW_GROUP IDENT:group + | KW_SHOW KW_ROLE KW_GRANT KW_GROUP ident_or_default:group {: RESULT = new ShowRolesStmt(false, group); :} | KW_SHOW KW_CURRENT KW_ROLES {: RESULT = new ShowRolesStmt(true, null); :} ; show_grant_role_stmt ::= - KW_SHOW KW_GRANT KW_ROLE IDENT:role + KW_SHOW KW_GRANT KW_ROLE ident_or_default:role {: RESULT = new ShowGrantRoleStmt(role, null); :} - | KW_SHOW KW_GRANT KW_ROLE IDENT:role KW_ON server_ident:server_kw + | KW_SHOW KW_GRANT KW_ROLE ident_or_default:role KW_ON server_ident:server_kw {: RESULT = new ShowGrantRoleStmt(role, PrivilegeSpec.createServerScopedPriv(TPrivilegeLevel.ALL)); :} - | KW_SHOW KW_GRANT KW_ROLE IDENT:role KW_ON KW_DATABASE IDENT:db_name + | KW_SHOW KW_GRANT KW_ROLE ident_or_default:role KW_ON + KW_DATABASE ident_or_default:db_name {: RESULT = new ShowGrantRoleStmt(role, PrivilegeSpec.createDbScopedPriv(TPrivilegeLevel.ALL, db_name)); :} - | KW_SHOW KW_GRANT KW_ROLE IDENT:role KW_ON KW_TABLE table_name:tbl_name + | KW_SHOW KW_GRANT KW_ROLE ident_or_default:role KW_ON KW_TABLE table_name:tbl_name {: RESULT = new ShowGrantRoleStmt(role, PrivilegeSpec.createTableScopedPriv(TPrivilegeLevel.ALL, tbl_name)); :} - | KW_SHOW KW_GRANT KW_ROLE IDENT:role KW_ON uri_ident:uri_kw STRING_LITERAL:uri + | KW_SHOW KW_GRANT KW_ROLE ident_or_default:role KW_ON uri_ident:uri_kw + STRING_LITERAL:uri {: RESULT = new ShowGrantRoleStmt(role, PrivilegeSpec.createUriScopedPriv(TPrivilegeLevel.ALL, new HdfsUri(uri))); @@ -821,40 +838,40 @@ show_grant_role_stmt ::= ; create_drop_role_stmt ::= - KW_CREATE KW_ROLE IDENT:role + KW_CREATE KW_ROLE ident_or_default:role {: RESULT = new CreateDropRoleStmt(role, false); :} - | 
KW_DROP KW_ROLE IDENT:role + | KW_DROP KW_ROLE ident_or_default:role {: RESULT = new CreateDropRoleStmt(role, true); :} ; grant_role_stmt ::= - KW_GRANT KW_ROLE IDENT:role KW_TO KW_GROUP IDENT:group + KW_GRANT KW_ROLE ident_or_default:role KW_TO KW_GROUP ident_or_default:group {: RESULT = new GrantRevokeRoleStmt(role, group, true); :} ; revoke_role_stmt ::= - KW_REVOKE KW_ROLE IDENT:role KW_FROM KW_GROUP IDENT:group + KW_REVOKE KW_ROLE ident_or_default:role KW_FROM KW_GROUP ident_or_default:group {: RESULT = new GrantRevokeRoleStmt(role, group, false); :} ; grant_privilege_stmt ::= - KW_GRANT privilege_spec:priv KW_TO opt_kw_role:opt_role IDENT:role + KW_GRANT privilege_spec:priv KW_TO opt_kw_role:opt_role ident_or_default:role opt_with_grantopt:grant_opt {: RESULT = new GrantRevokePrivStmt(role, priv, true, grant_opt); :} ; revoke_privilege_stmt ::= KW_REVOKE opt_grantopt_for:grant_opt privilege_spec:priv KW_FROM - opt_kw_role:opt_role IDENT:role + opt_kw_role:opt_role ident_or_default:role {: RESULT = new GrantRevokePrivStmt(role, priv, false, grant_opt); :} ; privilege_spec ::= privilege:priv KW_ON server_ident:server_kw {: RESULT = PrivilegeSpec.createServerScopedPriv(priv); :} - | privilege:priv KW_ON server_ident:server_kw IDENT:server_name + | privilege:priv KW_ON server_ident:server_kw ident_or_default:server_name {: RESULT = PrivilegeSpec.createServerScopedPriv(priv, server_name); :} - | privilege:priv KW_ON KW_DATABASE IDENT:db_name + | privilege:priv KW_ON KW_DATABASE ident_or_default:db_name {: RESULT = PrivilegeSpec.createDbScopedPriv(priv, db_name); :} | privilege:priv KW_ON KW_TABLE table_name:tbl_name {: RESULT = PrivilegeSpec.createTableScopedPriv(priv, tbl_name); :} @@ -902,9 +919,9 @@ alter_tbl_stmt ::= RESULT = new AlterTableAddPartitionStmt(table, partition, location, if_not_exists, cache_op); :} - | KW_ALTER KW_TABLE table_name:table KW_DROP opt_kw_column IDENT:col_name + | KW_ALTER KW_TABLE table_name:table KW_DROP opt_kw_column ident_or_default:col_name {: RESULT = new AlterTableDropColStmt(table, col_name); :} - | KW_ALTER KW_TABLE table_name:table KW_CHANGE opt_kw_column IDENT:col_name + | KW_ALTER KW_TABLE table_name:table KW_CHANGE opt_kw_column ident_or_default:col_name column_def:col_def {: RESULT = new AlterTableChangeColStmt(table, col_name, col_def); :} | KW_ALTER KW_TABLE table_name:table KW_DROP if_exists_val:if_exists @@ -927,7 +944,7 @@ alter_tbl_stmt ::= table_property_type:target LPAREN properties_map:properties RPAREN {: RESULT = new AlterTableSetTblProperties(table, partitions, target, properties); :} | KW_ALTER KW_TABLE table_name:table opt_partition_set:partition KW_SET - KW_COLUMN KW_STATS IDENT:col LPAREN properties_map:map RPAREN + KW_COLUMN KW_STATS ident_or_default:col LPAREN properties_map:map RPAREN {: // The opt_partition_set is used to avoid conflicts even though // a partition clause does not make sense for this stmt. 
If a partition @@ -966,8 +983,8 @@ replace_existing_cols_val ::= ; create_db_stmt ::= - KW_CREATE db_or_schema_kw if_not_exists_val:if_not_exists IDENT:db_name - comment_val:comment location_val:location + KW_CREATE db_or_schema_kw if_not_exists_val:if_not_exists ident_or_default:db_name + opt_comment_val:comment location_val:location {: RESULT = new CreateDbStmt(db_name, comment, location, if_not_exists); :} ; @@ -1032,9 +1049,9 @@ create_tbl_stmt ::= RESULT = new CreateTableStmt(tbl_def); :} | tbl_def_with_col_defs:tbl_def - KW_PRODUCED KW_BY KW_DATA source_ident:is_source_id IDENT:data_src_name + KW_PRODUCED KW_BY KW_DATA source_ident:is_source_id ident_or_default:data_src_name opt_init_string_val:init_string - comment_val:comment + opt_comment_val:comment {: // Need external_val in the grammar to avoid shift/reduce conflict with other // CREATE TABLE statements. @@ -1067,7 +1084,7 @@ create_tbl_stmt ::= create_tbl_like_stmt ::= tbl_def_without_col_defs:tbl_def KW_LIKE table_name:other_table - comment_val:comment + opt_comment_val:comment file_format_create_table_val:file_format location_val:location {: RESULT = new CreateTableLikeStmt(tbl_def.getTblName(), other_table, @@ -1089,7 +1106,8 @@ tbl_def_with_col_defs ::= tbl_def.getColumnDefs().addAll(list); RESULT = tbl_def; :} - | tbl_def_without_col_defs:tbl_def LPAREN column_def_list:list COMMA primary_keys:primary_keys RPAREN + | tbl_def_without_col_defs:tbl_def LPAREN column_def_list:list COMMA + primary_keys:primary_keys RPAREN {: tbl_def.getColumnDefs().addAll(list); tbl_def.getPrimaryKeyColumnNames().addAll(primary_keys); @@ -1103,7 +1121,7 @@ primary_keys ::= ; tbl_options ::= - comment_val:comment row_format_val:row_format serde_properties:serde_props + opt_comment_val:comment row_format_val:row_format serde_properties:serde_props file_format_create_table_val:file_format location_val:location cache_op_val:cache_op tbl_properties:tbl_props {: @@ -1282,6 +1300,11 @@ opt_cache_op_replication ::= comment_val ::= KW_COMMENT STRING_LITERAL:comment {: RESULT = comment; :} + ; + +opt_comment_val ::= + KW_COMMENT STRING_LITERAL:comment + {: RESULT = comment; :} | /* empty */ {: RESULT = null; :} ; @@ -1422,20 +1445,81 @@ column_def_list ::= ; column_def ::= - IDENT:col_name type_def:type is_primary_key_val:primary_key comment_val:comment - {: RESULT = new ColumnDef(col_name, type, primary_key, comment); :} + ident_or_default:col_name type_def:type column_options_map:options + {: RESULT = new ColumnDef(col_name, type, options); :} + | ident_or_default:col_name type_def:type + {: RESULT = new ColumnDef(col_name, type); :} + ; + +column_options_map ::= + column_options_map:map column_option:col_option + {: + if (map.put(col_option.first, col_option.second) != null) { + throw new Exception(String.format("Column option %s is specified multiple times", + col_option.first.toString())); + } + RESULT = map; + :} + | column_option:col_option + {: + Map<Option, Object> options = Maps.newHashMap(); + options.put(col_option.first, col_option.second); + RESULT = options; + :} + ; + +column_option ::= + is_primary_key_val:primary_key + {: RESULT = new Pair<Option, Object>(Option.IS_PRIMARY_KEY, primary_key); :} + | nullability_val:nullability + {: RESULT = new Pair<Option, Object>(Option.IS_NULLABLE, nullability); :} + | encoding_val:encoding + {: RESULT = new Pair<Option, Object>(Option.ENCODING, encoding); :} + | compression_val:compression + {: RESULT = new Pair<Option, Object>(Option.COMPRESSION, compression); :} + | default_val:default_val + {: 
RESULT = new Pair<Option, Object>(Option.DEFAULT, default_val); :} + | block_size_val:block_size + {: RESULT = new Pair<Option, Object>(Option.BLOCK_SIZE, block_size); :} + | comment_val:comment + {: RESULT = new Pair<Option, Object>(Option.COMMENT, comment); :} ; is_primary_key_val ::= KW_PRIMARY key_ident {: RESULT = true; :} - | /* empty */ + ; + +nullability_val ::= + KW_NOT KW_NULL {: RESULT = false; :} + | KW_NULL + {: RESULT = true; :} + ; + +encoding_val ::= + KW_ENCODING ident_or_default:encoding_ident + {: RESULT = encoding_ident; :} + ; + +compression_val ::= + KW_COMPRESSION ident_or_default:compression_ident + {: RESULT = compression_ident; :} + ; + +default_val ::= + KW_DEFAULT expr:default_val + {: RESULT = default_val; :} + ; + +block_size_val ::= + KW_BLOCKSIZE literal:block_size + {: RESULT = block_size; :} ; create_view_stmt ::= KW_CREATE KW_VIEW if_not_exists_val:if_not_exists table_name:view_name - view_column_defs:col_defs comment_val:comment KW_AS query_stmt:view_def + view_column_defs:col_defs opt_comment_val:comment KW_AS query_stmt:view_def {: RESULT = new CreateViewStmt(if_not_exists, view_name, col_defs, comment, view_def); :} @@ -1443,7 +1527,7 @@ create_view_stmt ::= create_data_src_stmt ::= KW_CREATE KW_DATA source_ident:is_source_id - if_not_exists_val:if_not_exists IDENT:data_src_name + if_not_exists_val:if_not_exists ident_or_default:data_src_name KW_LOCATION STRING_LITERAL:location KW_CLASS STRING_LITERAL:class_name KW_API_VERSION STRING_LITERAL:api_version @@ -1534,8 +1618,12 @@ view_column_def_list ::= ; view_column_def ::= - IDENT:col_name comment_val:comment - {: RESULT = new ColumnDef(col_name, null, comment); :} + ident_or_default:col_name opt_comment_val:comment + {: + Map<Option, Object> options = Maps.newHashMap(); + if (comment != null) options.put(Option.COMMENT, comment); + RESULT = new ColumnDef(col_name, null, options); + :} ; alter_view_stmt ::= @@ -1571,7 +1659,8 @@ drop_stats_stmt ::= ; drop_db_stmt ::= - KW_DROP db_or_schema_kw if_exists_val:if_exists IDENT:db_name cascade_val:cascade + KW_DROP db_or_schema_kw if_exists_val:if_exists ident_or_default:db_name + cascade_val:cascade {: RESULT = new DropDbStmt(db_name, if_exists, cascade); :} ; @@ -1593,7 +1682,8 @@ drop_function_stmt ::= ; drop_data_src_stmt ::= - KW_DROP KW_DATA source_ident:is_source_id if_exists_val:if_exists IDENT:data_src_name + KW_DROP KW_DATA source_ident:is_source_id if_exists_val:if_exists + ident_or_default:data_src_name {: RESULT = new DropDataSrcStmt(data_src_name, if_exists); :} ; @@ -1683,7 +1773,7 @@ static_partition_key_value_list ::= partition_key_value ::= // Dynamic partition key values. - IDENT:column + ident_or_default:column {: RESULT = new PartitionKeyValue(column, null); :} | static_partition_key_value:partition {: RESULT = partition; :} @@ -1691,7 +1781,7 @@ partition_key_value ::= static_partition_key_value ::= // Static partition key values. 
- IDENT:column EQUAL expr:e + ident_or_default:column EQUAL expr:e {: RESULT = new PartitionKeyValue(column, e); :} ; @@ -1825,11 +1915,12 @@ opt_with_clause ::= ; with_view_def ::= - IDENT:alias KW_AS LPAREN query_stmt:query RPAREN + ident_or_default:alias KW_AS LPAREN query_stmt:query RPAREN {: RESULT = new View(alias, query, null); :} | STRING_LITERAL:alias KW_AS LPAREN query_stmt:query RPAREN {: RESULT = new View(alias, query, null); :} - | IDENT:alias LPAREN ident_list:col_names RPAREN KW_AS LPAREN query_stmt:query RPAREN + | ident_or_default:alias LPAREN ident_list:col_names RPAREN KW_AS LPAREN + query_stmt:query RPAREN {: RESULT = new View(alias, query, col_names); :} | STRING_LITERAL:alias LPAREN ident_list:col_names RPAREN KW_AS LPAREN query_stmt:query RPAREN @@ -1957,7 +2048,7 @@ values_operand_list ::= ; use_stmt ::= - KW_USE IDENT:db + KW_USE ident_or_default:db {: RESULT = new UseStmt(db); :} ; @@ -1966,9 +2057,9 @@ show_tables_stmt ::= {: RESULT = new ShowTablesStmt(); :} | KW_SHOW KW_TABLES show_pattern:showPattern {: RESULT = new ShowTablesStmt(showPattern); :} - | KW_SHOW KW_TABLES KW_IN IDENT:db + | KW_SHOW KW_TABLES KW_IN ident_or_default:db {: RESULT = new ShowTablesStmt(db, null); :} - | KW_SHOW KW_TABLES KW_IN IDENT:db show_pattern:showPattern + | KW_SHOW KW_TABLES KW_IN ident_or_default:db show_pattern:showPattern {: RESULT = new ShowTablesStmt(db, showPattern); :} ; @@ -1996,9 +2087,9 @@ show_functions_stmt ::= {: RESULT = new ShowFunctionsStmt(null, null, fn_type); :} | KW_SHOW opt_function_category:fn_type KW_FUNCTIONS show_pattern:showPattern {: RESULT = new ShowFunctionsStmt(null, showPattern, fn_type); :} - | KW_SHOW opt_function_category:fn_type KW_FUNCTIONS KW_IN IDENT:db + | KW_SHOW opt_function_category:fn_type KW_FUNCTIONS KW_IN ident_or_default:db {: RESULT = new ShowFunctionsStmt(db, null, fn_type); :} - | KW_SHOW opt_function_category:fn_type KW_FUNCTIONS KW_IN IDENT:db + | KW_SHOW opt_function_category:fn_type KW_FUNCTIONS KW_IN ident_or_default:db show_pattern:showPattern {: RESULT = new ShowFunctionsStmt(db, showPattern, fn_type); :} ; @@ -2051,7 +2142,7 @@ show_files_stmt ::= ; describe_db_stmt ::= - KW_DESCRIBE db_or_schema_kw describe_output_style:style IDENT:db + KW_DESCRIBE db_or_schema_kw describe_output_style:style ident_or_default:db {: RESULT = new DescribeDbStmt(db, style); :} ; @@ -2108,9 +2199,9 @@ select_clause ::= ; set_stmt ::= - KW_SET IDENT:key EQUAL literal:l + KW_SET ident_or_default:key EQUAL literal:l {: RESULT = new SetStmt(key, l.getStringValue()); :} - | KW_SET IDENT:key EQUAL IDENT:ident + | KW_SET ident_or_default:key EQUAL ident_or_default:ident {: RESULT = new SetStmt(key, ident); :} | KW_SET {: RESULT = new SetStmt(null, null); :} @@ -2140,9 +2231,9 @@ select_list_item ::= ; alias_clause ::= - KW_AS IDENT:ident + KW_AS ident_or_default:ident {: RESULT = ident; :} - | IDENT:ident + | ident_or_default:ident {: RESULT = ident; :} | KW_AS STRING_LITERAL:l {: RESULT = l; :} @@ -2158,9 +2249,9 @@ star_expr ::= ; table_name ::= - IDENT:tbl + ident_or_default:tbl {: RESULT = new TableName(null, tbl); :} - | IDENT:db DOT IDENT:tbl + | ident_or_default:db DOT ident_or_default:tbl {: RESULT = new TableName(db, tbl); :} ; @@ -2295,13 +2386,13 @@ opt_plan_hints ::= ; ident_list ::= - IDENT:ident + ident_or_default:ident {: ArrayList<String> list = new ArrayList<String>(); list.add(ident); RESULT = list; :} - | ident_list:list COMMA IDENT:ident + | ident_list:list COMMA ident_or_default:ident {: list.add(ident); RESULT = list; @@ 
-2525,7 +2616,7 @@ function_call_expr ::= | function_name:fn_name LPAREN function_params:params RPAREN {: RESULT = FunctionCallExpr.createExpr(fn_name, params); :} // Below is a special case for EXTRACT. Idents are used to avoid adding new keywords. - | function_name:fn_name LPAREN IDENT:u KW_FROM expr:t RPAREN + | function_name:fn_name LPAREN ident_or_default:u KW_FROM expr:t RPAREN {: RESULT = new ExtractFromExpr(fn_name, u, t); :} ; @@ -2828,13 +2919,13 @@ slot_ref ::= ; dotted_path ::= - IDENT:ident + ident_or_default:ident {: ArrayList<String> list = new ArrayList<String>(); list.add(ident); RESULT = list; :} - | dotted_path:list DOT IDENT:ident + | dotted_path:list DOT ident_or_default:ident {: list.add(ident); RESULT = list; @@ -2895,7 +2986,7 @@ type ::= // that we can parse type strings from the Hive Metastore which // may have unquoted identifiers corresponding to keywords. struct_field_def ::= - ident_or_keyword:name COLON type:t comment_val:comment + ident_or_keyword:name COLON type:t opt_comment_val:comment {: RESULT = new StructField(name, t, comment); :} ; @@ -2913,6 +3004,13 @@ struct_field_def_list ::= :} ; +ident_or_default ::= + IDENT:name + {: RESULT = name.toString(); :} + | KW_DEFAULT:name + {: RESULT = name.toString(); :} + ; + ident_or_keyword ::= IDENT:r {: RESULT = r.toString(); :} @@ -2946,6 +3044,8 @@ ident_or_keyword ::= {: RESULT = r.toString(); :} | KW_BINARY:r {: RESULT = r.toString(); :} + | KW_BLOCKSIZE:r + {: RESULT = r.toString(); :} | KW_BOOLEAN:r {: RESULT = r.toString(); :} | KW_BUCKETS:r @@ -2974,6 +3074,8 @@ ident_or_keyword ::= {: RESULT = r.toString(); :} | KW_COMMENT:r {: RESULT = r.toString(); :} + | KW_COMPRESSION:r + {: RESULT = r.toString(); :} | KW_COMPUTE:r {: RESULT = r.toString(); :} | KW_CREATE:r @@ -2994,6 +3096,8 @@ ident_or_keyword ::= {: RESULT = r.toString(); :} | KW_DECIMAL:r {: RESULT = r.toString(); :} + | KW_DEFAULT:r + {: RESULT = r.toString(); :} | KW_DELETE:r {: RESULT = r.toString(); :} | KW_DELIMITED:r @@ -3014,6 +3118,8 @@ ident_or_keyword ::= {: RESULT = r.toString(); :} | KW_ELSE:r {: RESULT = r.toString(); :} + | KW_ENCODING:r + {: RESULT = r.toString(); :} | KW_END:r {: RESULT = r.toString(); :} | KW_ESCAPED:r http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/analysis/AlterTableAddReplaceColsStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableAddReplaceColsStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableAddReplaceColsStmt.java index 0354117..feda138 100644 --- a/fe/src/main/java/org/apache/impala/analysis/AlterTableAddReplaceColsStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableAddReplaceColsStmt.java @@ -90,7 +90,7 @@ public class AlterTableAddReplaceColsStmt extends AlterTableStmt { // partition columns. 
Set<String> colNames = Sets.newHashSet(); for (ColumnDef c: columnDefs_) { - c.analyze(); + c.analyze(analyzer); String colName = c.getColName().toLowerCase(); if (existingPartitionKeys.contains(colName)) { throw new AnalysisException( http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/analysis/AlterTableChangeColStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableChangeColStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableChangeColStmt.java index 9130740..5c4bfee 100644 --- a/fe/src/main/java/org/apache/impala/analysis/AlterTableChangeColStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableChangeColStmt.java @@ -90,7 +90,7 @@ public class AlterTableChangeColStmt extends AlterTableStmt { } // Check that the new column def's name is valid. - newColDef_.analyze(); + newColDef_.analyze(analyzer); // Verify that if the column name is being changed, the new name doesn't conflict // with an existing column. if (!colName_.toLowerCase().equals(newColDef_.getColName().toLowerCase()) && http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java b/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java index 923a0a6..f65aa27 100644 --- a/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java +++ b/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java @@ -18,21 +18,26 @@ package org.apache.impala.analysis; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.LinkedHashMap; import java.util.Map; import com.google.common.base.Function; +import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; + import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.FieldSchema; - import org.apache.impala.catalog.Type; import org.apache.impala.common.AnalysisException; import org.apache.impala.thrift.TColumn; +import org.apache.impala.util.KuduUtil; import org.apache.impala.util.MetaStoreUtil; +import org.apache.kudu.ColumnSchema.CompressionAlgorithm; +import org.apache.kudu.ColumnSchema.Encoding; /** * Represents a column definition in a CREATE/ALTER TABLE/VIEW statement. @@ -40,31 +45,89 @@ import org.apache.impala.util.MetaStoreUtil; * whereas column definitions in CREATE/ALTER VIEW statements infer the column type from * the corresponding view definition. All column definitions have an optional comment. * Since a column definition refers a column stored in the Metastore, the column name - * must be valid according to the Metastore's rules (see @MetaStoreUtils). + * must be valid according to the Metastore's rules (see @MetaStoreUtils). A number of + * additional column options may be specified for Kudu tables. */ public class ColumnDef { private final String colName_; - private String comment_; - // Required in CREATE/ALTER TABLE stmts. Set to NULL in CREATE/ALTER VIEW stmts, // for which we setType() after analyzing the defining view definition stmt. private final TypeDef typeDef_; private Type type_; + private String comment_; - // Set to true if the user specified "PRIMARY KEY" in the column definition. 
Kudu table - // definitions may use this. - private boolean isPrimaryKey_; - - public ColumnDef(String colName, TypeDef typeDef, String comment) { - this(colName, typeDef, false, comment); + // Available column options + public enum Option { + IS_PRIMARY_KEY, + IS_NULLABLE, + ENCODING, + COMPRESSION, + DEFAULT, + BLOCK_SIZE, + COMMENT } - public ColumnDef(String colName, TypeDef typeDef, boolean isPrimaryKey, - String comment) { + // Kudu-specific column options + // + // Set to true if the user specified "PRIMARY KEY" in the column definition. + private boolean isPrimaryKey_; + // Set to true if this column may contain null values. Can be NULL if + // not specified. + private Boolean isNullable_; + private String encodingVal_; + // Encoding for this column; set in analysis. + private Encoding encoding_; + private String compressionVal_; + // Compression algorithm for this column; set in analysis. + private CompressionAlgorithm compression_; + // Default value for this column. + private Expr defaultValue_; + // Desired block size for this column. + private LiteralExpr blockSize_; + + public ColumnDef(String colName, TypeDef typeDef, Map<Option, Object> options) { + Preconditions.checkNotNull(options); colName_ = colName.toLowerCase(); typeDef_ = typeDef; - isPrimaryKey_ = isPrimaryKey; - comment_ = comment; + for (Map.Entry<Option, Object> option: options.entrySet()) { + switch (option.getKey()) { + case IS_PRIMARY_KEY: + Preconditions.checkState(option.getValue() instanceof Boolean); + isPrimaryKey_ = (Boolean) option.getValue(); + break; + case IS_NULLABLE: + Preconditions.checkState(option.getValue() instanceof Boolean); + isNullable_ = (Boolean) option.getValue(); + break; + case ENCODING: + Preconditions.checkState(option.getValue() instanceof String); + encodingVal_ = ((String) option.getValue()).toUpperCase(); + break; + case COMPRESSION: + Preconditions.checkState(option.getValue() instanceof String); + compressionVal_ = ((String) option.getValue()).toUpperCase(); + break; + case DEFAULT: + Preconditions.checkState(option.getValue() instanceof Expr); + defaultValue_ = (Expr) option.getValue(); + break; + case BLOCK_SIZE: + Preconditions.checkState(option.getValue() instanceof LiteralExpr); + blockSize_ = (LiteralExpr) option.getValue(); + break; + case COMMENT: + Preconditions.checkState(option.getValue() instanceof String); + comment_ = (String) option.getValue(); + break; + default: + throw new IllegalStateException(String.format("Illegal option %s", + option.getKey())); + } + } + } + + public ColumnDef(String colName, TypeDef typeDef) { + this(colName, typeDef, Collections.<Option, Object>emptyMap()); } /** @@ -81,8 +144,7 @@ public class ColumnDef { colName_ = fs.getName(); typeDef_ = new TypeDef(type); comment_ = fs.getComment(); - isPrimaryKey_ = false; - analyze(); + analyze(null); } public String getColName() { return colName_; } @@ -92,8 +154,13 @@ public class ColumnDef { boolean isPrimaryKey() { return isPrimaryKey_; } public void setComment(String comment) { comment_ = comment; } public String getComment() { return comment_; } + public boolean hasKuduOptions() { + return isPrimaryKey_ || isNullable_ != null || encodingVal_ != null + || compressionVal_ != null || defaultValue_ != null || blockSize_ != null; + } + public boolean isNullable() { return isNullable_ != null && isNullable_; } - public void analyze() throws AnalysisException { + public void analyze(Analyzer analyzer) throws AnalysisException { // Check whether the column name meets the Metastore's requirements. 
if (!MetaStoreUtils.validateName(colName_)) { throw new AnalysisException("Invalid column/field name: " + colName_); @@ -112,6 +179,10 @@ public class ColumnDef { "%s has %d characters.", colName_, MetaStoreUtil.MAX_TYPE_NAME_LENGTH, typeSql, typeSql.length())); } + if (hasKuduOptions()) { + Preconditions.checkNotNull(analyzer); + analyzeKuduOptions(analyzer); + } if (comment_ != null && comment_.length() > MetaStoreUtil.CREATE_MAX_COMMENT_LENGTH) { throw new AnalysisException(String.format( @@ -121,6 +192,79 @@ public class ColumnDef { } } + private void analyzeKuduOptions(Analyzer analyzer) throws AnalysisException { + if (isPrimaryKey_ && isNullable_ != null && isNullable_) { + throw new AnalysisException("Primary key columns cannot be nullable: " + + toString()); + } + // Encoding value + if (encodingVal_ != null) { + try { + encoding_ = Encoding.valueOf(encodingVal_); + } catch (IllegalArgumentException e) { + throw new AnalysisException(String.format("Unsupported encoding value '%s'. " + + "Supported encoding values are: %s", encodingVal_, + Joiner.on(", ").join(Encoding.values()))); + } + } + // Compression algorithm + if (compressionVal_ != null) { + try { + compression_ = CompressionAlgorithm.valueOf(compressionVal_); + } catch (IllegalArgumentException e) { + throw new AnalysisException(String.format("Unsupported compression " + + "algorithm '%s'. Supported compression algorithms are: %s", compressionVal_, + Joiner.on(", ").join(CompressionAlgorithm.values()))); + } + } + // Analyze the default value, if any. + // TODO: Similar checks are applied for range partition values in + // RangePartition.analyzeBoundaryValue(). Consider consolidating the logic into a + // single function. + if (defaultValue_ != null) { + try { + defaultValue_.analyze(analyzer); + } catch (AnalysisException e) { + throw new AnalysisException(String.format("Only constant values are allowed " + + "for default values: %s", defaultValue_.toSql()), e); + } + if (!defaultValue_.isConstant()) { + throw new AnalysisException(String.format("Only constant values are allowed " + + "for default values: %s", defaultValue_.toSql())); + } + defaultValue_ = LiteralExpr.create(defaultValue_, analyzer.getQueryCtx()); + if (defaultValue_ == null) { + throw new AnalysisException(String.format("Only constant values are allowed " + + "for default values: %s", defaultValue_.toSql())); + } + if (defaultValue_.getType().isNull() && ((isNullable_ != null && !isNullable_) + || isPrimaryKey_)) { + throw new AnalysisException(String.format("Default value of NULL not allowed " + + "on non-nullable column: '%s'", getColName())); + } + if (!Type.isImplicitlyCastable(defaultValue_.getType(), type_, true)) { + throw new AnalysisException(String.format("Default value %s (type: %s) " + + "is not compatible with column '%s' (type: %s).", defaultValue_.toSql(), + defaultValue_.getType().toSql(), colName_, type_.toSql())); + } + if (!defaultValue_.getType().equals(type_)) { + Expr castLiteral = defaultValue_.uncheckedCastTo(type_); + Preconditions.checkNotNull(castLiteral); + defaultValue_ = LiteralExpr.create(castLiteral, analyzer.getQueryCtx()); + } + Preconditions.checkNotNull(defaultValue_); + } + + // Analyze the block size value, if any. + if (blockSize_ != null) { + blockSize_.analyze(null); + if (!blockSize_.getType().isIntegerType()) { + throw new AnalysisException(String.format("Invalid value for BLOCK_SIZE: %s. 
" + + "A positive INTEGER value is expected.", blockSize_.toSql())); + } + } + } + @Override public String toString() { StringBuilder sb = new StringBuilder(colName_).append(" "); @@ -130,6 +274,11 @@ public class ColumnDef { sb.append(typeDef_); } if (isPrimaryKey_) sb.append(" PRIMARY KEY"); + if (isNullable_ != null) sb.append(isNullable_ ? " NULL" : " NOT NULL"); + if (encoding_ != null) sb.append(" ENCODING " + encoding_.toString()); + if (compression_ != null) sb.append(" COMPRESSION " + compression_.toString()); + if (defaultValue_ != null) sb.append(" DEFAULT_VALUE " + defaultValue_.toSql()); + if (blockSize_ != null) sb.append(" BLOCK_SIZE " + blockSize_.toSql()); if (comment_ != null) sb.append(String.format(" COMMENT '%s'", comment_)); return sb.toString(); } @@ -146,12 +295,21 @@ public class ColumnDef { .append(isPrimaryKey_, rhs.isPrimaryKey_) .append(typeDef_, rhs.typeDef_) .append(type_, rhs.type_) + .append(isNullable_, rhs.isNullable_) + .append(encoding_, rhs.encoding_) + .append(compression_, rhs.compression_) + .append(defaultValue_, rhs.defaultValue_) + .append(blockSize_, rhs.blockSize_) .isEquals(); } public TColumn toThrift() { - TColumn col = new TColumn(new TColumn(getColName(), type_.toThrift())); - col.setComment(getComment()); + TColumn col = new TColumn(getColName(), type_.toThrift()); + Integer blockSize = + blockSize_ == null ? null : (int) ((NumericLiteral) blockSize_).getIntValue(); + KuduUtil.setColumnOptions(col, isPrimaryKey_, isNullable_, encoding_, + compression_, defaultValue_, blockSize); + if (comment_ != null) col.setComment(comment_); return col; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/analysis/CreateOrAlterViewStmtBase.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateOrAlterViewStmtBase.java b/fe/src/main/java/org/apache/impala/analysis/CreateOrAlterViewStmtBase.java index 54b8557..5f524f5 100644 --- a/fe/src/main/java/org/apache/impala/analysis/CreateOrAlterViewStmtBase.java +++ b/fe/src/main/java/org/apache/impala/analysis/CreateOrAlterViewStmtBase.java @@ -18,6 +18,7 @@ package org.apache.impala.analysis; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Set; @@ -114,7 +115,7 @@ public abstract class CreateOrAlterViewStmtBase extends StatementBase { List<String> labels = viewDefStmt_.getColLabels(); Preconditions.checkState(exprs.size() == labels.size()); for (int i = 0; i < viewDefStmt_.getColLabels().size(); ++i) { - ColumnDef colDef = new ColumnDef(labels.get(i), null, null); + ColumnDef colDef = new ColumnDef(labels.get(i), null); colDef.setType(exprs.get(i).getType()); finalColDefs_.add(colDef); } @@ -124,7 +125,7 @@ public abstract class CreateOrAlterViewStmtBase extends StatementBase { // duplicate column names. 
Set<String> distinctColNames = Sets.newHashSet(); for (ColumnDef colDesc: finalColDefs_) { - colDesc.analyze(); + colDesc.analyze(null); if (!distinctColNames.add(colDesc.getColName().toLowerCase())) { throw new AnalysisException("Duplicate column name: " + colDesc.getColName()); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java index 1e53d1e..5dca6b5 100644 --- a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java @@ -17,8 +17,9 @@ package org.apache.impala.analysis; -import java.util.EnumSet; +import java.util.Collections; import java.util.List; +import java.util.EnumSet; import org.apache.impala.authorization.Privilege; import org.apache.impala.catalog.Db; @@ -147,7 +148,7 @@ public class CreateTableAsSelectStmt extends StatementBase { "mismatch: %s != %s", partitionLabel, colLabel)); } - ColumnDef colDef = new ColumnDef(colLabel, null, null); + ColumnDef colDef = new ColumnDef(colLabel, null); colDef.setType(tmpQueryStmt.getBaseTblResultExprs().get(i).getType()); createStmt_.getPartitionColumnDefs().add(colDef); } @@ -159,8 +160,8 @@ public class CreateTableAsSelectStmt extends StatementBase { int colCnt = tmpQueryStmt.getColLabels().size(); createStmt_.getColumnDefs().clear(); for (int i = 0; i < colCnt; ++i) { - ColumnDef colDef = new ColumnDef( - tmpQueryStmt.getColLabels().get(i), null, null); + ColumnDef colDef = new ColumnDef(tmpQueryStmt.getColLabels().get(i), null, + Collections.<ColumnDef.Option, Object>emptyMap()); colDef.setType(tmpQueryStmt.getBaseTblResultExprs().get(i).getType()); createStmt_.getColumnDefs().add(colDef); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java index a653323..432e9c1 100644 --- a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java @@ -21,9 +21,11 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsAction; @@ -36,7 +38,6 @@ import org.apache.impala.authorization.Privilege; import org.apache.impala.catalog.ArrayType; import org.apache.impala.catalog.HdfsCompression; import org.apache.impala.catalog.HdfsFileFormat; -import org.apache.impala.catalog.KuduTable; import org.apache.impala.catalog.MapType; import org.apache.impala.catalog.ScalarType; import org.apache.impala.catalog.StructField; @@ -328,8 +329,9 @@ public class CreateTableLikeFileStmt extends CreateTableStmt { Type type = convertParquetType(field); Preconditions.checkNotNull(type); String colName = field.getName(); - 
schema.add(new ColumnDef(colName, new TypeDef(type), - "Inferred from Parquet file.")); + Map<ColumnDef.Option, Object> option = Maps.newHashMap(); + option.put(ColumnDef.Option.COMMENT, "Inferred from Parquet file."); + schema.add(new ColumnDef(colName, new TypeDef(type), option)); } return schema; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java index 55005f9..87f7cef 100644 --- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java @@ -27,6 +27,7 @@ import org.apache.impala.authorization.PrivilegeRequestBuilder; import org.apache.impala.catalog.Column; import org.apache.impala.catalog.HBaseTable; import org.apache.impala.catalog.HdfsTable; +import org.apache.impala.catalog.KuduColumn; import org.apache.impala.catalog.KuduTable; import org.apache.impala.catalog.Table; import org.apache.impala.catalog.Type; @@ -41,7 +42,6 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -126,18 +126,20 @@ public class InsertStmt extends StatementBase { // Output expressions that produce the final results to write to the target table. May // include casts. Set in prepareExpressions(). - // If this is an INSERT, will contain one Expr for all non-partition columns of the - // target table with NullLiterals where an output column isn't explicitly mentioned. - // The i'th expr produces the i'th column of the target table. - // If this is an UPSERT, will contain one Expr per column mentioned in the query and - // mentionedUpsertColumns_ is used to map between the Exprs and columns in the target - // table. + // If this is an INSERT on a non-Kudu table, it will contain one Expr for all + // non-partition columns of the target table with NullLiterals where an output + // column isn't explicitly mentioned. The i'th expr produces the i'th column of + // the target table. + // + // For Kudu tables (INSERT and UPSERT operations), it will contain one Expr per column + // mentioned in the query and mentionedColumns_ is used to map between the Exprs + // and columns in the target table. private ArrayList<Expr> resultExprs_ = Lists.newArrayList(); // Position mapping of exprs in resultExprs_ to columns in the target table - - // resultExprs_[i] produces the mentionedUpsertColumns_[i] column of the target table. - // Only used for UPSERT, set in prepareExpressions(). - private final List<Integer> mentionedUpsertColumns_ = Lists.newArrayList(); + // resultExprs_[i] produces the mentionedColumns_[i] column of the target table. + // Only used for Kudu tables, set in prepareExpressions(). + private final List<Integer> mentionedColumns_ = Lists.newArrayList(); // Set in analyze(). Exprs corresponding to key columns of Kudu tables. Empty for // non-Kudu tables. 
@@ -209,7 +211,7 @@ public class InsertStmt extends StatementBase { hasNoShuffleHint_ = false; hasClusteredHint_ = false; resultExprs_.clear(); - mentionedUpsertColumns_.clear(); + mentionedColumns_.clear(); primaryKeyExprs_.clear(); } @@ -277,8 +279,9 @@ public class InsertStmt extends StatementBase { // Finally, prepareExpressions analyzes the expressions themselves, and confirms that // they are type-compatible with the target columns. Where columns are not mentioned // (and by this point, we know that missing columns are not partition columns), - // prepareExpressions assigns them a NULL literal expressions, unless this is an - // UPSERT, in which case we don't want to overwrite unmentioned columns with NULL. + // prepareExpressions assigns them a NULL literal expressions, unless the target is + // a Kudu table, in which case we don't want to overwrite unmentioned columns with + // NULL. // An null permutation clause is the same as listing all non-partition columns in // order. @@ -602,7 +605,7 @@ public class InsertStmt extends StatementBase { * * 3. Populates resultExprs_ with type-compatible expressions, in Hive column order, * for all expressions in the select-list. Unmentioned columns are assigned NULL literal - * expressions, unless this is an UPSERT. + * expressions, unless the target is a Kudu table. * * 4. Result exprs for key columns of Kudu tables are stored in primaryKeyExprs_. * @@ -667,6 +670,7 @@ public class InsertStmt extends StatementBase { expr.analyze(analyzer); } + boolean isKuduTable = table_ instanceof KuduTable; // Finally, 'undo' the permutation so that the selectListExprs are in Hive column // order, and add NULL expressions to all missing columns, unless this is an UPSERT. ArrayList<Column> columns = table_.getColumnsInHiveOrder(); @@ -676,23 +680,32 @@ public class InsertStmt extends StatementBase { for (int i = 0; i < selectListExprs.size(); ++i) { if (selectExprTargetColumns.get(i).getName().equals(tblColumn.getName())) { resultExprs_.add(selectListExprs.get(i)); - if (isUpsert_) mentionedUpsertColumns_.add(col); + if (isKuduTable) mentionedColumns_.add(col); matchFound = true; break; } } // If no match is found, either the column is a clustering column with a static // value, or it was unmentioned and therefore should have a NULL select-list - // expression if this is an INSERT. + // expression if this is an INSERT and the target is not a Kudu table. if (!matchFound) { - if (tblColumn.getPosition() >= numClusteringCols && !isUpsert_) { - // Unmentioned non-clustering columns get NULL literals with the appropriate - // target type because Parquet cannot handle NULL_TYPE (IMPALA-617). - resultExprs_.add(NullLiteral.create(tblColumn.getType())); + if (tblColumn.getPosition() >= numClusteringCols) { + if (isKuduTable) { + Preconditions.checkState(tblColumn instanceof KuduColumn); + KuduColumn kuduCol = (KuduColumn) tblColumn; + if (!kuduCol.hasDefaultValue() && !kuduCol.isNullable()) { + throw new AnalysisException("Missing values for column that is not " + + "nullable and has no default value " + kuduCol.getName()); + } + } else { + // Unmentioned non-clustering columns get NULL literals with the appropriate + // target type because Parquet cannot handle NULL_TYPE (IMPALA-617). + resultExprs_.add(NullLiteral.create(tblColumn.getType())); + } } } // Store exprs for Kudu key columns. 
- if (matchFound && table_ instanceof KuduTable) { + if (matchFound && isKuduTable) { KuduTable kuduTable = (KuduTable) table_; if (kuduTable.isPrimaryKeyColumn(tblColumn.getName())) { primaryKeyExprs_.add(Iterables.getLast(resultExprs_)); @@ -779,9 +792,8 @@ public class InsertStmt extends StatementBase { public DataSink createDataSink() { // analyze() must have been called before. Preconditions.checkState(table_ != null); - Preconditions.checkState(isUpsert_ || mentionedUpsertColumns_.isEmpty()); return TableSink.create(table_, isUpsert_ ? TableSink.Op.UPSERT : TableSink.Op.INSERT, - partitionKeyExprs_, mentionedUpsertColumns_, overwrite_); + partitionKeyExprs_, mentionedColumns_, overwrite_); } /** http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/analysis/TableDef.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/TableDef.java b/fe/src/main/java/org/apache/impala/analysis/TableDef.java index ce08e36..1c16954 100644 --- a/fe/src/main/java/org/apache/impala/analysis/TableDef.java +++ b/fe/src/main/java/org/apache/impala/analysis/TableDef.java @@ -174,7 +174,7 @@ class TableDef { Preconditions.checkState(tableName_ != null && !tableName_.isEmpty()); fqTableName_ = analyzer.getFqTableName(getTblName()); fqTableName_.analyze(); - analyzeColumnDefs(); + analyzeColumnDefs(analyzer); analyzePrimaryKeys(); if (analyzer.dbContainsTable(getTblName().getDb(), getTbl(), Privilege.CREATE) @@ -194,16 +194,20 @@ class TableDef { * Analyzes table and partition column definitions, checking whether all column * names are unique. */ - private void analyzeColumnDefs() throws AnalysisException { + private void analyzeColumnDefs(Analyzer analyzer) throws AnalysisException { Set<String> colNames = Sets.newHashSet(); for (ColumnDef colDef: columnDefs_) { - colDef.analyze(); + colDef.analyze(analyzer); if (!colNames.add(colDef.getColName().toLowerCase())) { throw new AnalysisException("Duplicate column name: " + colDef.getColName()); } + if (getFileFormat() != THdfsFileFormat.KUDU && colDef.hasKuduOptions()) { + throw new AnalysisException(String.format("Unsupported column options for " + + "file format '%s': '%s'", getFileFormat().name(), colDef.toString())); + } } for (ColumnDef colDef: getPartitionColumnDefs()) { - colDef.analyze(); + colDef.analyze(analyzer); if (!colDef.getType().supportsTablePartitioning()) { throw new AnalysisException( String.format("Type '%s' is not supported as partition-column type " + @@ -247,6 +251,10 @@ class TableDef { throw new AnalysisException(String.format( "PRIMARY KEY column '%s' does not exist in the table", colName)); } + if (colDef.isNullable()) { + throw new AnalysisException("Primary key columns cannot be nullable: " + + colDef.toString()); + } primaryKeyColDefs_.add(colDef); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java index aa24336..f01a78c 100644 --- a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java +++ b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java @@ -41,6 +41,7 @@ import org.apache.impala.catalog.Function; import org.apache.impala.catalog.HBaseTable; import org.apache.impala.catalog.HdfsCompression; import 
org.apache.impala.catalog.HdfsFileFormat; +import org.apache.impala.catalog.KuduColumn; import org.apache.impala.catalog.KuduTable; import org.apache.impala.catalog.RowFormat; import org.apache.impala.catalog.Table; @@ -351,6 +352,21 @@ public class ToSqlUtils { private static String columnToSql(Column col) { StringBuilder sb = new StringBuilder(col.getName()); if (col.getType() != null) sb.append(" " + col.getType().toSql()); + if (col instanceof KuduColumn) { + KuduColumn kuduCol = (KuduColumn) col; + Boolean isNullable = kuduCol.isNullable(); + if (isNullable != null) sb.append(isNullable ? " NULL" : " NOT NULL"); + if (kuduCol.getEncoding() != null) sb.append(" ENCODING " + kuduCol.getEncoding()); + if (kuduCol.getCompression() != null) { + sb.append(" COMPRESSION " + kuduCol.getCompression()); + } + if (kuduCol.getDefaultValue() != null) { + sb.append(" DEFAULT " + kuduCol.getDefaultValue().toSql()); + } + if (kuduCol.getBlockSize() != 0) { + sb.append(String.format(" BLOCK_SIZE %d", kuduCol.getBlockSize())); + } + } if (!Strings.isNullOrEmpty(col.getComment())) { sb.append(String.format(" COMMENT '%s'", col.getComment())); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/catalog/Column.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/Column.java b/fe/src/main/java/org/apache/impala/catalog/Column.java index 91928aa..0830f61 100644 --- a/fe/src/main/java/org/apache/impala/catalog/Column.java +++ b/fe/src/main/java/org/apache/impala/catalog/Column.java @@ -31,6 +31,8 @@ import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import org.apache.impala.common.ImpalaRuntimeException; + /** * Internal representation of column-related metadata. * Owned by Catalog instance. @@ -84,7 +86,7 @@ public class Column { .add("position_", position_).toString(); } - public static Column fromThrift(TColumn columnDesc) { + public static Column fromThrift(TColumn columnDesc) throws ImpalaRuntimeException { String comment = columnDesc.isSetComment() ? columnDesc.getComment() : null; Preconditions.checkState(columnDesc.isSetPosition()); int position = columnDesc.getPosition(); @@ -98,11 +100,7 @@ public class Column { columnDesc.getColumn_qualifier(), columnDesc.isIs_binary(), Type.fromThrift(columnDesc.getColumnType()), comment, position); } else if (columnDesc.isIs_kudu_column()) { - Preconditions.checkState(columnDesc.isSetIs_key()); - Preconditions.checkState(columnDesc.isSetIs_nullable()); - col = new KuduColumn(columnDesc.getColumnName(), columnDesc.isIs_key(), - columnDesc.isIs_nullable(), - Type.fromThrift(columnDesc.getColumnType()), comment, position); + col = KuduColumn.fromThrift(columnDesc, position); } else { // Hdfs table column. 
col = new Column(columnDesc.getColumnName(), http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java b/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java index 404dbf5..5640748 100644 --- a/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java +++ b/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java @@ -17,36 +17,108 @@ package org.apache.impala.catalog; +import com.google.common.base.Preconditions; +import org.apache.impala.analysis.LiteralExpr; +import org.apache.impala.common.AnalysisException; +import org.apache.impala.common.ImpalaRuntimeException; import org.apache.impala.thrift.TColumn; +import org.apache.impala.util.KuduUtil; + +import org.apache.kudu.ColumnSchema.CompressionAlgorithm; +import org.apache.kudu.ColumnSchema.Encoding; +import org.apache.kudu.ColumnSchema; /** - * Describes a Kudu column mapped to a Hive column (as described in the metastore). - * This class extends Column with Kudu-specific information about whether it is part of a primary - * key, and whether it is nullable. + * Represents a Kudu column. + * + * This class extends Column with Kudu-specific information: + * - primary key + * - nullability constraint + * - encoding + * - compression + * - default value + * - desired block size */ public class KuduColumn extends Column { private final boolean isKey_; private final boolean isNullable_; + private final Encoding encoding_; + private final CompressionAlgorithm compression_; + private final LiteralExpr defaultValue_; + private final int blockSize_; - public KuduColumn(String name, boolean isKey, boolean isNullable, Type type, - String comment, int position) { + private KuduColumn(String name, Type type, boolean isKey, boolean isNullable, + Encoding encoding, CompressionAlgorithm compression, LiteralExpr defaultValue, + int blockSize, String comment, int position) { super(name, type, comment, position); isKey_ = isKey; isNullable_ = isNullable; + encoding_ = encoding; + compression_ = compression; + defaultValue_ = defaultValue; + blockSize_ = blockSize; + } + + public static KuduColumn fromColumnSchema(ColumnSchema colSchema, int position) + throws ImpalaRuntimeException { + Type type = KuduUtil.toImpalaType(colSchema.getType()); + Object defaultValue = colSchema.getDefaultValue(); + LiteralExpr defaultValueExpr = null; + if (defaultValue != null) { + try { + defaultValueExpr = LiteralExpr.create(defaultValue.toString(), type); + } catch (AnalysisException e) { + throw new ImpalaRuntimeException(String.format("Error parsing default value: " + + "'%s'", defaultValue), e); + } + Preconditions.checkNotNull(defaultValueExpr); + } + return new KuduColumn(colSchema.getName(), type, colSchema.isKey(), + colSchema.isNullable(), colSchema.getEncoding(), + colSchema.getCompressionAlgorithm(), defaultValueExpr, + colSchema.getDesiredBlockSize(), null, position); + } + + public static KuduColumn fromThrift(TColumn column, int position) + throws ImpalaRuntimeException { + Preconditions.checkState(column.isSetIs_key()); + Preconditions.checkState(column.isSetIs_nullable()); + Type columnType = Type.fromThrift(column.getColumnType()); + Encoding encoding = null; + if (column.isSetEncoding()) encoding = KuduUtil.fromThrift(column.getEncoding()); + CompressionAlgorithm compression = null; + if (column.isSetCompression()) { + compression = 
KuduUtil.fromThrift(column.getCompression()); + } + LiteralExpr defaultValue = null; + if (column.isSetDefault_value()) { + defaultValue = + LiteralExpr.fromThrift(column.getDefault_value().getNodes().get(0), columnType); + } + int blockSize = 0; + if (column.isSetBlock_size()) blockSize = column.getBlock_size(); + return new KuduColumn(column.getColumnName(), columnType, column.isIs_key(), + column.isIs_nullable(), encoding, compression, defaultValue, blockSize, null, + position); } public boolean isKey() { return isKey_; } public boolean isNullable() { return isNullable_; } + public Encoding getEncoding() { return encoding_; } + public CompressionAlgorithm getCompression() { return compression_; } + public LiteralExpr getDefaultValue() { return defaultValue_; } + public boolean hasDefaultValue() { return defaultValue_ != null; } + public int getBlockSize() { return blockSize_; } @Override public TColumn toThrift() { TColumn colDesc = new TColumn(name_, type_.toThrift()); + KuduUtil.setColumnOptions(colDesc, isKey_, isNullable_, encoding_, compression_, + defaultValue_, blockSize_); if (comment_ != null) colDesc.setComment(comment_); colDesc.setCol_stats(getStats().toThrift()); colDesc.setPosition(position_); colDesc.setIs_kudu_column(true); - colDesc.setIs_key(isKey_); - colDesc.setIs_nullable(isNullable_); return colDesc; } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/catalog/KuduTable.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java index 7b906a7..0e88905 100644 --- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java @@ -215,12 +215,13 @@ public class KuduTable extends Table { cols.clear(); int pos = 0; for (ColumnSchema colSchema: kuduTable.getSchema().getColumns()) { - Type type = KuduUtil.toImpalaType(colSchema.getType()); - String colName = colSchema.getName(); - cols.add(new FieldSchema(colName, type.toSql().toLowerCase(), null)); - boolean isKey = colSchema.isKey(); - if (isKey) primaryKeyColumnNames_.add(colName); - addColumn(new KuduColumn(colName, isKey, !isKey, type, null, pos)); + KuduColumn kuduCol = KuduColumn.fromColumnSchema(colSchema, pos); + Preconditions.checkNotNull(kuduCol); + // Add the HMS column + cols.add(new FieldSchema(kuduCol.getName(), kuduCol.getType().toSql().toLowerCase(), + null)); + if (kuduCol.isKey()) primaryKeyColumnNames_.add(kuduCol.getName()); + addColumn(kuduCol); ++pos; } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/catalog/Table.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java index be9dc7b..7bde786 100644 --- a/fe/src/main/java/org/apache/impala/catalog/Table.java +++ b/fe/src/main/java/org/apache/impala/catalog/Table.java @@ -256,12 +256,17 @@ public abstract class Table implements CatalogObject { colsByPos_.clear(); colsByPos_.ensureCapacity(columns.size()); - for (int i = 0; i < columns.size(); ++i) { - Column col = Column.fromThrift(columns.get(i)); - colsByPos_.add(col.getPosition(), col); - colsByName_.put(col.getName().toLowerCase(), col); - ((StructType) type_.getItemType()).addField( - new StructField(col.getName(), col.getType(), 
col.getComment())); + try { + for (int i = 0; i < columns.size(); ++i) { + Column col = Column.fromThrift(columns.get(i)); + colsByPos_.add(col.getPosition(), col); + colsByName_.put(col.getName().toLowerCase(), col); + ((StructType) type_.getItemType()).addField( + new StructField(col.getName(), col.getType(), col.getComment())); + } + } catch (ImpalaRuntimeException e) { + throw new TableLoadingException(String.format("Error loading schema for " + + "table '%s'", getName()), e); } numClusteringCols_ = thriftTable.getClustering_columns().size(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/planner/Planner.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java index 52524fb..658fd0a 100644 --- a/fe/src/main/java/org/apache/impala/planner/Planner.java +++ b/fe/src/main/java/org/apache/impala/planner/Planner.java @@ -186,6 +186,8 @@ public class Planner { Table targetTable = ctx_.getAnalysisResult().getInsertStmt().getTargetTable(); graph.addTargetColumnLabels(targetTable); Preconditions.checkNotNull(targetTable); + // Lineage is not currently supported for Kudu tables (see IMPALA-4283) + if (targetTable instanceof KuduTable) return fragments; List<Expr> exprs = Lists.newArrayList(); if (targetTable instanceof HBaseTable) { exprs.addAll(resultExprs); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java index a2b1fb9..068f426 100644 --- a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java @@ -31,6 +31,7 @@ import org.apache.impala.catalog.TableNotFoundException; import org.apache.impala.catalog.Type; import org.apache.impala.common.ImpalaRuntimeException; import org.apache.impala.common.Pair; +import org.apache.impala.thrift.TColumn; import org.apache.impala.thrift.TCreateTableParams; import org.apache.impala.thrift.TDistributeParam; import org.apache.impala.thrift.TRangePartition; @@ -74,7 +75,7 @@ public class KuduCatalogOpExecutor { throw new ImpalaRuntimeException(String.format( "Table '%s' already exists in Kudu.", kuduTableName)); } - Schema schema = createTableSchema(msTbl, params); + Schema schema = createTableSchema(params); CreateTableOptions tableOpts = buildTableOptions(msTbl, params, schema); kudu.createTable(kuduTableName, schema, tableOpts); } catch (Exception e) { @@ -86,22 +87,31 @@ public class KuduCatalogOpExecutor { /** * Creates the schema of a new Kudu table. 
*/ - private static Schema createTableSchema( - org.apache.hadoop.hive.metastore.api.Table msTbl, TCreateTableParams params) + private static Schema createTableSchema(TCreateTableParams params) throws ImpalaRuntimeException { Set<String> keyColNames = new HashSet<>(params.getPrimary_key_column_names()); - List<FieldSchema> fieldSchemas = msTbl.getSd().getCols(); - List<ColumnSchema> colSchemas = new ArrayList<>(fieldSchemas.size()); - for (FieldSchema fieldSchema: fieldSchemas) { - Type type = Type.parseColumnType(fieldSchema.getType()); + List<ColumnSchema> colSchemas = new ArrayList<>(params.getColumnsSize()); + for (TColumn column: params.getColumns()) { + Type type = Type.fromThrift(column.getColumnType()); Preconditions.checkState(type != null); org.apache.kudu.Type kuduType = KuduUtil.fromImpalaType(type); // Create the actual column and check if the column is a key column ColumnSchemaBuilder csb = - new ColumnSchemaBuilder(fieldSchema.getName(), kuduType); - boolean isKeyCol = keyColNames.contains(fieldSchema.getName()); - csb.key(isKeyCol); - csb.nullable(!isKeyCol); + new ColumnSchemaBuilder(column.getColumnName(), kuduType); + Preconditions.checkState(column.isSetIs_key()); + csb.key(keyColNames.contains(column.getColumnName())); + if (column.isSetIs_nullable()) csb.nullable(column.isIs_nullable()); + if (column.isSetDefault_value()) { + csb.defaultValue(KuduUtil.getKuduDefaultValue(column.getDefault_value(), kuduType, + column.getColumnName())); + } + if (column.isSetBlock_size()) csb.desiredBlockSize(column.getBlock_size()); + if (column.isSetEncoding()) { + csb.encoding(KuduUtil.fromThrift(column.getEncoding())); + } + if (column.isSetCompression()) { + csb.compressionAlgorithm(KuduUtil.fromThrift(column.getCompression())); + } colSchemas.add(csb.build()); } return new Schema(colSchemas); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/util/AvroSchemaParser.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/util/AvroSchemaParser.java b/fe/src/main/java/org/apache/impala/util/AvroSchemaParser.java index 36a586b..9840766 100644 --- a/fe/src/main/java/org/apache/impala/util/AvroSchemaParser.java +++ b/fe/src/main/java/org/apache/impala/util/AvroSchemaParser.java @@ -29,10 +29,12 @@ import java.util.Hashtable; import java.util.List; import java.util.Map; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + import org.apache.avro.Schema; import org.apache.avro.SchemaParseException; -import org.codehaus.jackson.JsonNode; - import org.apache.impala.analysis.ColumnDef; import org.apache.impala.analysis.TypeDef; import org.apache.impala.catalog.ArrayType; @@ -42,8 +44,7 @@ import org.apache.impala.catalog.StructField; import org.apache.impala.catalog.StructType; import org.apache.impala.catalog.Type; import org.apache.impala.common.AnalysisException; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; +import org.codehaus.jackson.JsonNode; /** * Utility class used to parse Avro schema. 
Checks that the schema is valid @@ -81,9 +82,12 @@ public class AvroSchemaParser { } List<ColumnDef> colDefs = Lists.newArrayListWithCapacity(schema.getFields().size()); for (Schema.Field field: schema.getFields()) { + Map<ColumnDef.Option, Object> option = Maps.newHashMap(); + String comment = field.doc(); + if (comment != null) option.put(ColumnDef.Option.COMMENT, comment); ColumnDef colDef = new ColumnDef(field.name(), - new TypeDef(getTypeInfo(field.schema(), field.name())), field.doc()); - colDef.analyze(); + new TypeDef(getTypeInfo(field.schema(), field.name())), option); + colDef.analyze(null); colDefs.add(colDef); } return colDefs; @@ -201,4 +205,4 @@ public class AvroSchemaParser { } return propValue; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/util/AvroSchemaUtils.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/util/AvroSchemaUtils.java b/fe/src/main/java/org/apache/impala/util/AvroSchemaUtils.java index f5b3bb4..1da466e 100644 --- a/fe/src/main/java/org/apache/impala/util/AvroSchemaUtils.java +++ b/fe/src/main/java/org/apache/impala/util/AvroSchemaUtils.java @@ -23,19 +23,19 @@ import java.net.URL; import java.util.List; import java.util.Map; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.commons.io.IOUtils; + import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils; - import org.apache.impala.analysis.ColumnDef; import org.apache.impala.catalog.PrimitiveType; import org.apache.impala.common.AnalysisException; import org.apache.impala.common.FileSystemUtil; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; - /** * Contains utility functions for dealing with Avro schemas. 
*/ @@ -139,10 +139,13 @@ public class AvroSchemaUtils { if ((colDef.getType().isStringType() && avroCol.getType().isStringType())) { Preconditions.checkState( avroCol.getType().getPrimitiveType() == PrimitiveType.STRING); + Map<ColumnDef.Option, Object> option = Maps.newHashMap(); + String comment = avroCol.getComment(); + if (comment != null) option.put(ColumnDef.Option.COMMENT, comment); ColumnDef reconciledColDef = new ColumnDef( - avroCol.getColName(), colDef.getTypeDef(), avroCol.getComment()); + avroCol.getColName(), colDef.getTypeDef(), option); try { - reconciledColDef.analyze(); + reconciledColDef.analyze(null); } catch (AnalysisException e) { Preconditions.checkNotNull( null, "reconciledColDef.analyze() should never throw."); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/util/KuduUtil.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/util/KuduUtil.java b/fe/src/main/java/org/apache/impala/util/KuduUtil.java index 65fae74..dd09a28 100644 --- a/fe/src/main/java/org/apache/impala/util/KuduUtil.java +++ b/fe/src/main/java/org/apache/impala/util/KuduUtil.java @@ -29,18 +29,26 @@ import org.apache.impala.common.Pair; import org.apache.impala.service.BackendConfig; import org.apache.impala.thrift.TExpr; import org.apache.impala.thrift.TExprNode; +import org.apache.impala.analysis.LiteralExpr; +import org.apache.impala.analysis.Expr; +import org.apache.impala.thrift.TColumn; +import org.apache.impala.thrift.TColumnEncoding; +import org.apache.impala.thrift.THdfsCompression; + +import com.google.common.base.Splitter; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + import org.apache.kudu.ColumnSchema; +import org.apache.kudu.ColumnSchema.Encoding; +import org.apache.kudu.ColumnSchema.CompressionAlgorithm; import org.apache.kudu.Schema; import org.apache.kudu.client.KuduClient; import org.apache.kudu.client.KuduClient.KuduClientBuilder; import org.apache.kudu.client.PartialRow; import org.apache.kudu.client.RangePartitionBound; -import com.google.common.base.Preconditions; -import com.google.common.base.Splitter; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - public class KuduUtil { private static final String KUDU_TABLE_NAME_PREFIX = "impala::"; @@ -138,6 +146,145 @@ public class KuduUtil { } } + public static Object getKuduDefaultValue(TExpr defaultValue, + org.apache.kudu.Type type, String colName) throws ImpalaRuntimeException { + Preconditions.checkState(defaultValue.getNodes().size() == 1); + TExprNode literal = defaultValue.getNodes().get(0); + switch (type) { + case INT8: + checkCorrectType(literal.isSetInt_literal(), type, colName, literal); + return (byte) literal.getInt_literal().getValue(); + case INT16: + checkCorrectType(literal.isSetInt_literal(), type, colName, literal); + return (short) literal.getInt_literal().getValue(); + case INT32: + checkCorrectType(literal.isSetInt_literal(), type, colName, literal); + return (int) literal.getInt_literal().getValue(); + case INT64: + checkCorrectType(literal.isSetInt_literal(), type, colName, literal); + return (long) literal.getInt_literal().getValue(); + case FLOAT: + checkCorrectType(literal.isSetFloat_literal(), type, colName, literal); + return (float) literal.getFloat_literal().getValue(); + case DOUBLE: + checkCorrectType(literal.isSetFloat_literal(), type, colName, literal); + 
return (double) literal.getFloat_literal().getValue(); + case STRING: + checkCorrectType(literal.isSetString_literal(), type, colName, literal); + return literal.getString_literal().getValue(); + default: + throw new ImpalaRuntimeException("Unsupported value for column type: " + + type.toString()); + } + } + + public static Encoding fromThrift(TColumnEncoding encoding) + throws ImpalaRuntimeException { + switch (encoding) { + case AUTO: + return Encoding.AUTO_ENCODING; + case PLAIN: + return Encoding.PLAIN_ENCODING; + case PREFIX: + return Encoding.PREFIX_ENCODING; + case GROUP_VARINT: + return Encoding.GROUP_VARINT; + case RLE: + return Encoding.RLE; + case DICTIONARY: + return Encoding.DICT_ENCODING; + case BIT_SHUFFLE: + return Encoding.BIT_SHUFFLE; + default: + throw new ImpalaRuntimeException("Unsupported encoding: " + + encoding.toString()); + } + } + + public static TColumnEncoding toThrift(Encoding encoding) + throws ImpalaRuntimeException { + switch (encoding) { + case AUTO_ENCODING: + return TColumnEncoding.AUTO; + case PLAIN_ENCODING: + return TColumnEncoding.PLAIN; + case PREFIX_ENCODING: + return TColumnEncoding.PREFIX; + case GROUP_VARINT: + return TColumnEncoding.GROUP_VARINT; + case RLE: + return TColumnEncoding.RLE; + case DICT_ENCODING: + return TColumnEncoding.DICTIONARY; + case BIT_SHUFFLE: + return TColumnEncoding.BIT_SHUFFLE; + default: + throw new ImpalaRuntimeException("Unsupported encoding: " + + encoding.toString()); + } + } + + public static CompressionAlgorithm fromThrift(THdfsCompression compression) + throws ImpalaRuntimeException { + switch (compression) { + case DEFAULT: + return CompressionAlgorithm.DEFAULT_COMPRESSION; + case NONE: + return CompressionAlgorithm.NO_COMPRESSION; + case SNAPPY: + return CompressionAlgorithm.SNAPPY; + case LZ4: + return CompressionAlgorithm.LZ4; + case ZLIB: + return CompressionAlgorithm.ZLIB; + default: + throw new ImpalaRuntimeException("Unsupported compression algorithm: " + + compression.toString()); + } + } + + public static THdfsCompression toThrift(CompressionAlgorithm compression) + throws ImpalaRuntimeException { + switch (compression) { + case NO_COMPRESSION: + return THdfsCompression.NONE; + case DEFAULT_COMPRESSION: + return THdfsCompression.DEFAULT; + case SNAPPY: + return THdfsCompression.SNAPPY; + case LZ4: + return THdfsCompression.LZ4; + case ZLIB: + return THdfsCompression.ZLIB; + default: + throw new ImpalaRuntimeException("Unsupported compression algorithm: " + + compression.toString()); + } + } + + public static TColumn setColumnOptions(TColumn column, boolean isKey, + Boolean isNullable, Encoding encoding, CompressionAlgorithm compression, + Expr defaultValue, Integer blockSize) { + column.setIs_key(isKey); + if (isNullable != null) column.setIs_nullable(isNullable); + try { + if (encoding != null) column.setEncoding(toThrift(encoding)); + if (compression != null) column.setCompression(toThrift(compression)); + } catch (ImpalaRuntimeException e) { + // This shouldn't happen + throw new IllegalStateException(String.format("Error parsing " + + "encoding/compression values for Kudu column '%s': %s", column.getColumnName(), + e.getMessage())); + } + + if (defaultValue != null) { + Preconditions.checkState(defaultValue instanceof LiteralExpr); + column.setDefault_value(defaultValue.treeToThrift()); + } + if (blockSize != null) column.setBlock_size(blockSize); + return column; + } + /** * If correctType is true, returns. 
Otherwise throws an ImpalaRuntimeException whose formatted message * indicates that the literal's type does not match the target column's type.
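----------------------------------------------------------------------
For reference, a minimal, illustrative sketch of the new column options end to end. The table and column names below are hypothetical, the distribution clause and table properties that Kudu CREATE TABLE statements also require are elided, and the exact option identifiers accepted by the scanner/parser (sql-scanner.flex, sql-parser.cup) are not shown in this excerpt; the encoding and compression names follow the enum values handled in KuduUtil above.

  CREATE TABLE example_tbl (
    id BIGINT PRIMARY KEY ENCODING RLE COMPRESSION SNAPPY BLOCK_SIZE 32768,
    name STRING NOT NULL ENCODING BIT_SHUFFLE COMPRESSION LZ4 DEFAULT 'unknown'
  ) ...

SHOW CREATE TABLE would then render each column with its options in the order produced by ToSqlUtils.columnToSql() above: nullability first, then ENCODING, COMPRESSION, DEFAULT, and finally BLOCK_SIZE (omitted when it is 0).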

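The TableDef changes above also add two analysis-time rejections; both are sketched below with hypothetical statements, where '...' stands for the offending column definition as rendered by ColumnDef.toString(), which this excerpt does not show.

  CREATE TABLE bad_parquet_tbl (i INT ENCODING RLE) STORED AS PARQUET
  fails with: AnalysisException: Unsupported column options for file format 'PARQUET': '...'

  Declaring a PRIMARY KEY column NULL in a Kudu table definition
  fails with: AnalysisException: Primary key columns cannot be nullable: ...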