Repository: incubator-impala
Updated Branches:
  refs/heads/master ecd78fb67 -> 5adedc6a1
IMPALA-3930,IMPALA-2570: Fix shuffle insert hint with constant partition exprs.

Fixes inserts into partitioned tables that have a shuffle hint and only
constant partition exprs. The rows to be inserted are merged at the
coordinator where the table sink is executed, so there is no need to
hash-exchange them.

Insert hints are now also accepted when inserting into unpartitioned tables.
The shuffle hint leads to a plan where all rows are merged at the coordinator
where the table sink is executed.

Change-Id: I1084d49c95b7d867eeac3297fd2016daff0ab687
Reviewed-on: http://gerrit.cloudera.org:8080/4162
Reviewed-by: Marcel Kornacker <[email protected]>
Tested-by: Internal Jenkins

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/5adedc6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/5adedc6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/5adedc6a

Branch: refs/heads/master
Commit: 5adedc6a1a623b4c7b81f4ba6d5019d74dbd05f2
Parents: ecd78fb
Author: Alex Behm <[email protected]>
Authored: Mon Aug 29 13:39:32 2016 -0700
Committer: Internal Jenkins <[email protected]>
Committed: Wed Aug 31 09:59:00 2016 +0000

----------------------------------------------------------------------
 .../cloudera/impala/analysis/InsertStmt.java  | 32 ++++---
 .../impala/planner/DistributedPlanner.java    | 87 ++++++++++----------
 .../impala/analysis/AnalyzeStmtsTest.java     | 34 +-------
 .../queries/PlannerTest/insert.test           | 58 +++++++++++++
 4 files changed, 126 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5adedc6a/fe/src/main/java/com/cloudera/impala/analysis/InsertStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/InsertStmt.java b/fe/src/main/java/com/cloudera/impala/analysis/InsertStmt.java
index 5749424..c5965db 100644
--- a/fe/src/main/java/com/cloudera/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/com/cloudera/impala/analysis/InsertStmt.java
@@ -107,10 +107,12 @@ public class InsertStmt extends StatementBase {
   // Set in analyze(). Exprs corresponding to the partitionKeyValues,
   private List<Expr> partitionKeyExprs_ = Lists.newArrayList();
 
-  // True to force re-partitioning before the table sink, false to prevent it. Set in
-  // analyze() based on planHints_. Null if no explicit hint was given (the planner
-  // should decide whether to re-partition or not).
-  private Boolean isRepartition_ = null;
+  // Indicates whether this insert stmt has a shuffle or noshuffle plan hint.
+  // Both flags may be false, but at most one of them may be true.
+  // Shuffle forces data repartitioning before the data sink, and noshuffle
+  // prevents it. Set in analyze() based on planHints_.
+  private boolean hasShuffleHint_ = false;
+  private boolean hasNoShuffleHint_ = false;
 
   // Output expressions that produce the final results to write to the target table. May
   // include casts, and NullLiterals where an output column isn't explicitly mentioned.
@@ -166,7 +168,8 @@ public class InsertStmt extends StatementBase {
     queryStmt_.reset();
     table_ = null;
     partitionKeyExprs_.clear();
-    isRepartition_ = null;
+    hasShuffleHint_ = false;
+    hasNoShuffleHint_ = false;
     resultExprs_.clear();
   }
 
@@ -598,28 +601,30 @@ public class InsertStmt extends StatementBase {
   private void analyzePlanHints(Analyzer analyzer) throws AnalysisException {
     if (planHints_ == null) return;
-    if (!planHints_.isEmpty() &&
-        (partitionKeyValues_ == null || table_ instanceof HBaseTable)) {
+    if (!planHints_.isEmpty() && table_ instanceof HBaseTable) {
       throw new AnalysisException("INSERT hints are only supported for inserting into " +
-          "partitioned Hdfs tables.");
+          "Hdfs tables.");
     }
     for (String hint: planHints_) {
       if (hint.equalsIgnoreCase("SHUFFLE")) {
-        if (isRepartition_ != null && !isRepartition_) {
+        if (hasNoShuffleHint_) {
           throw new AnalysisException("Conflicting INSERT hint: " + hint);
         }
-        isRepartition_ = Boolean.TRUE;
+        hasShuffleHint_ = true;
         analyzer.setHasPlanHints();
       } else if (hint.equalsIgnoreCase("NOSHUFFLE")) {
-        if (isRepartition_ != null && isRepartition_) {
+        if (hasShuffleHint_) {
           throw new AnalysisException("Conflicting INSERT hint: " + hint);
         }
-        isRepartition_ = Boolean.FALSE;
+        hasNoShuffleHint_ = true;
        analyzer.setHasPlanHints();
       } else {
         analyzer.addWarning("INSERT hint not recognized: " + hint);
       }
     }
+    // Both flags may be false or one of them may be true, but not both.
+    Preconditions.checkState((!hasShuffleHint_ && !hasNoShuffleHint_)
+        || (hasShuffleHint_ ^ hasNoShuffleHint_));
   }
 
   public List<String> getPlanHints() { return planHints_; }
@@ -634,7 +639,8 @@ public class InsertStmt extends StatementBase {
   public QueryStmt getQueryStmt() { return queryStmt_; }
   public void setQueryStmt(QueryStmt stmt) { queryStmt_ = stmt; }
   public List<Expr> getPartitionKeyExprs() { return partitionKeyExprs_; }
-  public Boolean isRepartition() { return isRepartition_; }
+  public boolean hasShuffleHint() { return hasShuffleHint_; }
+  public boolean hasNoShuffleHint() { return hasNoShuffleHint_; }
   public ArrayList<Expr> getResultExprs() { return resultExprs_; }
 
   public DataSink createDataSink() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5adedc6a/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java b/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java
index 2c3124b..b38b018 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java
@@ -179,64 +179,67 @@ public class DistributedPlanner {
   }
 
   /**
-   * Makes a cost-based decision on whether to repartition the output of 'inputFragment'
-   * before feeding its data into the table sink of the given 'insertStmt'. Considers
-   * user-supplied plan hints to determine whether to repartition or not.
-   * Returns a plan fragment that partitions the output of 'inputFragment' on the
-   * partition exprs of 'insertStmt', unless the expected number of partitions is less
-   * than the number of nodes on which inputFragment runs.
-   * If it ends up creating a new fragment, appends that to 'fragments'.
+   * Decides whether to repartition the output of 'inputFragment' before feeding its
+   * data into the table sink of the given 'insertStmt'. The decision obeys the
+   * shuffle/noshuffle plan hints if present. Otherwise, returns a plan fragment that
+   * partitions the output of 'inputFragment' on the partition exprs of 'insertStmt',
+   * unless the expected number of partitions is less than the number of nodes on which
+   * inputFragment runs, or the target table is unpartitioned.
+   * For inserts into unpartitioned tables or inserts with only constant partition exprs,
+   * the shuffle hint leads to a plan that merges all rows at the coordinator where
+   * the table sink is executed.
+   * If this function ends up creating a new fragment, appends that to 'fragments'.
    */
   public PlanFragment createInsertFragment(
       PlanFragment inputFragment, InsertStmt insertStmt, Analyzer analyzer,
      ArrayList<PlanFragment> fragments) throws ImpalaException {
-    List<Expr> partitionExprs = insertStmt.getPartitionKeyExprs();
-    Boolean partitionHint = insertStmt.isRepartition();
-    if (partitionExprs.isEmpty()) return inputFragment;
-    if (partitionHint != null && !partitionHint) return inputFragment;
-
-    // we ignore constants for the sake of partitioning
-    List<Expr> nonConstPartitionExprs = Lists.newArrayList(partitionExprs);
-    Expr.removeConstants(nonConstPartitionExprs);
-    DataPartition inputPartition = inputFragment.getDataPartition();
+    if (insertStmt.hasNoShuffleHint()) return inputFragment;
 
-    // do nothing if the input fragment is already appropriately partitioned
-    if (analyzer.equivSets(inputPartition.getPartitionExprs(),
-        nonConstPartitionExprs)) {
-      return inputFragment;
-    }
+    List<Expr> partitionExprs = Lists.newArrayList(insertStmt.getPartitionKeyExprs());
+    // Ignore constants for the sake of partitioning.
+    Expr.removeConstants(partitionExprs);
 
-    // if the existing partition exprs are a subset of the table partition exprs, check
-    // if it is distributed across all nodes; if so, don't repartition
-    if (Expr.isSubset(inputPartition.getPartitionExprs(), nonConstPartitionExprs)) {
-      long numPartitions = getNumDistinctValues(inputPartition.getPartitionExprs());
-      if (numPartitions >= inputFragment.getNumNodes()) return inputFragment;
+    // Do nothing if the input fragment is already appropriately partitioned.
+    DataPartition inputPartition = inputFragment.getDataPartition();
+    if (!partitionExprs.isEmpty() &&
+        analyzer.equivSets(inputPartition.getPartitionExprs(), partitionExprs)) {
+      return inputFragment;
     }
 
-    // don't repartition if the resulting number of partitions is too low to get good
-    // parallelism
-    long numPartitions = getNumDistinctValues(nonConstPartitionExprs);
+    // Make a cost-based decision only if no user hint was supplied.
+    if (!insertStmt.hasShuffleHint()) {
+      // If the existing partition exprs are a subset of the table partition exprs, check
+      // if it is distributed across all nodes. If so, don't repartition.
+      if (Expr.isSubset(inputPartition.getPartitionExprs(), partitionExprs)) {
+        long numPartitions = getNumDistinctValues(inputPartition.getPartitionExprs());
+        if (numPartitions >= inputFragment.getNumNodes()) return inputFragment;
+      }
 
-    // don't repartition if we know we have fewer partitions than nodes
-    // (ie, default to repartitioning if col stats are missing)
-    // TODO: we want to repartition if the resulting files would otherwise
-    // be very small (less than some reasonable multiple of the recommended block size);
-    // in order to do that, we need to come up with an estimate of the avg row size
-    // in the particular file format of the output table/partition.
-    // We should always know on how many nodes our input is running.
-    Preconditions.checkState(inputFragment.getNumNodes() != -1);
-    if (partitionHint == null && numPartitions > 0 &&
-        numPartitions <= inputFragment.getNumNodes()) {
-      return inputFragment;
+      // Don't repartition if we know we have fewer partitions than nodes
+      // (ie, default to repartitioning if col stats are missing).
+      // TODO: We want to repartition if the resulting files would otherwise
+      // be very small (less than some reasonable multiple of the recommended block size).
+      // In order to do that, we need to come up with an estimate of the avg row size
+      // in the particular file format of the output table/partition.
+      // We should always know on how many nodes our input is running.
+      long numPartitions = getNumDistinctValues(partitionExprs);
+      Preconditions.checkState(inputFragment.getNumNodes() != -1);
+      if (numPartitions > 0 && numPartitions <= inputFragment.getNumNodes()) {
+        return inputFragment;
+      }
     }
-    Preconditions.checkState(partitionHint == null || partitionHint);
 
     ExchangeNode exchNode =
         new ExchangeNode(ctx_.getNextNodeId(), inputFragment.getPlanRoot());
     exchNode.init(analyzer);
     Preconditions.checkState(exchNode.hasValidStats());
-    DataPartition partition = DataPartition.hashPartitioned(nonConstPartitionExprs);
+    DataPartition partition;
+    if (partitionExprs.isEmpty()) {
+      partition = DataPartition.UNPARTITIONED;
+    } else {
+      partition = DataPartition.hashPartitioned(partitionExprs);
+    }
     PlanFragment fragment =
         new PlanFragment(ctx_.getNextFragmentId(), exchNode, partition);
     inputFragment.setDestination(exchNode);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5adedc6a/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeStmtsTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeStmtsTest.java
index 05a0855..aeef89a 100644
--- a/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeStmtsTest.java
@@ -1734,16 +1734,15 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
         "partition (year, month) %sbadhint%s select * from functional.alltypes",
         prefix, suffix),
         "INSERT hint not recognized: badhint");
-    // Plan hints require a partition clause.
-    AnalysisError(String.format(
+    // Insert hints are ok for unpartitioned tables.
+    AnalyzesOk(String.format(
         "insert into table functional.alltypesnopart %sshuffle%s " +
-        "select * from functional.alltypesnopart", prefix, suffix),
-        "INSERT hints are only supported for inserting into partitioned Hdfs tables.");
+        "select * from functional.alltypesnopart", prefix, suffix));
     // Plan hints do not make sense for inserting into HBase tables.
     AnalysisError(String.format(
         "insert into table functional_hbase.alltypes %sshuffle%s " +
         "select * from functional_hbase.alltypes", prefix, suffix),
-        "INSERT hints are only supported for inserting into partitioned Hdfs tables.");
+        "INSERT hints are only supported for inserting into Hdfs tables.");
     // Conflicting plan hints.
     AnalysisError("insert into table functional.alltypessmall " +
         "partition (year, month) /* +shuffle,noshuffle */ " +
@@ -2979,31 +2978,6 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
         "'b.int_array_col' correlated with an outer block as well as an " +
         "uncorrelated one 'functional.alltypestiny':\n" +
         "SELECT item FROM b.int_array_col, functional.alltypestiny");
-
-    // Test plan hints for partitioned Hdfs tables.
-    AnalyzesOk("insert into functional.alltypessmall " +
-        "partition (year, month) [shuffle] select * from functional.alltypes");
-    AnalyzesOk("insert into table functional.alltypessmall " +
-        "partition (year, month) [noshuffle] select * from functional.alltypes");
-    // Multiple non-conflicting hints and case insensitivity of hints.
-    AnalyzesOk("insert into table functional.alltypessmall " +
-        "partition (year, month) [shuffle, ShUfFlE] select * from functional.alltypes");
-    // Unknown plan hint. Expect a warning but no error.
-    AnalyzesOk("insert into functional.alltypessmall " +
-        "partition (year, month) [badhint] select * from functional.alltypes",
-        "INSERT hint not recognized: badhint");
-    // Conflicting plan hints.
-    AnalysisError("insert into table functional.alltypessmall " +
-        "partition (year, month) [shuffle, noshuffle] select * from functional.alltypes",
-        "Conflicting INSERT hint: noshuffle");
-    // Plan hints require a partition clause.
-    AnalysisError("insert into table functional.alltypesnopart [shuffle] " +
-        "select * from functional.alltypesnopart",
-        "INSERT hints are only supported for inserting into partitioned Hdfs tables.");
-    // Plan hints do not make sense for inserting into HBase tables.
-    AnalysisError("insert into table functional_hbase.alltypes [shuffle] " +
-        "select * from functional_hbase.alltypes",
-        "INSERT hints are only supported for inserting into partitioned Hdfs tables.");
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5adedc6a/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/insert.test b/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
index 37dfd21..b608dea 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
@@ -512,3 +512,61 @@ WRITE TO HDFS [functional.alltypestiny, OVERWRITE=false, PARTITION-KEYS=(2009,1)
 00:SCAN HDFS [functional.alltypestiny]
    partitions=4/4 files=4 size=460B
 ====
+# IMPALA-3930: Test insert with shuffle hint on constant partition exprs. The table sink
+# is executed at the coordinator.
+insert into table functional.alltypes partition(year=2009, month=1) /* +shuffle */
+select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
+float_col, double_col, date_string_col, string_col, timestamp_col
+from functional.alltypes
+---- DISTRIBUTEDPLAN
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(2009,1)]
+|  partitions=1
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# IMPALA-3930: Same as above but with a dynamic partition insert.
+insert into table functional.alltypes partition(year, month) /* +shuffle */
+select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
+float_col, double_col, date_string_col, string_col, timestamp_col, 2009, 1
+from functional.alltypes
+---- DISTRIBUTEDPLAN
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(2009,1)]
+|  partitions=1
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# IMPALA-3930: Same as above but with a mix of static/dynamic partition exprs, and
+# with more complex constant exprs.
+insert into table functional.alltypes partition(year, month=cast(10/2 as int)) /* +shuffle */
+select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
+float_col, double_col, date_string_col, string_col, timestamp_col, cast(concat("2", "010") as smallint) - 1
+from functional.alltypes
+---- DISTRIBUTEDPLAN
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(CAST(concat('2', '010') AS SMALLINT) - 1,CAST(10 / 2 AS INT))]
+|  partitions=1
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# Test insert into an unpartitioned table with shuffle hint.
+insert into table functional.alltypesnopart /* +shuffle */
+select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
+float_col, double_col, date_string_col, string_col, timestamp_col
+from functional.alltypes
+---- DISTRIBUTEDPLAN
+WRITE TO HDFS [functional.alltypesnopart, OVERWRITE=false]
+|  partitions=1
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
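
For readers skimming the patch, the hint-flag handling added to
InsertStmt.analyzePlanHints() boils down to the standalone Java sketch below.
It is illustrative only and not part of the patch: the InsertHintSketch class,
the IllegalStateException stand-in for AnalysisException, the System.out
warning stand-in for analyzer.addWarning(), and the main() driver are all
hypothetical.

import java.util.Arrays;
import java.util.List;

// Standalone sketch of the shuffle/noshuffle flag handling introduced by this patch.
// Field and method names mirror InsertStmt, but this is not the actual Impala class.
public class InsertHintSketch {
  private boolean hasShuffleHint_ = false;
  private boolean hasNoShuffleHint_ = false;

  // Mirrors the flow of InsertStmt.analyzePlanHints(): conflicting hints are an error;
  // an unrecognized hint would only produce a warning in the real analyzer.
  public void analyzePlanHints(List<String> planHints) {
    for (String hint : planHints) {
      if (hint.equalsIgnoreCase("SHUFFLE")) {
        if (hasNoShuffleHint_) {
          throw new IllegalStateException("Conflicting INSERT hint: " + hint);
        }
        hasShuffleHint_ = true;
      } else if (hint.equalsIgnoreCase("NOSHUFFLE")) {
        if (hasShuffleHint_) {
          throw new IllegalStateException("Conflicting INSERT hint: " + hint);
        }
        hasNoShuffleHint_ = true;
      } else {
        System.out.println("INSERT hint not recognized: " + hint);
      }
    }
    // Invariant established by the patch: both flags may be false, never both true.
    assert !(hasShuffleHint_ && hasNoShuffleHint_);
  }

  public boolean hasShuffleHint() { return hasShuffleHint_; }
  public boolean hasNoShuffleHint() { return hasNoShuffleHint_; }

  public static void main(String[] args) {
    InsertHintSketch stmt = new InsertHintSketch();
    stmt.analyzePlanHints(Arrays.asList("shuffle", "ShUfFlE"));  // duplicate hints are fine
    System.out.println(stmt.hasShuffleHint());    // true
    System.out.println(stmt.hasNoShuffleHint());  // false
  }
}

Replacing the tri-state Boolean isRepartition_ with two booleans makes the
"no hint given" case explicit, so createInsertFragment() can treat the
noshuffle early-exit and the cost-based default as separate branches.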
