IMPALA-5381: Adds DEFAULT_JOIN_DISTRIBUTION_MODE query option. Adds a new query option DEFAULT_JOIN_DISTRIBUTION_MODE to control which join distribution mode is chosen when the join inputs have an unknown cardinality (e.g., missing stats) or when the expected costs of the different strategies are equal.
Values for DEFAULT_JOIN_DISTRIBUTION_MODE: [BROADCAST, SHUFFLE] Default: BROADCAST Note that this change effectively undoes IMPALA-5120. Testing: - Added new planner tests - Core/hdfs run passed Change-Id: Ibd34442f422129d53bef5493fc9cbe7375a0765c Reviewed-on: http://gerrit.cloudera.org:8080/7059 Reviewed-by: Alex Behm <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ecda49f3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ecda49f3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ecda49f3 Branch: refs/heads/master Commit: ecda49f3e3001e23bebd6bdfaa1c612716df4bf1 Parents: 5518cbc Author: Alex Behm <[email protected]> Authored: Thu Jun 1 18:39:43 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Sun Jun 4 08:11:53 2017 +0000 ---------------------------------------------------------------------- be/src/service/query-options.cc | 13 ++ be/src/service/query-options.h | 3 +- common/thrift/ImpalaInternalService.thrift | 10 ++ common/thrift/ImpalaService.thrift | 4 + .../impala/planner/DistributedPlanner.java | 129 +++++++++---------- .../org/apache/impala/planner/JoinNode.java | 6 + .../org/apache/impala/planner/PlannerTest.java | 11 ++ .../default-join-distr-mode-broadcast.test | 63 +++++++++ .../default-join-distr-mode-shuffle.test | 69 ++++++++++ .../queries/PlannerTest/joins.test | 22 ---- 10 files changed, 242 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ecda49f3/be/src/service/query-options.cc ---------------------------------------------------------------------- diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc index 0f0d30c..f4c4f05 100644 --- a/be/src/service/query-options.cc +++ b/be/src/service/query-options.cc @@ 
-480,6 +480,19 @@ Status impala::SetQueryOption(const string& key, const string& value, iequals(value, "true") || iequals(value, "1")); break; } + case TImpalaQueryOptions::DEFAULT_JOIN_DISTRIBUTION_MODE: { + if (iequals(value, "BROADCAST") || iequals(value, "0")) { + query_options->__set_default_join_distribution_mode( + TJoinDistributionMode::BROADCAST); + } else if (iequals(value, "SHUFFLE") || iequals(value, "1")) { + query_options->__set_default_join_distribution_mode( + TJoinDistributionMode::SHUFFLE); + } else { + return Status(Substitute("Invalid default_join_distribution_mode '$0'. " + "Valid values are BROADCAST or SHUFFLE", value)); + } + break; + } default: // We hit this DCHECK(false) if we forgot to add the corresponding entry here // when we add a new query option. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ecda49f3/be/src/service/query-options.h ---------------------------------------------------------------------- diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h index 3a11383..1f5624c 100644 --- a/be/src/service/query-options.h +++ b/be/src/service/query-options.h @@ -35,7 +35,7 @@ class TQueryOptions; // the DCHECK. 
#define QUERY_OPTS_TABLE\ DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\ - TImpalaQueryOptions::PARQUET_READ_STATISTICS + 1);\ + TImpalaQueryOptions::DEFAULT_JOIN_DISTRIBUTION_MODE + 1);\ QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\ QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR)\ QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\ @@ -91,6 +91,7 @@ class TQueryOptions; QUERY_OPT_FN(parquet_dictionary_filtering, PARQUET_DICTIONARY_FILTERING)\ QUERY_OPT_FN(parquet_array_resolution, PARQUET_ARRAY_RESOLUTION)\ QUERY_OPT_FN(parquet_read_statistics, PARQUET_READ_STATISTICS)\ + QUERY_OPT_FN(default_join_distribution_mode, DEFAULT_JOIN_DISTRIBUTION_MODE)\ ; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ecda49f3/common/thrift/ImpalaInternalService.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift index 17bfc4a..f622ed4 100644 --- a/common/thrift/ImpalaInternalService.thrift +++ b/common/thrift/ImpalaInternalService.thrift @@ -57,6 +57,11 @@ enum TParquetArrayResolution { TWO_LEVEL_THEN_THREE_LEVEL } +enum TJoinDistributionMode { + BROADCAST, + SHUFFLE +} + // Query options that correspond to ImpalaService.ImpalaQueryOptions, with their // respective defaults. Query options can be set in the following ways: // @@ -241,6 +246,11 @@ struct TQueryOptions { // processing. This includes skipping data based on the statistics and computing query // results like "select min()". 55: optional bool parquet_read_statistics = true + + // Join distribution mode that is used when the join inputs have an unknown + // cardinality, e.g., because of missing table statistics. 
+ 56: optional TJoinDistributionMode default_join_distribution_mode = + TJoinDistributionMode.BROADCAST } // Impala currently has two types of sessions: Beeswax and HiveServer2 http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ecda49f3/common/thrift/ImpalaService.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift index dd89e52..fb0016c 100644 --- a/common/thrift/ImpalaService.thrift +++ b/common/thrift/ImpalaService.thrift @@ -271,6 +271,10 @@ enum TImpalaQueryOptions { // processing. This includes skipping data based on the statistics and computing query // results like "select min()". PARQUET_READ_STATISTICS, + + // Join distribution mode that is used when the join inputs have an unknown + // cardinality, e.g., because of missing table statistics. + DEFAULT_JOIN_DISTRIBUTION_MODE } // The summary of a DML statement. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ecda49f3/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java index 21423c5..2266625 100644 --- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java @@ -79,12 +79,7 @@ public class DistributedPlanner { Preconditions.checkState(!queryStmt.hasOffset()); isPartitioned = true; } - long perNodeMemLimit = ctx_.getQueryOptions().mem_limit; - if (LOG.isTraceEnabled()) { - LOG.trace("create plan fragments"); - LOG.trace("memlimit=" + Long.toString(perNodeMemLimit)); - } - createPlanFragments(singleNodePlan, isPartitioned, perNodeMemLimit, fragments); + createPlanFragments(singleNodePlan, isPartitioned, fragments); return fragments; } @@ -98,8 +93,7 @@ public 
class DistributedPlanner { * partitioned; the partition function is derived from the inputs. */ private PlanFragment createPlanFragments( - PlanNode root, boolean isPartitioned, - long perNodeMemLimit, ArrayList<PlanFragment> fragments) + PlanNode root, boolean isPartitioned, ArrayList<PlanFragment> fragments) throws ImpalaException { ArrayList<PlanFragment> childFragments = Lists.newArrayList(); for (PlanNode child: root.getChildren()) { @@ -109,9 +103,7 @@ public class DistributedPlanner { boolean childIsPartitioned = !child.hasLimit(); // Do not fragment the subplan of a SubplanNode since it is executed locally. if (root instanceof SubplanNode && child == root.getChild(1)) continue; - childFragments.add( - createPlanFragments( - child, childIsPartitioned, perNodeMemLimit, fragments)); + childFragments.add(createPlanFragments(child, childIsPartitioned, fragments)); } PlanFragment result = null; @@ -120,14 +112,12 @@ public class DistributedPlanner { fragments.add(result); } else if (root instanceof HashJoinNode) { Preconditions.checkState(childFragments.size() == 2); - result = createHashJoinFragment( - (HashJoinNode) root, childFragments.get(1), childFragments.get(0), - perNodeMemLimit, fragments); + result = createHashJoinFragment((HashJoinNode) root, + childFragments.get(1), childFragments.get(0), fragments); } else if (root instanceof NestedLoopJoinNode) { Preconditions.checkState(childFragments.size() == 2); - result = createNestedLoopJoinFragment( - (NestedLoopJoinNode) root, childFragments.get(1), childFragments.get(0), - perNodeMemLimit, fragments); + result = createNestedLoopJoinFragment((NestedLoopJoinNode) root, + childFragments.get(1), childFragments.get(0), fragments); } else if (root instanceof SubplanNode) { Preconditions.checkState(childFragments.size() == 1); result = createSubplanNodeFragment((SubplanNode) root, childFragments.get(0)); @@ -304,8 +294,7 @@ public class DistributedPlanner { */ private PlanFragment 
createNestedLoopJoinFragment(NestedLoopJoinNode node, PlanFragment rightChildFragment, PlanFragment leftChildFragment, - long perNodeMemLimit, ArrayList<PlanFragment> fragments) - throws ImpalaException { + ArrayList<PlanFragment> fragments) throws ImpalaException { node.setDistributionMode(DistributionMode.BROADCAST); node.setChild(0, leftChildFragment.getPlanRoot()); connectChildFragment(node, 1, leftChildFragment, rightChildFragment); @@ -321,7 +310,7 @@ public class DistributedPlanner { PlanFragment leftChildFragment, PlanFragment rightChildFragment, List<Expr> lhsJoinExprs, List<Expr> rhsJoinExprs, ArrayList<PlanFragment> fragments) throws ImpalaException { - node.setDistributionMode(HashJoinNode.DistributionMode.PARTITIONED); + Preconditions.checkState(node.getDistributionMode() == DistributionMode.PARTITIONED); // The lhs and rhs input fragments are already partitioned on the join exprs. // Combine the lhs/rhs input fragments into leftChildFragment by placing the join // node into leftChildFragment and setting its lhs/rhs children to the plan root of @@ -415,20 +404,14 @@ public class DistributedPlanner { } /** - * Creates either a broadcast join or a repartitioning join, depending on the - * expected cost. - * If any of the inputs to the cost computation is unknown, it assumes the cost - * will be 0. Costs being equal, it'll favor partitioned over broadcast joins. - * If perNodeMemLimit > 0 and the size of the hash table for a broadcast join is - * expected to exceed that mem limit, switches to partitioned join instead. - * TODO: revisit the choice of broadcast as the default + * Creates either a broadcast join or a repartitioning join depending on the expected + * cost and various constraints. See computeDistributionMode() for more details. * TODO: don't create a broadcast join if we already anticipate that this will * exceed the query's memory budget. 
*/ private PlanFragment createHashJoinFragment( HashJoinNode node, PlanFragment rightChildFragment, - PlanFragment leftChildFragment, long perNodeMemLimit, - ArrayList<PlanFragment> fragments) + PlanFragment leftChildFragment, ArrayList<PlanFragment> fragments) throws ImpalaException { // For both join types, the total cost is calculated as the amount of data // sent over the network, plus the amount of data inserted into the hash table. @@ -436,8 +419,8 @@ public class DistributedPlanner { // the leftChildFragment, and build a hash table with it on each node. Analyzer analyzer = ctx_.getRootAnalyzer(); PlanNode rhsTree = rightChildFragment.getPlanRoot(); - long rhsDataSize = 0; - long broadcastCost = Long.MAX_VALUE; + long rhsDataSize = -1; + long broadcastCost = -1; if (rhsTree.getCardinality() != -1) { rhsDataSize = Math.round( rhsTree.getCardinality() * ExchangeNode.getAvgSerializedRowSize(rhsTree)); @@ -455,8 +438,6 @@ public class DistributedPlanner { // repartition: both left- and rightChildFragment are partitioned on the // join exprs, and a hash table is built with the rightChildFragment's output. PlanNode lhsTree = leftChildFragment.getPlanRoot(); - // Subtract 1 here so that if stats are missing we default to partitioned. 
- long partitionCost = Long.MAX_VALUE - 1; List<Expr> lhsJoinExprs = Lists.newArrayList(); List<Expr> rhsJoinExprs = Lists.newArrayList(); for (Expr joinConjunct: node.getEqJoinConjuncts()) { @@ -466,12 +447,14 @@ public class DistributedPlanner { } boolean lhsHasCompatPartition = false; boolean rhsHasCompatPartition = false; + long partitionCost = -1; if (lhsTree.getCardinality() != -1 && rhsTree.getCardinality() != -1) { lhsHasCompatPartition = analyzer.equivSets(lhsJoinExprs, leftChildFragment.getDataPartition().getPartitionExprs()); rhsHasCompatPartition = analyzer.equivSets(rhsJoinExprs, rightChildFragment.getDataPartition().getPartitionExprs()); + Preconditions.checkState(rhsDataSize != -1); double lhsNetworkCost = (lhsHasCompatPartition) ? 0.0 : Math.round( lhsTree.getCardinality() * ExchangeNode.getAvgSerializedRowSize(lhsTree)); @@ -487,39 +470,12 @@ public class DistributedPlanner { LOG.trace(rhsTree.getExplainString(ctx_.getQueryOptions())); } - boolean doBroadcast = false; - // we do a broadcast join if - // - we're explicitly told to do so - // - or if it's cheaper and we weren't explicitly told to do a partitioned join - // - and we're not doing a full outer or right outer/semi join (those require the - // left-hand side to be partitioned for correctness) - // - and the expected size of the hash tbl doesn't exceed perNodeMemLimit - // - or we are doing a null-aware left anti join (broadcast is required for - // correctness) - // we do a "<=" comparison of the costs so that we default to broadcast joins if - // we're unable to estimate the cost - if ((node.getJoinOp() != JoinOperator.RIGHT_OUTER_JOIN - && node.getJoinOp() != JoinOperator.FULL_OUTER_JOIN - && node.getJoinOp() != JoinOperator.RIGHT_SEMI_JOIN - && node.getJoinOp() != JoinOperator.RIGHT_ANTI_JOIN - // a broadcast join hint overides the check to see if the hash table - // size is less than the pernode memlimit - && (node.getDistributionModeHint() == DistributionMode.BROADCAST - || 
perNodeMemLimit == 0 - || Math.round(rhsDataSize * PlannerContext.HASH_TBL_SPACE_OVERHEAD) - <= perNodeMemLimit) - // a broadcast join hint overrides the check to see if performing a broadcast - // join is more costly than a partitioned join - && (node.getDistributionModeHint() == DistributionMode.BROADCAST - || (node.getDistributionModeHint() != DistributionMode.PARTITIONED - && broadcastCost <= partitionCost))) - || node.getJoinOp().isNullAwareLeftAntiJoin()) { - doBroadcast = true; - } + DistributionMode distrMode = computeJoinDistributionMode( + node, broadcastCost, partitionCost, rhsDataSize); + node.setDistributionMode(distrMode); PlanFragment hjFragment = null; - if (doBroadcast) { - node.setDistributionMode(HashJoinNode.DistributionMode.BROADCAST); + if (distrMode == DistributionMode.BROADCAST) { // Doesn't create a new fragment, but modifies leftChildFragment to execute // the join; the build input is provided by an ExchangeNode, which is the // destination of the rightChildFragment's output @@ -534,7 +490,7 @@ public class DistributedPlanner { } for (RuntimeFilter filter: node.getRuntimeFilters()) { - filter.setIsBroadcast(doBroadcast); + filter.setIsBroadcast(distrMode == DistributionMode.BROADCAST); filter.computeHasLocalTargets(); // Work around IMPALA-3450, where cardinalities might be wrong in single-node plans // with UNION and LIMITs. @@ -544,6 +500,49 @@ public class DistributedPlanner { return hjFragment; } + /** + * Determines and returns the distribution mode for the given join based on the expected + * costs and the right-hand side data size. Considers the following: + * - Some join types require a specific distribution strategy to run correctly. + * - Checks for join hints. + * - Uses the default join strategy (query option) when the costs are unknown or tied. + * - Returns broadcast if it is cheaper than partitioned and the expected hash table + * size is within the query mem limit. + * - Otherwise, returns partitioned. 
+ * For 'broadcastCost', 'partitionCost', and 'rhsDataSize' a value of -1 indicates + * unknown, e.g., due to missing stats. + */ + private DistributionMode computeJoinDistributionMode(JoinNode node, + long broadcastCost, long partitionCost, long rhsDataSize) { + // Check join types that require a specific distribution strategy to run correctly. + JoinOperator op = node.getJoinOp(); + if (op == JoinOperator.RIGHT_OUTER_JOIN || op == JoinOperator.RIGHT_SEMI_JOIN + || op == JoinOperator.RIGHT_ANTI_JOIN || op == JoinOperator.FULL_OUTER_JOIN) { + return DistributionMode.PARTITIONED; + } + if (op == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) return DistributionMode.BROADCAST; + + // Check join hints. + if (node.getDistributionModeHint() != DistributionMode.NONE) { + return node.getDistributionModeHint(); + } + + // Use the default mode when the costs are unknown or tied. + if (broadcastCost == -1 || partitionCost == -1 || broadcastCost == partitionCost) { + return DistributionMode.fromThrift( + ctx_.getQueryOptions().getDefault_join_distribution_mode()); + } + + // Decide the distribution mode based on the estimated costs and the mem limit. + long htSize = Math.round(rhsDataSize * PlannerContext.HASH_TBL_SPACE_OVERHEAD); + long memLimit = ctx_.getQueryOptions().mem_limit; + if (broadcastCost <= partitionCost && (memLimit == 0 || htSize <= memLimit)) { + return DistributionMode.BROADCAST; + } + // Partitioned was cheaper or the broadcast HT would not fit within the mem limit. + return DistributionMode.PARTITIONED; + } + /** * Returns true if the lhs and rhs partitions are physically compatible for executing * a partitioned join with the given lhs/rhs join exprs. 
Physical compatibility means http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ecda49f3/fe/src/main/java/org/apache/impala/planner/JoinNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/JoinNode.java b/fe/src/main/java/org/apache/impala/planner/JoinNode.java index b40ef55..3bd0899 100644 --- a/fe/src/main/java/org/apache/impala/planner/JoinNode.java +++ b/fe/src/main/java/org/apache/impala/planner/JoinNode.java @@ -32,6 +32,8 @@ import org.apache.impala.analysis.SlotRef; import org.apache.impala.catalog.ColumnStats; import org.apache.impala.catalog.Table; import org.apache.impala.common.ImpalaException; +import org.apache.impala.thrift.TJoinDistributionMode; + import com.google.common.base.Preconditions; /** @@ -83,6 +85,10 @@ public abstract class JoinNode extends PlanNode { @Override public String toString() { return description_; } + public static DistributionMode fromThrift(TJoinDistributionMode distrMode) { + if (distrMode == TJoinDistributionMode.BROADCAST) return BROADCAST; + return PARTITIONED; + } } public JoinNode(PlanNode outer, PlanNode inner, boolean isStraightJoin, http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ecda49f3/fe/src/test/java/org/apache/impala/planner/PlannerTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java index 07b49a2..270df42 100644 --- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java +++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java @@ -25,6 +25,7 @@ import org.apache.impala.common.RuntimeEnv; import org.apache.impala.testutil.TestUtils; import org.apache.impala.thrift.TExecRequest; import org.apache.impala.thrift.TExplainLevel; +import org.apache.impala.thrift.TJoinDistributionMode; import org.apache.impala.thrift.TQueryCtx; import 
org.apache.impala.thrift.TQueryOptions; import org.apache.impala.thrift.TRuntimeFilterMode; @@ -413,4 +414,14 @@ public class PlannerTest extends PlannerTestBase { options.setExplain_level(TExplainLevel.EXTENDED); runPlannerTestFile("tablesample", options); } + + @Test + public void testDefaultJoinDistributionMode() { + TQueryOptions options = defaultQueryOptions(); + Preconditions.checkState( + options.getDefault_join_distribution_mode() == TJoinDistributionMode.BROADCAST); + runPlannerTestFile("default-join-distr-mode-broadcast", options); + options.setDefault_join_distribution_mode(TJoinDistributionMode.SHUFFLE); + runPlannerTestFile("default-join-distr-mode-shuffle", options); + } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ecda49f3/testdata/workloads/functional-planner/queries/PlannerTest/default-join-distr-mode-broadcast.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/default-join-distr-mode-broadcast.test b/testdata/workloads/functional-planner/queries/PlannerTest/default-join-distr-mode-broadcast.test new file mode 100644 index 0000000..8735f97 --- /dev/null +++ b/testdata/workloads/functional-planner/queries/PlannerTest/default-join-distr-mode-broadcast.test @@ -0,0 +1,63 @@ +# Both join inputs have an unknown cardinality. +select /* +straight_join */ * from +functional.tinytable x inner join functional.tinytable y on x.a = y.a +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +04:EXCHANGE [UNPARTITIONED] +| +02:HASH JOIN [INNER JOIN, BROADCAST] +| hash predicates: x.a = y.a +| runtime filters: RF000 <- y.a +| +|--03:EXCHANGE [BROADCAST] +| | +| 01:SCAN HDFS [functional.tinytable y] +| partitions=1/1 files=1 size=38B +| +00:SCAN HDFS [functional.tinytable x] + partitions=1/1 files=1 size=38B + runtime filters: RF000 -> x.a +==== +# Left join input has an unknown cardinality. 
+select /* +straight_join */ * from +functional.tinytable x inner join functional.alltypes y on x.a = y.string_col +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +04:EXCHANGE [UNPARTITIONED] +| +02:HASH JOIN [INNER JOIN, BROADCAST] +| hash predicates: x.a = y.string_col +| runtime filters: RF000 <- y.string_col +| +|--03:EXCHANGE [BROADCAST] +| | +| 01:SCAN HDFS [functional.alltypes y] +| partitions=24/24 files=24 size=469.90KB +| +00:SCAN HDFS [functional.tinytable x] + partitions=1/1 files=1 size=38B + runtime filters: RF000 -> x.a +==== +# Right join input has an unknown cardinality. +select /* +straight_join */ * from +functional.alltypes x inner join functional.tinytable y on x.string_col = y.a +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +04:EXCHANGE [UNPARTITIONED] +| +02:HASH JOIN [INNER JOIN, BROADCAST] +| hash predicates: x.string_col = y.a +| runtime filters: RF000 <- y.a +| +|--03:EXCHANGE [BROADCAST] +| | +| 01:SCAN HDFS [functional.tinytable y] +| partitions=1/1 files=1 size=38B +| +00:SCAN HDFS [functional.alltypes x] + partitions=24/24 files=24 size=469.90KB + runtime filters: RF000 -> x.string_col +==== http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ecda49f3/testdata/workloads/functional-planner/queries/PlannerTest/default-join-distr-mode-shuffle.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/default-join-distr-mode-shuffle.test b/testdata/workloads/functional-planner/queries/PlannerTest/default-join-distr-mode-shuffle.test new file mode 100644 index 0000000..59e60c9 --- /dev/null +++ b/testdata/workloads/functional-planner/queries/PlannerTest/default-join-distr-mode-shuffle.test @@ -0,0 +1,69 @@ +# Both join inputs have an unknown cardinality. 
+select /* +straight_join */ * from +functional.tinytable x inner join functional.tinytable y on x.a = y.a +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +05:EXCHANGE [UNPARTITIONED] +| +02:HASH JOIN [INNER JOIN, PARTITIONED] +| hash predicates: x.a = y.a +| runtime filters: RF000 <- y.a +| +|--04:EXCHANGE [HASH(y.a)] +| | +| 01:SCAN HDFS [functional.tinytable y] +| partitions=1/1 files=1 size=38B +| +03:EXCHANGE [HASH(x.a)] +| +00:SCAN HDFS [functional.tinytable x] + partitions=1/1 files=1 size=38B + runtime filters: RF000 -> x.a +==== +# Left join input has an unknown cardinality. +select /* +straight_join */ * from +functional.tinytable x inner join functional.alltypes y on x.a = y.string_col +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +05:EXCHANGE [UNPARTITIONED] +| +02:HASH JOIN [INNER JOIN, PARTITIONED] +| hash predicates: x.a = y.string_col +| runtime filters: RF000 <- y.string_col +| +|--04:EXCHANGE [HASH(y.string_col)] +| | +| 01:SCAN HDFS [functional.alltypes y] +| partitions=24/24 files=24 size=469.90KB +| +03:EXCHANGE [HASH(x.a)] +| +00:SCAN HDFS [functional.tinytable x] + partitions=1/1 files=1 size=38B + runtime filters: RF000 -> x.a +==== +# Right join input has an unknown cardinality. 
+select /* +straight_join */ * from +functional.alltypes x inner join functional.tinytable y on x.string_col = y.a +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +05:EXCHANGE [UNPARTITIONED] +| +02:HASH JOIN [INNER JOIN, PARTITIONED] +| hash predicates: x.string_col = y.a +| runtime filters: RF000 <- y.a +| +|--04:EXCHANGE [HASH(y.a)] +| | +| 01:SCAN HDFS [functional.tinytable y] +| partitions=1/1 files=1 size=38B +| +03:EXCHANGE [HASH(x.string_col)] +| +00:SCAN HDFS [functional.alltypes x] + partitions=24/24 files=24 size=469.90KB + runtime filters: RF000 -> x.string_col +==== http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ecda49f3/testdata/workloads/functional-planner/queries/PlannerTest/joins.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/joins.test b/testdata/workloads/functional-planner/queries/PlannerTest/joins.test index 26b8c64..0fdb19d 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/joins.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/joins.test @@ -2519,25 +2519,3 @@ PLAN-ROOT SINK 00:SCAN HDFS [tpch.customer a] partitions=1/1 files=1 size=23.08MB ==== -# If stats aren't available, default to partitioned join. -select * from functional.tinytable x, functional.tinytable y where x.a = y.a ----- DISTRIBUTEDPLAN -PLAN-ROOT SINK -| -05:EXCHANGE [UNPARTITIONED] -| -02:HASH JOIN [INNER JOIN, PARTITIONED] -| hash predicates: x.a = y.a -| runtime filters: RF000 <- y.a -| -|--04:EXCHANGE [HASH(y.a)] -| | -| 01:SCAN HDFS [functional.tinytable y] -| partitions=1/1 files=1 size=38B -| -03:EXCHANGE [HASH(x.a)] -| -00:SCAN HDFS [functional.tinytable x] - partitions=1/1 files=1 size=38B - runtime filters: RF000 -> x.a -====
