IMPALA-5381: Adds DEFAULT_JOIN_DISTRIBUTION_MODE query option.

Adds a new query option DEFAULT_JOIN_DISTRIBUTION_MODE to
control which join distribution mode is chosen when the join
inputs have an unknown cardinality (e.g., missing stats) or when
the expected costs of the different strategies are equal.

Values for DEFAULT_JOIN_DISTRIBUTION_MODE: [BROADCAST, SHUFFLE]
Default: BROADCAST

Note that this change effectively undoes IMPALA-5120.

Testing:
- Added new planner tests
- Core/hdfs run passed

Change-Id: Ibd34442f422129d53bef5493fc9cbe7375a0765c
Reviewed-on: http://gerrit.cloudera.org:8080/7059
Reviewed-by: Alex Behm <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ecda49f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ecda49f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ecda49f3

Branch: refs/heads/master
Commit: ecda49f3e3001e23bebd6bdfaa1c612716df4bf1
Parents: 5518cbc
Author: Alex Behm <[email protected]>
Authored: Thu Jun 1 18:39:43 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Sun Jun 4 08:11:53 2017 +0000

----------------------------------------------------------------------
 be/src/service/query-options.cc                 |  13 ++
 be/src/service/query-options.h                  |   3 +-
 common/thrift/ImpalaInternalService.thrift      |  10 ++
 common/thrift/ImpalaService.thrift              |   4 +
 .../impala/planner/DistributedPlanner.java      | 129 +++++++++----------
 .../org/apache/impala/planner/JoinNode.java     |   6 +
 .../org/apache/impala/planner/PlannerTest.java  |  11 ++
 .../default-join-distr-mode-broadcast.test      |  63 +++++++++
 .../default-join-distr-mode-shuffle.test        |  69 ++++++++++
 .../queries/PlannerTest/joins.test              |  22 ----
 10 files changed, 242 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ecda49f3/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 0f0d30c..f4c4f05 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -480,6 +480,19 @@ Status impala::SetQueryOption(const string& key, const 
string& value,
             iequals(value, "true") || iequals(value, "1"));
         break;
       }
+      case TImpalaQueryOptions::DEFAULT_JOIN_DISTRIBUTION_MODE: {
+        if (iequals(value, "BROADCAST") || iequals(value, "0")) {
+          query_options->__set_default_join_distribution_mode(
+              TJoinDistributionMode::BROADCAST);
+        } else if (iequals(value, "SHUFFLE") || iequals(value, "1")) {
+          query_options->__set_default_join_distribution_mode(
+              TJoinDistributionMode::SHUFFLE);
+        } else {
+          return Status(Substitute("Invalid default_join_distribution_mode 
'$0'. "
+              "Valid values are BROADCAST or SHUFFLE", value));
+        }
+        break;
+      }
       default:
         // We hit this DCHECK(false) if we forgot to add the corresponding 
entry here
         // when we add a new query option.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ecda49f3/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 3a11383..1f5624c 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -35,7 +35,7 @@ class TQueryOptions;
 // the DCHECK.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::PARQUET_READ_STATISTICS + 1);\
+      TImpalaQueryOptions::DEFAULT_JOIN_DISTRIBUTION_MODE + 1);\
   QUERY_OPT_FN(abort_on_default_limit_exceeded, 
ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR)\
   QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -91,6 +91,7 @@ class TQueryOptions;
   QUERY_OPT_FN(parquet_dictionary_filtering, PARQUET_DICTIONARY_FILTERING)\
   QUERY_OPT_FN(parquet_array_resolution, PARQUET_ARRAY_RESOLUTION)\
   QUERY_OPT_FN(parquet_read_statistics, PARQUET_READ_STATISTICS)\
+  QUERY_OPT_FN(default_join_distribution_mode, DEFAULT_JOIN_DISTRIBUTION_MODE)\
   ;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ecda49f3/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift 
b/common/thrift/ImpalaInternalService.thrift
index 17bfc4a..f622ed4 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -57,6 +57,11 @@ enum TParquetArrayResolution {
   TWO_LEVEL_THEN_THREE_LEVEL
 }
 
+enum TJoinDistributionMode {
+  BROADCAST,
+  SHUFFLE
+}
+
 // Query options that correspond to ImpalaService.ImpalaQueryOptions, with 
their
 // respective defaults. Query options can be set in the following ways:
 //
@@ -241,6 +246,11 @@ struct TQueryOptions {
   // processing. This includes skipping data based on the statistics and 
computing query
   // results like "select min()".
   55: optional bool parquet_read_statistics = true
+
+  // Join distribution mode that is used when the join inputs have an unknown
+  // cardinality, e.g., because of missing table statistics.
+  56: optional TJoinDistributionMode default_join_distribution_mode =
+    TJoinDistributionMode.BROADCAST
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ecda49f3/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift 
b/common/thrift/ImpalaService.thrift
index dd89e52..fb0016c 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -271,6 +271,10 @@ enum TImpalaQueryOptions {
   // processing. This includes skipping data based on the statistics and 
computing query
   // results like "select min()".
   PARQUET_READ_STATISTICS,
+
+  // Join distribution mode that is used when the join inputs have an unknown
+  // cardinality, e.g., because of missing table statistics.
+  DEFAULT_JOIN_DISTRIBUTION_MODE
 }
 
 // The summary of a DML statement.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ecda49f3/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java 
b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
index 21423c5..2266625 100644
--- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
@@ -79,12 +79,7 @@ public class DistributedPlanner {
       Preconditions.checkState(!queryStmt.hasOffset());
       isPartitioned = true;
     }
-    long perNodeMemLimit = ctx_.getQueryOptions().mem_limit;
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("create plan fragments");
-      LOG.trace("memlimit=" + Long.toString(perNodeMemLimit));
-    }
-    createPlanFragments(singleNodePlan, isPartitioned, perNodeMemLimit, 
fragments);
+    createPlanFragments(singleNodePlan, isPartitioned, fragments);
     return fragments;
   }
 
@@ -98,8 +93,7 @@ public class DistributedPlanner {
    * partitioned; the partition function is derived from the inputs.
    */
   private PlanFragment createPlanFragments(
-      PlanNode root, boolean isPartitioned,
-      long perNodeMemLimit, ArrayList<PlanFragment> fragments)
+      PlanNode root, boolean isPartitioned, ArrayList<PlanFragment> fragments)
       throws ImpalaException {
     ArrayList<PlanFragment> childFragments = Lists.newArrayList();
     for (PlanNode child: root.getChildren()) {
@@ -109,9 +103,7 @@ public class DistributedPlanner {
       boolean childIsPartitioned = !child.hasLimit();
       // Do not fragment the subplan of a SubplanNode since it is executed 
locally.
       if (root instanceof SubplanNode && child == root.getChild(1)) continue;
-      childFragments.add(
-          createPlanFragments(
-            child, childIsPartitioned, perNodeMemLimit, fragments));
+      childFragments.add(createPlanFragments(child, childIsPartitioned, 
fragments));
     }
 
     PlanFragment result = null;
@@ -120,14 +112,12 @@ public class DistributedPlanner {
       fragments.add(result);
     } else if (root instanceof HashJoinNode) {
       Preconditions.checkState(childFragments.size() == 2);
-      result = createHashJoinFragment(
-          (HashJoinNode) root, childFragments.get(1), childFragments.get(0),
-          perNodeMemLimit, fragments);
+      result = createHashJoinFragment((HashJoinNode) root,
+          childFragments.get(1), childFragments.get(0), fragments);
     } else if (root instanceof NestedLoopJoinNode) {
       Preconditions.checkState(childFragments.size() == 2);
-      result = createNestedLoopJoinFragment(
-          (NestedLoopJoinNode) root, childFragments.get(1), 
childFragments.get(0),
-          perNodeMemLimit, fragments);
+      result = createNestedLoopJoinFragment((NestedLoopJoinNode) root,
+          childFragments.get(1), childFragments.get(0), fragments);
     } else if (root instanceof SubplanNode) {
       Preconditions.checkState(childFragments.size() == 1);
       result = createSubplanNodeFragment((SubplanNode) root, 
childFragments.get(0));
@@ -304,8 +294,7 @@ public class DistributedPlanner {
    */
   private PlanFragment createNestedLoopJoinFragment(NestedLoopJoinNode node,
       PlanFragment rightChildFragment, PlanFragment leftChildFragment,
-      long perNodeMemLimit, ArrayList<PlanFragment> fragments)
-      throws ImpalaException {
+      ArrayList<PlanFragment> fragments) throws ImpalaException {
     node.setDistributionMode(DistributionMode.BROADCAST);
     node.setChild(0, leftChildFragment.getPlanRoot());
     connectChildFragment(node, 1, leftChildFragment, rightChildFragment);
@@ -321,7 +310,7 @@ public class DistributedPlanner {
       PlanFragment leftChildFragment, PlanFragment rightChildFragment,
       List<Expr> lhsJoinExprs, List<Expr> rhsJoinExprs,
       ArrayList<PlanFragment> fragments) throws ImpalaException {
-    node.setDistributionMode(HashJoinNode.DistributionMode.PARTITIONED);
+    Preconditions.checkState(node.getDistributionMode() == 
DistributionMode.PARTITIONED);
     // The lhs and rhs input fragments are already partitioned on the join 
exprs.
     // Combine the lhs/rhs input fragments into leftChildFragment by placing 
the join
     // node into leftChildFragment and setting its lhs/rhs children to the 
plan root of
@@ -415,20 +404,14 @@ public class DistributedPlanner {
   }
 
   /**
-   * Creates either a broadcast join or a repartitioning join, depending on the
-   * expected cost.
-   * If any of the inputs to the cost computation is unknown, it assumes the 
cost
-   * will be 0. Costs being equal, it'll favor partitioned over broadcast 
joins.
-   * If perNodeMemLimit > 0 and the size of the hash table for a broadcast 
join is
-   * expected to exceed that mem limit, switches to partitioned join instead.
-   * TODO: revisit the choice of broadcast as the default
+   * Creates either a broadcast join or a repartitioning join depending on the 
expected
+   * cost and various constraints. See computeDistributionMode() for more 
details.
    * TODO: don't create a broadcast join if we already anticipate that this 
will
    * exceed the query's memory budget.
    */
   private PlanFragment createHashJoinFragment(
       HashJoinNode node, PlanFragment rightChildFragment,
-      PlanFragment leftChildFragment, long perNodeMemLimit,
-      ArrayList<PlanFragment> fragments)
+      PlanFragment leftChildFragment, ArrayList<PlanFragment> fragments)
       throws ImpalaException {
     // For both join types, the total cost is calculated as the amount of data
     // sent over the network, plus the amount of data inserted into the hash 
table.
@@ -436,8 +419,8 @@ public class DistributedPlanner {
     // the leftChildFragment, and build a hash table with it on each node.
     Analyzer analyzer = ctx_.getRootAnalyzer();
     PlanNode rhsTree = rightChildFragment.getPlanRoot();
-    long rhsDataSize = 0;
-    long broadcastCost = Long.MAX_VALUE;
+    long rhsDataSize = -1;
+    long broadcastCost = -1;
     if (rhsTree.getCardinality() != -1) {
       rhsDataSize = Math.round(
           rhsTree.getCardinality() * 
ExchangeNode.getAvgSerializedRowSize(rhsTree));
@@ -455,8 +438,6 @@ public class DistributedPlanner {
     // repartition: both left- and rightChildFragment are partitioned on the
     // join exprs, and a hash table is built with the rightChildFragment's 
output.
     PlanNode lhsTree = leftChildFragment.getPlanRoot();
-    // Subtract 1 here so that if stats are missing we default to partitioned.
-    long partitionCost = Long.MAX_VALUE - 1;
     List<Expr> lhsJoinExprs = Lists.newArrayList();
     List<Expr> rhsJoinExprs = Lists.newArrayList();
     for (Expr joinConjunct: node.getEqJoinConjuncts()) {
@@ -466,12 +447,14 @@ public class DistributedPlanner {
     }
     boolean lhsHasCompatPartition = false;
     boolean rhsHasCompatPartition = false;
+    long partitionCost = -1;
     if (lhsTree.getCardinality() != -1 && rhsTree.getCardinality() != -1) {
       lhsHasCompatPartition = analyzer.equivSets(lhsJoinExprs,
           leftChildFragment.getDataPartition().getPartitionExprs());
       rhsHasCompatPartition = analyzer.equivSets(rhsJoinExprs,
           rightChildFragment.getDataPartition().getPartitionExprs());
 
+      Preconditions.checkState(rhsDataSize != -1);
       double lhsNetworkCost = (lhsHasCompatPartition) ? 0.0 :
         Math.round(
             lhsTree.getCardinality() * 
ExchangeNode.getAvgSerializedRowSize(lhsTree));
@@ -487,39 +470,12 @@ public class DistributedPlanner {
       LOG.trace(rhsTree.getExplainString(ctx_.getQueryOptions()));
     }
 
-    boolean doBroadcast = false;
-    // we do a broadcast join if
-    // - we're explicitly told to do so
-    // - or if it's cheaper and we weren't explicitly told to do a partitioned 
join
-    // - and we're not doing a full outer or right outer/semi join (those 
require the
-    //   left-hand side to be partitioned for correctness)
-    // - and the expected size of the hash tbl doesn't exceed perNodeMemLimit
-    // - or we are doing a null-aware left anti join (broadcast is required for
-    //   correctness)
-    // we do a "<=" comparison of the costs so that we default to broadcast 
joins if
-    // we're unable to estimate the cost
-    if ((node.getJoinOp() != JoinOperator.RIGHT_OUTER_JOIN
-        && node.getJoinOp() != JoinOperator.FULL_OUTER_JOIN
-        && node.getJoinOp() != JoinOperator.RIGHT_SEMI_JOIN
-        && node.getJoinOp() != JoinOperator.RIGHT_ANTI_JOIN
-        // a broadcast join hint overides the check to see if the hash table
-        // size is less than the pernode memlimit
-        && (node.getDistributionModeHint() == DistributionMode.BROADCAST
-            || perNodeMemLimit == 0
-            || Math.round(rhsDataSize * PlannerContext.HASH_TBL_SPACE_OVERHEAD)
-                <= perNodeMemLimit)
-        // a broadcast join hint overrides the check to see if performing a 
broadcast
-        // join is more costly than a partitioned join
-        && (node.getDistributionModeHint() == DistributionMode.BROADCAST
-            || (node.getDistributionModeHint() != DistributionMode.PARTITIONED
-                && broadcastCost <= partitionCost)))
-        || node.getJoinOp().isNullAwareLeftAntiJoin()) {
-      doBroadcast = true;
-    }
+    DistributionMode distrMode = computeJoinDistributionMode(
+        node, broadcastCost, partitionCost, rhsDataSize);
+    node.setDistributionMode(distrMode);
 
     PlanFragment hjFragment = null;
-    if (doBroadcast) {
-      node.setDistributionMode(HashJoinNode.DistributionMode.BROADCAST);
+    if (distrMode == DistributionMode.BROADCAST) {
       // Doesn't create a new fragment, but modifies leftChildFragment to 
execute
       // the join; the build input is provided by an ExchangeNode, which is the
       // destination of the rightChildFragment's output
@@ -534,7 +490,7 @@ public class DistributedPlanner {
     }
 
     for (RuntimeFilter filter: node.getRuntimeFilters()) {
-      filter.setIsBroadcast(doBroadcast);
+      filter.setIsBroadcast(distrMode == DistributionMode.BROADCAST);
       filter.computeHasLocalTargets();
       // Work around IMPALA-3450, where cardinalities might be wrong in 
single-node plans
       // with UNION and LIMITs.
@@ -544,6 +500,49 @@ public class DistributedPlanner {
     return hjFragment;
  }
 
+ /**
+  * Determines and returns the distribution mode for the given join based on 
the expected
+  * costs and the right-hand side data size. Considers the following:
+  * - Some join types require a specific distribution strategy to run 
correctly.
+  * - Checks for join hints.
+  * - Uses the default join strategy (query option) when the costs are unknown 
or tied.
+  * - Returns broadcast if it is cheaper than partitioned and the expected 
hash table
+  *   size is within the query mem limit.
+  * - Otherwise, returns partitioned.
+  * For 'broadcastCost', 'partitionCost', and 'rhsDataSize' a value of -1 
indicates
+  * unknown, e.g., due to missing stats.
+  */
+ private DistributionMode computeJoinDistributionMode(JoinNode node,
+     long broadcastCost, long partitionCost, long rhsDataSize) {
+   // Check join types that require a specific distribution strategy to run 
correctly.
+   JoinOperator op = node.getJoinOp();
+   if (op == JoinOperator.RIGHT_OUTER_JOIN || op == 
JoinOperator.RIGHT_SEMI_JOIN
+       || op == JoinOperator.RIGHT_ANTI_JOIN || op == 
JoinOperator.FULL_OUTER_JOIN) {
+     return DistributionMode.PARTITIONED;
+   }
+   if (op == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) return 
DistributionMode.BROADCAST;
+
+   // Check join hints.
+   if (node.getDistributionModeHint() != DistributionMode.NONE) {
+     return node.getDistributionModeHint();
+   }
+
+   // Use the default mode when the costs are unknown or tied.
+   if (broadcastCost == -1 || partitionCost == -1 || broadcastCost == 
partitionCost) {
+     return DistributionMode.fromThrift(
+         ctx_.getQueryOptions().getDefault_join_distribution_mode());
+   }
+
+   // Decide the distribution mode based on the estimated costs and the mem 
limit.
+   long htSize = Math.round(rhsDataSize * 
PlannerContext.HASH_TBL_SPACE_OVERHEAD);
+   long memLimit = ctx_.getQueryOptions().mem_limit;
+   if (broadcastCost <= partitionCost && (memLimit == 0 || htSize <= 
memLimit)) {
+     return DistributionMode.BROADCAST;
+   }
+   // Partitioned was cheaper or the broadcast HT would not fit within the mem 
limit.
+   return DistributionMode.PARTITIONED;
+ }
+
   /**
    * Returns true if the lhs and rhs partitions are physically compatible for 
executing
    * a partitioned join with the given lhs/rhs join exprs. Physical 
compatibility means

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ecda49f3/fe/src/main/java/org/apache/impala/planner/JoinNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/JoinNode.java 
b/fe/src/main/java/org/apache/impala/planner/JoinNode.java
index b40ef55..3bd0899 100644
--- a/fe/src/main/java/org/apache/impala/planner/JoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/JoinNode.java
@@ -32,6 +32,8 @@ import org.apache.impala.analysis.SlotRef;
 import org.apache.impala.catalog.ColumnStats;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.common.ImpalaException;
+import org.apache.impala.thrift.TJoinDistributionMode;
+
 import com.google.common.base.Preconditions;
 
 /**
@@ -83,6 +85,10 @@ public abstract class JoinNode extends PlanNode {
 
     @Override
     public String toString() { return description_; }
+    public static DistributionMode fromThrift(TJoinDistributionMode distrMode) 
{
+      if (distrMode == TJoinDistributionMode.BROADCAST) return BROADCAST;
+      return PARTITIONED;
+    }
   }
 
   public JoinNode(PlanNode outer, PlanNode inner, boolean isStraightJoin,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ecda49f3/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java 
b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 07b49a2..270df42 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -25,6 +25,7 @@ import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.testutil.TestUtils;
 import org.apache.impala.thrift.TExecRequest;
 import org.apache.impala.thrift.TExplainLevel;
+import org.apache.impala.thrift.TJoinDistributionMode;
 import org.apache.impala.thrift.TQueryCtx;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TRuntimeFilterMode;
@@ -413,4 +414,14 @@ public class PlannerTest extends PlannerTestBase {
     options.setExplain_level(TExplainLevel.EXTENDED);
     runPlannerTestFile("tablesample", options);
   }
+
+  @Test
+  public void testDefaultJoinDistributionMode() {
+    TQueryOptions options = defaultQueryOptions();
+    Preconditions.checkState(
+        options.getDefault_join_distribution_mode() == 
TJoinDistributionMode.BROADCAST);
+    runPlannerTestFile("default-join-distr-mode-broadcast", options);
+    options.setDefault_join_distribution_mode(TJoinDistributionMode.SHUFFLE);
+    runPlannerTestFile("default-join-distr-mode-shuffle", options);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ecda49f3/testdata/workloads/functional-planner/queries/PlannerTest/default-join-distr-mode-broadcast.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/default-join-distr-mode-broadcast.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/default-join-distr-mode-broadcast.test
new file mode 100644
index 0000000..8735f97
--- /dev/null
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/default-join-distr-mode-broadcast.test
@@ -0,0 +1,63 @@
+# Both join inputs have an unknown cardinality.
+select /* +straight_join */ * from
+functional.tinytable x inner join functional.tinytable y on x.a = y.a
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: x.a = y.a
+|  runtime filters: RF000 <- y.a
+|
+|--03:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [functional.tinytable y]
+|     partitions=1/1 files=1 size=38B
+|
+00:SCAN HDFS [functional.tinytable x]
+   partitions=1/1 files=1 size=38B
+   runtime filters: RF000 -> x.a
+====
+# Left join input has an unknown cardinality.
+select /* +straight_join */ * from
+functional.tinytable x inner join functional.alltypes y on x.a = y.string_col
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: x.a = y.string_col
+|  runtime filters: RF000 <- y.string_col
+|
+|--03:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [functional.alltypes y]
+|     partitions=24/24 files=24 size=469.90KB
+|
+00:SCAN HDFS [functional.tinytable x]
+   partitions=1/1 files=1 size=38B
+   runtime filters: RF000 -> x.a
+====
+# Right join input has an unknown cardinality.
+select /* +straight_join */ * from
+functional.alltypes x inner join functional.tinytable y on x.string_col = y.a
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: x.string_col = y.a
+|  runtime filters: RF000 <- y.a
+|
+|--03:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [functional.tinytable y]
+|     partitions=1/1 files=1 size=38B
+|
+00:SCAN HDFS [functional.alltypes x]
+   partitions=24/24 files=24 size=469.90KB
+   runtime filters: RF000 -> x.string_col
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ecda49f3/testdata/workloads/functional-planner/queries/PlannerTest/default-join-distr-mode-shuffle.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/default-join-distr-mode-shuffle.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/default-join-distr-mode-shuffle.test
new file mode 100644
index 0000000..59e60c9
--- /dev/null
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/default-join-distr-mode-shuffle.test
@@ -0,0 +1,69 @@
+# Both join inputs have an unknown cardinality.
+select /* +straight_join */ * from
+functional.tinytable x inner join functional.tinytable y on x.a = y.a
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+05:EXCHANGE [UNPARTITIONED]
+|
+02:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: x.a = y.a
+|  runtime filters: RF000 <- y.a
+|
+|--04:EXCHANGE [HASH(y.a)]
+|  |
+|  01:SCAN HDFS [functional.tinytable y]
+|     partitions=1/1 files=1 size=38B
+|
+03:EXCHANGE [HASH(x.a)]
+|
+00:SCAN HDFS [functional.tinytable x]
+   partitions=1/1 files=1 size=38B
+   runtime filters: RF000 -> x.a
+====
+# Left join input has an unknown cardinality.
+select /* +straight_join */ * from
+functional.tinytable x inner join functional.alltypes y on x.a = y.string_col
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+05:EXCHANGE [UNPARTITIONED]
+|
+02:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: x.a = y.string_col
+|  runtime filters: RF000 <- y.string_col
+|
+|--04:EXCHANGE [HASH(y.string_col)]
+|  |
+|  01:SCAN HDFS [functional.alltypes y]
+|     partitions=24/24 files=24 size=469.90KB
+|
+03:EXCHANGE [HASH(x.a)]
+|
+00:SCAN HDFS [functional.tinytable x]
+   partitions=1/1 files=1 size=38B
+   runtime filters: RF000 -> x.a
+====
+# Right join input has an unknown cardinality.
+select /* +straight_join */ * from
+functional.alltypes x inner join functional.tinytable y on x.string_col = y.a
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+05:EXCHANGE [UNPARTITIONED]
+|
+02:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: x.string_col = y.a
+|  runtime filters: RF000 <- y.a
+|
+|--04:EXCHANGE [HASH(y.a)]
+|  |
+|  01:SCAN HDFS [functional.tinytable y]
+|     partitions=1/1 files=1 size=38B
+|
+03:EXCHANGE [HASH(x.string_col)]
+|
+00:SCAN HDFS [functional.alltypes x]
+   partitions=24/24 files=24 size=469.90KB
+   runtime filters: RF000 -> x.string_col
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ecda49f3/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/joins.test 
b/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
index 26b8c64..0fdb19d 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
@@ -2519,25 +2519,3 @@ PLAN-ROOT SINK
 00:SCAN HDFS [tpch.customer a]
    partitions=1/1 files=1 size=23.08MB
 ====
-# If stats aren't available, default to partitioned join.
-select * from functional.tinytable x, functional.tinytable y where x.a = y.a
----- DISTRIBUTEDPLAN
-PLAN-ROOT SINK
-|
-05:EXCHANGE [UNPARTITIONED]
-|
-02:HASH JOIN [INNER JOIN, PARTITIONED]
-|  hash predicates: x.a = y.a
-|  runtime filters: RF000 <- y.a
-|
-|--04:EXCHANGE [HASH(y.a)]
-|  |
-|  01:SCAN HDFS [functional.tinytable y]
-|     partitions=1/1 files=1 size=38B
-|
-03:EXCHANGE [HASH(x.a)]
-|
-00:SCAN HDFS [functional.tinytable x]
-   partitions=1/1 files=1 size=38B
-   runtime filters: RF000 -> x.a
-====

Reply via email to