HIVE-19059: Support DEFAULT keyword with INSERT and UPDATE (Vineet Garg, reviewed by Jesus Camacho Rodriguez)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9e98d59d Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9e98d59d Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9e98d59d Branch: refs/heads/master Commit: 9e98d59d4afc6a228df05b8f1b5a2bee5c8b8557 Parents: 9d0f9c0 Author: Vineet Garg <[email protected]> Authored: Sat Mar 31 18:28:50 2018 -0700 Committer: Vineet Garg <[email protected]> Committed: Sat Mar 31 18:29:33 2018 -0700 ---------------------------------------------------------------------- .../test/resources/testconfiguration.properties | 1 + .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 189 +- .../insert_into_default_keyword.q | 116 + .../llap/insert_into_default_keyword.q.out | 2291 ++++++++++++++++++ 4 files changed, 2594 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/9e98d59d/itests/src/test/resources/testconfiguration.properties ---------------------------------------------------------------------- diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 516f804..ff8adf7 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -570,6 +570,7 @@ minillaplocal.query.files=\ input16_cc.q,\ insert_after_drop_partition.q,\ insert_dir_distcp.q,\ + insert_into_default_keyword.q,\ insert_into_with_schema.q,\ insert_values_orig_table.q,\ insert_values_orig_table_use_metadata.q,\ http://git-wip-us.apache.org/repos/asf/hive/blob/9e98d59d/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 
53d5a12..99e2c72 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -92,6 +92,7 @@ import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache; import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; import org.apache.hadoop.hive.ql.exec.ArchiveUtils; import org.apache.hadoop.hive.ql.exec.ColumnInfo; +import org.apache.hadoop.hive.ql.exec.Description; import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory; import org.apache.hadoop.hive.ql.exec.FetchTask; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; @@ -223,12 +224,14 @@ import org.apache.hadoop.hive.ql.plan.ptf.PartitionedTableFunctionDef; import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.ResourceType; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFArray; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFCardinalityViolation; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFHash; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr; import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDTFInline; import org.apache.hadoop.hive.ql.util.ResourceDownloader; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.Deserializer; @@ -615,8 +618,188 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { return aggregationTrees; } + /** + * This method figures out if current AST is for INSERT INTO + * @param qbp qbParseInfo + * @param dest destination clause + * @return true or false + */ + private boolean isInsertInto(QBParseInfo qbp, String dest) { + // get the destination and check if it 
is TABLE + if(qbp == null || dest == null ) return false; + ASTNode destNode = qbp.getDestForClause(dest); + if(destNode != null && destNode.getType() == HiveParser.TOK_TAB) { + return true; + } + return false; + } + + /** + * Given an AST this method figures out if it is a value clause + * e.g. VALUES(1,3..) + */ + private boolean isValueClause(ASTNode select) { + if(select == null) return false; + if(select.getChildCount() == 1) { + ASTNode selectExpr = (ASTNode)select.getChild(0); + if(selectExpr.getChildCount() == 1 ) { + ASTNode selectChildExpr = (ASTNode)selectExpr.getChild(0); + if(selectChildExpr.getType() == HiveParser.TOK_FUNCTION) { + ASTNode inline = (ASTNode)selectChildExpr.getChild(0); + ASTNode func = (ASTNode)selectChildExpr.getChild(1); + if(inline.getText().equals(GenericUDTFInline.class.getAnnotation(Description.class).name()) + && func.getType() == HiveParser.TOK_FUNCTION) { + ASTNode arrayNode = (ASTNode)func.getChild(0); + ASTNode funcNode= (ASTNode)func.getChild(1); + if(arrayNode.getText().equals(GenericUDFArray.class.getAnnotation(Description.class).name() ) + && funcNode.getType() == HiveParser.TOK_FUNCTION) { + return true; + } + } + } + } + return false; + } + + /** + * This method creates a list of default constraints which corresponds to + * given schema (targetSchema) or target table's column schema (if targetSchema is null) + * @param tbl + * @param targetSchema + * @return List of default constraints (including NULL if there is no default) + * @throws SemanticException + */ + private List<String> getDefaultConstraints(Table tbl, List<String> targetSchema) throws SemanticException{ + Map<String, String> colNameToDefaultVal = null; + try { + DefaultConstraint dc = Hive.get().getEnabledDefaultConstraints(tbl.getDbName(), tbl.getTableName()); + colNameToDefaultVal = dc.getColNameToDefaultValueMap(); + } catch (Exception e) { + if (e instanceof SemanticException) { + throw (SemanticException) e; + } else { + throw (new
RuntimeException(e)); + } + } + List<String> defaultConstraints = new ArrayList<>(); + if(targetSchema != null) { + for (String colName : targetSchema) { + defaultConstraints.add(colNameToDefaultVal.get(colName)); + } + } + else { + for(FieldSchema fs:tbl.getCols()) { + defaultConstraints.add(colNameToDefaultVal.get(fs.getName())); + } + } + return defaultConstraints; + } + + /** + * Constructs an AST for given DEFAULT string + * @param newValue + * @throws SemanticException + */ + private ASTNode getNodeReplacementforDefault(String newValue) throws SemanticException { + ASTNode newNode = null; + if(newValue== null) { + newNode = ASTBuilder.construct(HiveParser.TOK_NULL, "TOK_NULL").node(); + } + else { + try { + newNode = new ParseDriver().parseExpression(newValue); + } catch(Exception e) { + throw new SemanticException("Error while parsing default value for DEFAULT keyword: " + newValue + + ". Error message: " + e.getMessage()); + } + } + return newNode; + } + + /** + * This method replaces ASTNode corresponding to DEFAULT keyword with either DEFAULT constraint + * expression if exists or NULL otherwise + * @param selectExprs + * @param targetTable + * @throws SemanticException + */ + private void replaceDefaultKeywordForUpdate(ASTNode selectExprs, Table targetTable) throws SemanticException { + List<String> defaultConstraints = null; + for (int i = 0; i < selectExprs.getChildCount(); i++) { + ASTNode selectExpr = (ASTNode) selectExprs.getChild(i); + if (selectExpr.getChildCount() == 1 && selectExpr.getChild(0).getType() == HiveParser.TOK_TABLE_OR_COL) { + //first child should be rowid + if (i == 0 && !selectExpr.getChild(0).getChild(0).getText().equals("ROW__ID")) { + throw new SemanticException("Unexpected element when replacing default keyword for UPDATE." 
+ + " Expected ROW_ID, found: " + selectExpr.getChild(0).getChild(0).getText()); + } + else if (selectExpr.getChild(0).getChild(0).getText().toLowerCase().equals("default")) { + if (defaultConstraints == null) { + defaultConstraints = getDefaultConstraints(targetTable, null); + } + ASTNode newNode = getNodeReplacementforDefault(defaultConstraints.get(i - 1)); + // replace the node in place + selectExpr.replaceChildren(0, 0, newNode); + if (LOG.isDebugEnabled()) { + LOG.debug("DEFAULT keyword replacement - Inserted " + newNode.getText() + " for table: " + targetTable.getTableName()); + } + } + } + } + } + + /** + * This method replaces DEFAULT AST node with DEFAULT expression + * @param valueArrClause This is AST for value clause + * @param targetTable + * @param targetSchema this is target schema/column schema if specified in query + * @throws SemanticException + */ + private void replaceDefaultKeyword(ASTNode valueArrClause, Table targetTable, List<String> targetSchema) throws SemanticException{ + List<String> defaultConstraints = null; + for(int i=1; i<valueArrClause.getChildCount(); i++) { + ASTNode valueClause = (ASTNode)valueArrClause.getChild(i); + //skip first child since it is struct + for(int j=1; j<valueClause.getChildCount(); j++) { + if(valueClause.getChild(j).getType() == HiveParser.TOK_TABLE_OR_COL + && valueClause.getChild(j).getChild(0).getText().toLowerCase().equals("default")) { + if(defaultConstraints == null) { + defaultConstraints = getDefaultConstraints(targetTable, targetSchema); + } + ASTNode newNode = getNodeReplacementforDefault(defaultConstraints.get(j-1)); + // replace the node in place + valueClause.replaceChildren(j, j, newNode); + if (LOG.isDebugEnabled()) { + LOG.debug("DEFAULT keyword replacement - Inserted " + newNode.getText() + " for table: " + targetTable.getTableName()); + } + } + } + } + } + private void doPhase1GetColumnAliasesFromSelect( - ASTNode selectExpr, QBParseInfo qbp) { + ASTNode selectExpr, QBParseInfo qbp, String 
dest) throws SemanticException { + if (isInsertInto(qbp, dest)) { + ASTNode tblAst = qbp.getDestForClause(dest); + String tableName = getUnescapedName((ASTNode) tblAst.getChild(0)); + Table targetTable = null; + try { + if (isValueClause(selectExpr)) { + targetTable = db.getTable(tableName, false); + replaceDefaultKeyword((ASTNode) selectExpr.getChild(0).getChild(0).getChild(1), targetTable, qbp.getDestSchemaForClause(dest)); + } else if (updating(dest)) { + targetTable = db.getTable(tableName, false); + replaceDefaultKeywordForUpdate(selectExpr, targetTable); + } + } catch (Exception e) { + if (e instanceof SemanticException) { + throw (SemanticException) e; + } else { + throw (new RuntimeException(e)); + } + } + } for (int i = 0; i < selectExpr.getChildCount(); ++i) { ASTNode selExpr = (ASTNode) selectExpr.getChild(i); if ((selExpr.getToken().getType() == HiveParser.TOK_SELEXPR) @@ -1352,7 +1535,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { LinkedHashMap<String, ASTNode> aggregations = doPhase1GetAggregationsFromSelect(ast, qb, ctx_1.dest); - doPhase1GetColumnAliasesFromSelect(ast, qbp); + doPhase1GetColumnAliasesFromSelect(ast, qbp, ctx_1.dest); qbp.setAggregationExprsForClause(ctx_1.dest, aggregations); qbp.setDistinctFuncExprsForClause(ctx_1.dest, doPhase1GetDistinctFuncExprs(aggregations)); @@ -12003,7 +12186,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { fetchTask = pCtx.getFetchTask(); } //find all Acid FileSinkOperatorS - QueryPlanPostProcessor qp = new QueryPlanPostProcessor(rootTasks, acidFileSinks, ctx.getExecutionId()); + QueryPlanPostProcessor qp = new QueryPlanPostProcessor((List<Task<?>>)rootTasks, acidFileSinks, ctx.getExecutionId()); LOG.info("Completed plan generation"); // 10. 
put accessed columns to readEntity http://git-wip-us.apache.org/repos/asf/hive/blob/9e98d59d/ql/src/test/queries/clientpositive/insert_into_default_keyword.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/insert_into_default_keyword.q b/ql/src/test/queries/clientpositive/insert_into_default_keyword.q new file mode 100644 index 0000000..14f91fe --- /dev/null +++ b/ql/src/test/queries/clientpositive/insert_into_default_keyword.q @@ -0,0 +1,116 @@ +set hive.support.concurrency=true; +set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; +-- SORT_QUERY_RESULTS + +DROP TABLE insert_into1; + +-- No default constraint +CREATE TABLE insert_into1 (key int, value string) + clustered by (key) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true'); + +EXPLAIN INSERT INTO TABLE insert_into1 values(default, DEFAULT); +INSERT INTO TABLE insert_into1 values(default, DEFAULT); +SELECT * from insert_into1; +TRUNCATE table insert_into1; + +-- should be able to use any case for DEFAULT +EXPLAIN INSERT INTO TABLE insert_into1 values(234, dEfAULt); +INSERT INTO TABLE insert_into1 values(234, dEfAULt); +SELECT * from insert_into1; +TRUNCATE table insert_into1; + +-- multi values +explain insert into insert_into1 values(default, 3),(2,default); +insert into insert_into1 values(default, 3),(2,default); +select * from insert_into1; +TRUNCATE table insert_into1; + +--with column schema +EXPLAIN INSERT INTO TABLE insert_into1(key) values(default); +INSERT INTO TABLE insert_into1(key) values(default); +select * from insert_into1; +TRUNCATE table insert_into1; + +EXPLAIN INSERT INTO TABLE insert_into1(key, value) values(2,default); +INSERT INTO TABLE insert_into1(key, value) values(2,default); +select * from insert_into1; +TRUNCATE table insert_into1; + +DROP TABLE insert_into1; + +-- with default constraint +CREATE TABLE insert_into1 (key int DEFAULT 1, value string) + clustered by (key) into 2 
buckets stored as orc TBLPROPERTIES ('transactional'='true'); +EXPLAIN INSERT INTO TABLE insert_into1 values(default, DEFAULT); +INSERT INTO TABLE insert_into1 values(default, DEFAULT); +SELECT * from insert_into1; +TRUNCATE table insert_into1; + +-- should be able to use any case for DEFAULT +EXPLAIN INSERT INTO TABLE insert_into1 values(234, dEfAULt); +INSERT INTO TABLE insert_into1 values(234, dEfAULt); +SELECT * from insert_into1; +TRUNCATE table insert_into1; + +-- multi values +explain insert into insert_into1 values(default, 3),(2,default); +insert into insert_into1 values(default, 3),(2,default); +select * from insert_into1; +TRUNCATE table insert_into1; + +--with column schema +EXPLAIN INSERT INTO TABLE insert_into1(key) values(default); +INSERT INTO TABLE insert_into1(key) values(default); +select * from insert_into1; +TRUNCATE table insert_into1; + +EXPLAIN INSERT INTO TABLE insert_into1(key, value) values(2,default); +INSERT INTO TABLE insert_into1(key, value) values(2,default); +select * from insert_into1; +TRUNCATE table insert_into1; + +EXPLAIN INSERT INTO TABLE insert_into1(value, key) values(2,default); +INSERT INTO TABLE insert_into1(value, key) values(2,default); +select * from insert_into1; +TRUNCATE table insert_into1; + +EXPLAIN INSERT INTO TABLE insert_into1(key, value) values(2,default),(DEFAULT, default); +INSERT INTO TABLE insert_into1(key, value) values(2,default),(DEFAULT, default); +select * from insert_into1; +TRUNCATE table insert_into1; +DROP TABLE insert_into1; + + +-- UPDATE +CREATE TABLE insert_into1 (key int DEFAULT 1, value string, i int) + clustered by (i) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true'); + +INSERT INTO insert_into1 values(2,1, 45); +EXPLAIN UPDATE insert_into1 set key = DEFAULT where value=1; +UPDATE insert_into1 set key = DEFAULT where value=1; +SELECT * from insert_into1; +TRUNCATE table insert_into1; + +INSERT INTO insert_into1 values(2,1, 45); +EXPLAIN UPDATE insert_into1 set key = 
DEFAULT, value=DEFAULT where value=1; +UPDATE insert_into1 set key = DEFAULT, value=DEFAULT where value=1; +SELECT * from insert_into1; +TRUNCATE table insert_into1; + +DROP TABLE insert_into1; + +-- partitioned table +CREATE TABLE tpart(i int, j int DEFAULT 1001) partitioned by (ds string); +-- no column schema +EXPLAIN INSERT INTO tpart partition(ds='1') values(DEFAULT, DEFAULT); +INSERT INTO tpart partition(ds='1') values(DEFAULT, DEFAULT); +SELECT * FROM tpart; +TRUNCATE table tpart; +-- with column schema +EXPLAIN INSERT INTO tpart partition(ds='1')(i) values(DEFAULT); +INSERT INTO tpart partition(ds='1')(i) values(DEFAULT); +EXPLAIN INSERT INTO tpart partition(ds='1')(i,j) values(10, DEFAULT); +INSERT INTO tpart partition(ds='1')(i,j) values(10, DEFAULT); +SELECT * FROM tpart; +TRUNCATE table tpart; +DROP TABLE tpart;
