Repository: drill
Updated Branches:
  refs/heads/master 9c0738d94 -> 3442215fd


DRILL-2236: Optimize hash inner join by swapping inputs based on row count 
comparison. Add a planner option to enable/disable this feature.

Revise code based on review comments.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/3442215f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/3442215f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/3442215f

Branch: refs/heads/master
Commit: 3442215fd91e700f659bc055cd7c05b623bc59b3
Parents: 9c0738d
Author: Jinfeng Ni <j...@maprtech.com>
Authored: Thu Jan 29 13:24:28 2015 -0800
Committer: Jinfeng Ni <j...@maprtech.com>
Committed: Mon Mar 2 10:03:31 2015 -0800

----------------------------------------------------------------------
 .../exec/planner/physical/HashJoinPrel.java     | 54 +++++++++----
 .../drill/exec/planner/physical/JoinPrel.java   |  4 +-
 .../exec/planner/physical/MergeJoinPrel.java    |  2 +-
 .../exec/planner/physical/PlannerSettings.java  | 11 +++
 .../physical/explain/NumberingRelWriter.java    |  7 ++
 .../physical/visitor/SwapHashJoinVisitor.java   | 79 ++++++++++++++++++++
 .../planner/sql/handlers/DefaultSqlHandler.java | 13 +++-
 .../server/options/SystemOptionManager.java     |  2 +
 8 files changed, 154 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/3442215f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
index a3c42de..f63057f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.planner.physical;
 import java.io.IOException;
 import java.util.List;
 
+import net.hydromatic.optiq.runtime.FlatLists;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.logical.data.JoinCondition;
 import org.apache.drill.exec.ExecConstants;
@@ -46,18 +47,24 @@ import com.google.common.collect.Lists;
 
 public class HashJoinPrel  extends JoinPrel {
 
+  private boolean swapped = false;
+
   public HashJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, 
RelNode right, RexNode condition,
-      JoinRelType joinType) throws InvalidRelException {
-    super(cluster, traits, left, right, condition, joinType);
+                      JoinRelType joinType) throws InvalidRelException {
+    this(cluster, traits, left, right, condition, joinType, false);
+  }
 
+  public HashJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, 
RelNode right, RexNode condition,
+      JoinRelType joinType, boolean swapped) throws InvalidRelException {
+    super(cluster, traits, left, right, condition, joinType);
+    this.swapped = swapped;
     RelOptUtil.splitJoinCondition(left, right, condition, leftKeys, rightKeys);
   }
 
-
   @Override
   public JoinRelBase copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode 
left, RelNode right, JoinRelType joinType, boolean semiJoinDone) {
     try {
-      return new HashJoinPrel(this.getCluster(), traitSet, left, right, 
conditionExpr, joinType);
+      return new HashJoinPrel(this.getCluster(), traitSet, left, right, 
conditionExpr, joinType, this.swapped);
     }catch (InvalidRelException e) {
       throw new AssertionError(e);
     }
@@ -100,11 +107,32 @@ public class HashJoinPrel  extends JoinPrel {
 
   @Override
   public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) 
throws IOException {
+    // Depending on whether the left/right is swapped for hash inner join, 
pass in different
+    // combinations of parameters.
+    if (! swapped) {
+      return getHashJoinPop(creator, left, right, leftKeys, rightKeys);
+    } else {
+      return getHashJoinPop(creator, right, left, rightKeys, leftKeys);
+    }
+  }
+
+  @Override
+  public SelectionVectorMode[] getSupportedEncodings() {
+    return SelectionVectorMode.DEFAULT;
+  }
+
+  @Override
+  public SelectionVectorMode getEncoding() {
+    return SelectionVectorMode.NONE;
+  }
+
+  private PhysicalOperator getHashJoinPop(PhysicalPlanCreator creator, RelNode 
left, RelNode right,
+                                          List<Integer> leftKeys, 
List<Integer> rightKeys) throws IOException{
     final List<String> fields = getRowType().getFieldNames();
     assert isUnique(fields);
-    final int leftCount = left.getRowType().getFieldCount();
-    final List<String> leftFields = fields.subList(0, leftCount);
-    final List<String> rightFields = fields.subList(leftCount, fields.size());
+
+    final List<String> leftFields = left.getRowType().getFieldNames();
+    final List<String> rightFields = right.getRowType().getFieldNames();
 
     PhysicalOperator leftPop = ((Prel)left).getPhysicalOperator(creator);
     PhysicalOperator rightPop = ((Prel)right).getPhysicalOperator(creator);
@@ -113,20 +141,18 @@ public class HashJoinPrel  extends JoinPrel {
 
     List<JoinCondition> conditions = Lists.newArrayList();
 
-    buildJoinConditions(conditions, leftFields, rightFields);
+    buildJoinConditions(conditions, leftFields, rightFields, leftKeys, 
rightKeys);
 
     HashJoinPOP hjoin = new HashJoinPOP(leftPop, rightPop, conditions, jtype);
     return creator.addMetadata(this, hjoin);
   }
 
-  @Override
-  public SelectionVectorMode[] getSupportedEncodings() {
-    return SelectionVectorMode.DEFAULT;
+  public void setSwapped(boolean swapped) {
+    this.swapped = swapped;
   }
 
-  @Override
-  public SelectionVectorMode getEncoding() {
-    return SelectionVectorMode.NONE;
+  public boolean isSwapped() {
+    return this.swapped;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/3442215f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java
index 3541db7..bfecd06 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java
@@ -118,7 +118,9 @@ public abstract class JoinPrel extends DrillJoinRelBase 
implements Prel{
    */
   protected void buildJoinConditions(List<JoinCondition> conditions,
       List<String> leftFields,
-      List<String> rightFields) {
+      List<String> rightFields,
+      List<Integer> leftKeys,
+      List<Integer> rightKeys) {
     List<RexNode> conjuncts = RelOptUtil.conjunctions(this.getCondition());
     short i=0;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/3442215f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
index 394a82c..b7e86e3 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
@@ -99,7 +99,7 @@ public class MergeJoinPrel  extends JoinPrel {
 
     List<JoinCondition> conditions = Lists.newArrayList();
 
-    buildJoinConditions(conditions, leftFields, rightFields);
+    buildJoinConditions(conditions, leftFields, rightFields, leftKeys, 
rightKeys);
 
     MergeJoinPOP mjoin = new MergeJoinPOP(leftPop, rightPop, conditions, 
jtype);
     return creator.addMetadata(this, mjoin);

http://git-wip-us.apache.org/repos/asf/drill/blob/3442215f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index 96be07d..bbfbbcb 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -49,6 +49,9 @@ public class PlannerSettings implements Context{
   public static final OptionValidator PRODUCER_CONSUMER = new 
BooleanValidator("planner.add_producer_consumer", false);
   public static final OptionValidator PRODUCER_CONSUMER_QUEUE_SIZE = new 
LongValidator("planner.producer_consumer_queue_size", 10);
   public static final OptionValidator HASH_SINGLE_KEY = new 
BooleanValidator("planner.enable_hash_single_key", true);
+  public static final OptionValidator HASH_JOIN_SWAP = new 
BooleanValidator("planner.enable_hashjoin_swap", true);
+  public static final OptionValidator HASH_JOIN_SWAP_MARGIN_FACTOR = new 
RangeDoubleValidator("planner.join.hash_join_swap_margin_factor", 0, 100, 10d);
+
   public static final OptionValidator IDENTIFIER_MAX_LENGTH =
       new RangeLongValidator("planner.identifier_max_length", 128 /* A minimum 
length is needed because option names are identifiers themselves */,
                               Integer.MAX_VALUE, 
DEFAULT_IDENTIFIER_MAX_LENGTH);
@@ -117,6 +120,14 @@ public class PlannerSettings implements Context{
     return options.getOption(HASH_SINGLE_KEY.getOptionName()).bool_val;
   }
 
+  public boolean isHashJoinSwapEnabled() {
+    return options.getOption(HASH_JOIN_SWAP.getOptionName()).bool_val;
+  }
+
+  public double getHashJoinSwapMarginFactor() {
+    return 
options.getOption(HASH_JOIN_SWAP_MARGIN_FACTOR.getOptionName()).float_val / 
100d;
+  }
+
   public long getBroadcastThreshold() {
     return options.getOption(BROADCAST_THRESHOLD.getOptionName()).num_val;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/3442215f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/NumberingRelWriter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/NumberingRelWriter.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/NumberingRelWriter.java
index 6522ad9..387a442 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/NumberingRelWriter.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/NumberingRelWriter.java
@@ -23,8 +23,10 @@ import java.util.List;
 import java.util.Map;
 
 import net.hydromatic.linq4j.Ord;
+import net.hydromatic.optiq.runtime.FlatLists;
 import net.hydromatic.optiq.runtime.Spacer;
 
+import org.apache.drill.exec.planner.physical.HashJoinPrel;
 import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.planner.physical.explain.PrelSequencer.OpId;
 import org.eigenbase.rel.RelNode;
@@ -62,6 +64,10 @@ class NumberingRelWriter implements RelWriter {
       RelNode rel,
       List<Pair<String, Object>> values) {
     List<RelNode> inputs = rel.getInputs();
+    if (rel instanceof HashJoinPrel && ((HashJoinPrel) rel).isSwapped()) {
+      HashJoinPrel joinPrel = (HashJoinPrel) rel;
+      inputs = FlatLists.of(joinPrel.getRight(), joinPrel.getLeft());
+    }
 
     if (!RelMetadataQuery.isVisibleInExplain(
         rel,
@@ -106,6 +112,7 @@ class NumberingRelWriter implements RelWriter {
       }
     }
     if (detailLevel == SqlExplainLevel.ALL_ATTRIBUTES) {
+      s.append(" : rowType = " + rel.getRowType().toString());
       s.append(": rowcount = ")
           .append(RelMetadataQuery.getRowCount(rel))
           .append(", cumulative cost = ")

http://git-wip-us.apache.org/repos/asf/drill/blob/3442215f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SwapHashJoinVisitor.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SwapHashJoinVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SwapHashJoinVisitor.java
new file mode 100644
index 0000000..18d5e60
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SwapHashJoinVisitor.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.planner.physical.visitor;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.exec.planner.physical.HashJoinPrel;
+import org.apache.drill.exec.planner.physical.JoinPrel;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.eigenbase.rel.JoinRelType;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rex.RexNode;
+
+import java.util.List;
+
+/**
+ * Visit Prel tree. Find all the HashJoinPrel nodes and set the flag to swap 
the Left/Right for HashJoinPrel
+ * when 1) It's inner join, 2) left rowcount is < (1 + percentage) * 
right_row_count.
+ * The purpose of this visitor is to prevent planner from putting bigger 
dataset in the RIGHT side,
+ * which is not good performance-wise.
+ *
+ * @see org.apache.drill.exec.planner.physical.HashJoinPrel
+ */
+
+public class SwapHashJoinVisitor extends BasePrelVisitor<Prel, Double, 
RuntimeException>{
+
+  private static SwapHashJoinVisitor INSTANCE = new SwapHashJoinVisitor();
+
+  public static Prel swapHashJoin(Prel prel, Double marginFactor){
+    return prel.accept(INSTANCE, marginFactor);
+  }
+
+  private SwapHashJoinVisitor() {
+
+  }
+
+  @Override
+  public Prel visitPrel(Prel prel, Double value) throws RuntimeException {
+    List<RelNode> children = Lists.newArrayList();
+    for(Prel child : prel){
+      child = child.accept(this, value);
+      children.add(child);
+    }
+
+    return (Prel) prel.copy(prel.getTraitSet(), children);
+  }
+
+  @Override
+  public Prel visitJoin(JoinPrel prel, Double value) throws RuntimeException {
+    JoinPrel newJoin = (JoinPrel) visitPrel(prel, value);
+
+    if (prel instanceof HashJoinPrel) {
+      // Mark left/right is swapped, when INNER hash join's left row count < ( 
1+ margin factor) right row count.
+      if (newJoin.getLeft().getRows() < (1 + value.doubleValue() ) * 
newJoin.getRight().getRows() &&
+          newJoin.getJoinType() == JoinRelType.INNER) {
+        ( (HashJoinPrel) newJoin).setSwapped(true);
+      }
+    }
+
+    return newJoin;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/3442215f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index 35e7f5c..232778a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -56,6 +56,7 @@ import 
org.apache.drill.exec.planner.physical.visitor.RewriteProjectToFlatten;
 import 
org.apache.drill.exec.planner.physical.visitor.SelectionVectorPrelVisitor;
 import 
org.apache.drill.exec.planner.physical.visitor.SplitUpComplexExpressions;
 import org.apache.drill.exec.planner.physical.visitor.StarColumnConverter;
+import org.apache.drill.exec.planner.physical.visitor.SwapHashJoinVisitor;
 import org.apache.drill.exec.planner.sql.DrillSqlWorker;
 import org.apache.drill.exec.planner.sql.parser.UnsupportedOperatorsVisitor;
 import org.apache.drill.exec.server.options.OptionManager;
@@ -246,12 +247,20 @@ public class DefaultSqlHandler extends AbstractSqlHandler 
{
     phyRelNode = JoinPrelRenameVisitor.insertRenameProject(phyRelNode);
 
     /*
-     * 1.1) Break up all expressions with complex outputs into their own 
project operations
+     * 1.1) Swap left / right for INNER hash join, if left's row count is < (1 
+ margin) right's row count.
+     * We want to have smaller dataset on the right side, since hash table 
builds on right side.
+     */
+    if (context.getPlannerSettings().isHashJoinSwapEnabled()) {
+      phyRelNode = SwapHashJoinVisitor.swapHashJoin(phyRelNode, new 
Double(context.getPlannerSettings().getHashJoinSwapMarginFactor()));
+    }
+
+    /*
+     * 1.2) Break up all expressions with complex outputs into their own 
project operations
      */
     phyRelNode = ((Prel) phyRelNode).accept(new 
SplitUpComplexExpressions(planner.getTypeFactory(), 
context.getDrillOperatorTable(), 
context.getPlannerSettings().functionImplementationRegistry), null);
 
     /*
-     * 1.2) Projections that contain reference to flatten are rewritten as 
Flatten operators followed by Project
+     * 1.3) Projections that contain reference to flatten are rewritten as 
Flatten operators followed by Project
      */
     phyRelNode = ((Prel) phyRelNode).accept(new 
RewriteProjectToFlatten(planner.getTypeFactory(), 
context.getDrillOperatorTable()), null);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/3442215f/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index aa0a5ad..3d3e96f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -54,6 +54,8 @@ public class SystemOptionManager implements OptionManager {
       PlannerSettings.PRODUCER_CONSUMER_QUEUE_SIZE,
       PlannerSettings.HASH_SINGLE_KEY,
       PlannerSettings.IDENTIFIER_MAX_LENGTH,
+      PlannerSettings.HASH_JOIN_SWAP,
+      PlannerSettings.HASH_JOIN_SWAP_MARGIN_FACTOR,
       ExecConstants.CAST_TO_NULLABLE_NUMERIC_OPTION,
       ExecConstants.OUTPUT_FORMAT_VALIDATOR,
       ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR,

Reply via email to