IMPALA-5354: INSERT hints for Kudu tables

A previous change, IMPALA-3742, added an exchange node and a sort node to the plans for inserts into Kudu tables, so that the input is partitioned and sorted to match the target table.
This patch enables INSERT hints for Kudu tables - 'noshuffle' which removes the exchange node from the plan and 'noclustered' which removes the sort node. Insert hints have no effect for inserts that are small enough to result in a single node execution. Testing: - Updated FE planner and analysis tests. - Ran Kudu EE tests. Change-Id: Idbd1ef977446ffee157ce3ce0b476e1f08a75d05 Reviewed-on: http://gerrit.cloudera.org:8080/6980 Reviewed-by: Matthew Jacobs <[email protected]> Reviewed-by: Alex Behm <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/014c5603 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/014c5603 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/014c5603 Branch: refs/heads/master Commit: 014c5603f867907963f3821948f90d526e9a4789 Parents: bbca087 Author: Thomas Tauber-Marshall <[email protected]> Authored: Wed May 24 13:38:33 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Thu May 25 21:08:59 2017 +0000 ---------------------------------------------------------------------- .../org/apache/impala/analysis/InsertStmt.java | 7 +--- .../impala/planner/DistributedPlanner.java | 14 ++----- .../java/org/apache/impala/planner/Planner.java | 12 ++---- .../java/org/apache/impala/util/KuduUtil.java | 19 ++++++++++ .../impala/analysis/AnalyzeStmtsTest.java | 12 +++--- .../impala/analysis/AnalyzeUpsertStmtTest.java | 6 +-- .../queries/PlannerTest/kudu-upsert.test | 40 ++++++++++++++++++++ .../queries/PlannerTest/kudu.test | 29 ++++++++++++++ 8 files changed, 105 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/014c5603/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java ---------------------------------------------------------------------- diff 
--git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java index c25f484..c23145d 100644 --- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java @@ -797,12 +797,9 @@ public class InsertStmt extends StatementBase { private void analyzePlanHints(Analyzer analyzer) throws AnalysisException { if (planHints_.isEmpty()) return; - if (isUpsert_) { - throw new AnalysisException("Hints not supported in UPSERT statements."); - } - if (table_ instanceof HBaseTable || table_ instanceof KuduTable) { + if (table_ instanceof HBaseTable) { throw new AnalysisException(String.format("INSERT hints are only supported for " + - "inserting into Hdfs tables: %s", getTargetTableName())); + "inserting into Hdfs and Kudu tables: %s", getTargetTableName())); } for (PlanHint hint: planHints_) { if (hint.is("SHUFFLE")) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/014c5603/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java index ac0355e..21423c5 100644 --- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java @@ -23,17 +23,16 @@ import java.util.List; import org.apache.impala.analysis.AggregateInfo; import org.apache.impala.analysis.AnalysisContext; import org.apache.impala.analysis.Analyzer; -import org.apache.impala.analysis.DescriptorTable; import org.apache.impala.analysis.Expr; import org.apache.impala.analysis.InsertStmt; import org.apache.impala.analysis.JoinOperator; -import org.apache.impala.analysis.KuduPartitionExpr; import org.apache.impala.analysis.QueryStmt; import org.apache.impala.catalog.KuduTable; 
import org.apache.impala.common.ImpalaException; import org.apache.impala.common.InternalException; import org.apache.impala.planner.JoinNode.DistributionMode; import org.apache.impala.planner.RuntimeFilterGenerator.RuntimeFilter; +import org.apache.impala.util.KuduUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -246,15 +245,8 @@ public class DistributedPlanner { if (partitionExprs.isEmpty()) { partition = DataPartition.UNPARTITIONED; } else if (insertStmt.getTargetTable() instanceof KuduTable) { - // IMPALA-5294: Don't use 'partitionExpr' here because the constants were removed. - // We need all of the partition exprs to fill in the partition column values for - // each row to send to Kudu to determine the partition number. - Expr kuduPartitionExpr = new KuduPartitionExpr(DescriptorTable.TABLE_SINK_ID, - (KuduTable) insertStmt.getTargetTable(), - Lists.newArrayList(insertStmt.getPartitionKeyExprs()), - insertStmt.getPartitionColPos()); - kuduPartitionExpr.analyze(ctx_.getRootAnalyzer()); - partition = DataPartition.kuduPartitioned(kuduPartitionExpr); + partition = DataPartition.kuduPartitioned( + KuduUtil.createPartitionExpr(insertStmt, ctx_.getRootAnalyzer())); } else { partition = DataPartition.hashPartitioned(partitionExprs); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/014c5603/fe/src/main/java/org/apache/impala/planner/Planner.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java index b6fa54a..31a061b 100644 --- a/fe/src/main/java/org/apache/impala/planner/Planner.java +++ b/fe/src/main/java/org/apache/impala/planner/Planner.java @@ -24,12 +24,10 @@ import java.util.List; import org.apache.impala.analysis.AnalysisContext; import org.apache.impala.analysis.Analyzer; import org.apache.impala.analysis.ColumnLineageGraph; -import org.apache.impala.analysis.DescriptorTable; import 
org.apache.impala.analysis.Expr; import org.apache.impala.analysis.ExprSubstitutionMap; import org.apache.impala.analysis.InsertStmt; import org.apache.impala.analysis.JoinOperator; -import org.apache.impala.analysis.KuduPartitionExpr; import org.apache.impala.analysis.QueryStmt; import org.apache.impala.analysis.SortInfo; import org.apache.impala.catalog.HBaseTable; @@ -46,6 +44,7 @@ import org.apache.impala.thrift.TQueryExecRequest; import org.apache.impala.thrift.TQueryOptions; import org.apache.impala.thrift.TRuntimeFilterMode; import org.apache.impala.thrift.TTableName; +import org.apache.impala.util.KuduUtil; import org.apache.impala.util.MaxRowsProcessedVisitor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -499,12 +498,9 @@ public class Planner { List<Expr> orderingExprs = Lists.newArrayList(); if (insertStmt.getTargetTable() instanceof KuduTable) { - if (inputFragment.getDataPartition().getType() == TPartitionType.KUDU) { - Preconditions.checkState( - inputFragment.getDataPartition().getPartitionExprs().size() == 1); - // Only sort for Kudu if we've already partitioned so that we can sort the - // partitions separately. This will be true if this is a distributed exec. 
- orderingExprs.add(inputFragment.getDataPartition().getPartitionExprs().get(0)); + if (!insertStmt.hasNoClusteredHint() && !ctx_.isSingleNodeExec()) { + orderingExprs.add( + KuduUtil.createPartitionExpr(insertStmt, ctx_.getRootAnalyzer())); orderingExprs.addAll(insertStmt.getPrimaryKeyExprs()); } } else if (insertStmt.hasClusteredHint() || !insertStmt.getSortExprs().isEmpty()) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/014c5603/fe/src/main/java/org/apache/impala/util/KuduUtil.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/util/KuduUtil.java b/fe/src/main/java/org/apache/impala/util/KuduUtil.java index 8b61c88..645866b 100644 --- a/fe/src/main/java/org/apache/impala/util/KuduUtil.java +++ b/fe/src/main/java/org/apache/impala/util/KuduUtil.java @@ -23,10 +23,14 @@ import java.util.HashSet; import java.util.List; import org.apache.impala.analysis.Analyzer; +import org.apache.impala.analysis.DescriptorTable; import org.apache.impala.analysis.Expr; import org.apache.impala.analysis.FunctionCallExpr; +import org.apache.impala.analysis.InsertStmt; +import org.apache.impala.analysis.KuduPartitionExpr; import org.apache.impala.analysis.LiteralExpr; import org.apache.impala.analysis.NumericLiteral; +import org.apache.impala.catalog.KuduTable; import org.apache.impala.catalog.ScalarType; import org.apache.impala.catalog.Type; import org.apache.impala.common.AnalysisException; @@ -399,4 +403,19 @@ public class KuduUtil { "Kudu type '%s' is not supported in Impala", t.getName())); } } + + /** + * Creates and returns an Expr that takes rows being inserted by 'insertStmt' and + * returns the partition number for each row. 
+ */ + public static Expr createPartitionExpr(InsertStmt insertStmt, Analyzer analyzer) + throws AnalysisException { + Preconditions.checkState(insertStmt.getTargetTable() instanceof KuduTable); + Expr kuduPartitionExpr = new KuduPartitionExpr(DescriptorTable.TABLE_SINK_ID, + (KuduTable) insertStmt.getTargetTable(), + Lists.newArrayList(insertStmt.getPartitionKeyExprs()), + insertStmt.getPartitionColPos()); + kuduPartitionExpr.analyze(analyzer); + return kuduPartitionExpr; + } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/014c5603/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java index 06ad842..cff7563 100644 --- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java @@ -1782,18 +1782,16 @@ public class AnalyzeStmtsTest extends AnalyzerTest { AnalyzesOk(String.format( "insert into table functional.alltypesnopart %sshuffle%s " + "select * from functional.alltypesnopart", prefix, suffix)); + // Insert hints are ok for Kudu tables. + AnalyzesOk(String.format( + "insert into table functional_kudu.alltypes %sshuffle%s " + + "select * from functional_kudu.alltypes", prefix, suffix)); // Plan hints do not make sense for inserting into HBase tables. AnalysisError(String.format( "insert into table functional_hbase.alltypes %sshuffle%s " + "select * from functional_hbase.alltypes", prefix, suffix), - "INSERT hints are only supported for inserting into Hdfs tables: " + + "INSERT hints are only supported for inserting into Hdfs and Kudu tables: " + "functional_hbase.alltypes"); - // Plan hints do not make sense for inserting into Kudu tables. 
- AnalysisError(String.format( - "insert into table functional_kudu.alltypes %sshuffle%s " + - "select * from functional_kudu.alltypes", prefix, suffix), - "INSERT hints are only supported for inserting into Hdfs tables: " + - "functional_kudu.alltypes"); // Conflicting plan hints. AnalysisError("insert into table functional.alltypessmall " + "partition (year, month) /* +shuffle,noshuffle */ " + http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/014c5603/fe/src/test/java/org/apache/impala/analysis/AnalyzeUpsertStmtTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeUpsertStmtTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeUpsertStmtTest.java index 2a20c2a..d6cd804 100644 --- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeUpsertStmtTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeUpsertStmtTest.java @@ -60,9 +60,9 @@ public class AnalyzeUpsertStmtTest extends AnalyzerTest { "from functional.alltypes a, functional.allcomplextypes b, " + "(select item from b.int_array_col) v1 " + "where a.id = b.id"); - // Hint, not supported for Kudu tables. 
- AnalysisError("upsert into table functional_kudu.testtbl [clustered] select * from " + - "functional_kudu.testtbl", "Hints not supported in UPSERT statements."); + // Hint + AnalyzesOk("upsert into table functional_kudu.testtbl [clustered] select * from " + + "functional_kudu.testtbl"); // Key columns missing from permutation AnalysisError("upsert into functional_kudu.testtbl(zip) values(1)", http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/014c5603/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test index e04278d..2bc5df7 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test @@ -190,3 +190,43 @@ UPSERT INTO KUDU [functional_kudu.alltypes] | 00:SCAN KUDU [functional_kudu.alltypes] ==== +# Hint - noshuffle should remove the exchange node. +upsert into functional_kudu.alltypes /* +noshuffle */ select * from functional.alltypes; +---- DISTRIBUTEDPLAN +UPSERT INTO KUDU [functional_kudu.alltypes] +| +01:SORT +| order by: KuduPartition(functional.alltypes.id) ASC NULLS LAST, id ASC NULLS LAST +| +00:SCAN HDFS [functional.alltypes] + partitions=24/24 files=24 size=478.45KB +==== +# Hint - noclustered should remove the sort node. +upsert into functional_kudu.alltypes /* +noclustered */ select * from functional.alltypes; +---- DISTRIBUTEDPLAN +UPSERT INTO KUDU [functional_kudu.alltypes] +| +01:EXCHANGE [KUDU(KuduPartition(functional.alltypes.id))] +| +00:SCAN HDFS [functional.alltypes] + partitions=24/24 files=24 size=478.45KB +==== +# Hint - noshuffle should remove the exchange node. 
+upsert into functional_kudu.alltypes /* +noshuffle */ select * from functional.alltypes; +---- DISTRIBUTEDPLAN +UPSERT INTO KUDU [functional_kudu.alltypes] +| +01:SORT +| order by: KuduPartition(functional.alltypes.id) ASC NULLS LAST, id ASC NULLS LAST +| +00:SCAN HDFS [functional.alltypes] + partitions=24/24 files=24 size=478.45KB +==== +upsert into functional_kudu.alltypes /* +noclustered,noshuffle */ +select * from functional.alltypes; +---- DISTRIBUTEDPLAN +UPSERT INTO KUDU [functional_kudu.alltypes] +| +00:SCAN HDFS [functional.alltypes] + partitions=24/24 files=24 size=478.45KB +==== http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/014c5603/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test index 12ab9fc..16cb3a9 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test @@ -389,3 +389,32 @@ PLAN-ROOT SINK | 01:SCAN KUDU [functional_kudu.alltypes] ==== +# Hint - noshuffle should remove the exchange node. +insert into functional_kudu.alltypes /* +noshuffle */ select * from functional.alltypes; +---- DISTRIBUTEDPLAN +INSERT INTO KUDU [functional_kudu.alltypes] +| +01:SORT +| order by: KuduPartition(functional.alltypes.id) ASC NULLS LAST, id ASC NULLS LAST +| +00:SCAN HDFS [functional.alltypes] + partitions=24/24 files=24 size=478.45KB +==== +# Hint - noclustered should remove the sort node. 
+insert into functional_kudu.alltypes /* +noclustered */ select * from functional.alltypes; +---- DISTRIBUTEDPLAN +INSERT INTO KUDU [functional_kudu.alltypes] +| +01:EXCHANGE [KUDU(KuduPartition(functional.alltypes.id))] +| +00:SCAN HDFS [functional.alltypes] + partitions=24/24 files=24 size=478.45KB +==== +insert into functional_kudu.alltypes /* +noclustered,noshuffle */ +select * from functional.alltypes; +---- DISTRIBUTEDPLAN +INSERT INTO KUDU [functional_kudu.alltypes] +| +00:SCAN HDFS [functional.alltypes] + partitions=24/24 files=24 size=478.45KB +====
