This is an automated email from the ASF dual-hosted git repository.

boaz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 71809ca  DRILL-6798: Planner changes to support semi-join.
71809ca is described below

commit 71809ca6216d95540b2a41ce1ab2ebb742888671
Author: Hanumath Rao Maduri <[email protected]>
AuthorDate: Tue Oct 9 17:33:43 2018 -0700

    DRILL-6798: Planner changes to support semi-join.
---
 .../apache/drill/exec/planner/PlannerPhase.java    |  20 ++--
 .../apache/drill/exec/planner/RuleInstance.java    |  15 ++-
 .../exec/planner/common/DrillJoinRelBase.java      |   3 +-
 .../drill/exec/planner/logical/DrillJoin.java      |  48 +++++++++
 .../drill/exec/planner/logical/DrillJoinRel.java   |   8 +-
 .../exec/planner/logical/DrillLateralJoinRel.java  |   9 +-
 .../exec/planner/logical/DrillRelFactories.java    |  15 ++-
 .../exec/planner/logical/DrillSemiJoinRel.java     | 101 ++++++++++++++++++
 .../drill/exec/planner/physical/HashJoinPrel.java  |  57 ++++++++--
 .../drill/exec/planner/physical/HashJoinPrule.java |  15 ++-
 .../drill/exec/planner/physical/JoinPrel.java      |  83 ++++++++++++++-
 .../drill/exec/planner/physical/JoinPruleBase.java |  49 +++++----
 .../exec/planner/physical/MergeJoinPrule.java      |   4 +-
 .../exec/planner/physical/NestedLoopJoinPrule.java |   7 +-
 .../exec/planner/physical/PlannerSettings.java     |   6 ++
 .../physical/visitor/SwapHashJoinVisitor.java      |   2 +-
 .../exec/server/options/SystemOptionManager.java   |   1 +
 .../java-exec/src/main/resources/drill-module.conf |   1 +
 .../exec/physical/impl/join/TestSemiJoin.java      | 118 +++++++++++++++++++++
 .../drill/common/logical/data/LogicalSemiJoin.java |  52 +++++++++
 20 files changed, 553 insertions(+), 61 deletions(-)

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index ae55c9f..17f8da5 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -358,15 +358,18 @@ public enum PlannerPhase {
      * We have to create another copy of the ruleset with the context 
dependent elements;
      * this cannot be reused across queries.
      */
-    final ImmutableSet<RelOptRule> basicRules = 
ImmutableSet.<RelOptRule>builder()
+    ImmutableSet.Builder<RelOptRule> basicRules = 
ImmutableSet.<RelOptRule>builder()
         .addAll(staticRuleSet)
         .add(
             DrillMergeProjectRule.getInstance(true, 
RelFactories.DEFAULT_PROJECT_FACTORY,
                 optimizerRulesContext.getFunctionRegistry())
-            )
-        .build();
+            );
+    if (optimizerRulesContext.getPlannerSettings().isHashJoinEnabled() &&
+        optimizerRulesContext.getPlannerSettings().isSemiJoinEnabled()) {
+      basicRules.add(RuleInstance.SEMI_JOIN_PROJECT_RULE);
+    }
 
-    return RuleSets.ofList(basicRules);
+    return RuleSets.ofList(basicRules.build());
   }
 
   /**
@@ -474,7 +477,6 @@ public enum PlannerPhase {
   static RuleSet getPhysicalRules(OptimizerRulesContext optimizerRulesContext) 
{
     final List<RelOptRule> ruleList = new ArrayList<>();
     final PlannerSettings ps = optimizerRulesContext.getPlannerSettings();
-
     ruleList.add(ConvertCountToDirectScan.AGG_ON_PROJ_ON_SCAN);
     ruleList.add(ConvertCountToDirectScan.AGG_ON_SCAN);
     ruleList.add(SortConvertPrule.INSTANCE);
@@ -509,9 +511,14 @@ public enum PlannerPhase {
 
     if (ps.isHashJoinEnabled()) {
       ruleList.add(HashJoinPrule.DIST_INSTANCE);
-
+      if (ps.isSemiJoinEnabled()) {
+        ruleList.add(HashJoinPrule.SEMI_DIST_INSTANCE);
+      }
       if(ps.isBroadcastJoinEnabled()){
         ruleList.add(HashJoinPrule.BROADCAST_INSTANCE);
+        if (ps.isSemiJoinEnabled()) {
+          ruleList.add(HashJoinPrule.SEMI_BROADCAST_INSTANCE);
+        }
       }
     }
 
@@ -521,7 +528,6 @@ public enum PlannerPhase {
       if(ps.isBroadcastJoinEnabled()){
         ruleList.add(MergeJoinPrule.BROADCAST_INSTANCE);
       }
-
     }
 
     // NLJ plans consist of broadcasting the right child, hence we need
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
index 8aec96c..b14488c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
@@ -18,8 +18,11 @@
 package org.apache.drill.exec.planner;
 
 import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.volcano.AbstractConverter;
+import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalCalc;
 import org.apache.calcite.rel.logical.LogicalJoin;
@@ -39,12 +42,13 @@ import 
org.apache.calcite.rel.rules.ProjectSetOpTransposeRule;
 import org.apache.calcite.rel.rules.ProjectToWindowRule;
 import org.apache.calcite.rel.rules.ProjectWindowTransposeRule;
 import org.apache.calcite.rel.rules.ReduceExpressionsRule;
+import org.apache.calcite.rel.rules.SemiJoinRule;
 import org.apache.calcite.rel.rules.SortRemoveRule;
 import org.apache.calcite.rel.rules.SubQueryRemoveRule;
 import org.apache.calcite.rel.rules.UnionToDistinctRule;
 import org.apache.drill.exec.planner.logical.DrillConditions;
 import org.apache.drill.exec.planner.logical.DrillRelFactories;
-
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 /**
  * Contains rule instances which use custom RelBuilder.
  */
@@ -58,6 +62,15 @@ public interface RuleInstance {
       new UnionToDistinctRule(LogicalUnion.class,
           DrillRelFactories.LOGICAL_BUILDER);
 
+  /**
+   * Drill variant of Calcite's {@code SemiJoinRule.ProjectToSemiJoinRule} that builds its
+   * output through {@link DrillRelFactories#LOGICAL_BUILDER}.
+   * Joins whose condition is always true or always false (e.g. cross joins) are not
+   * converted. PlannerPhase adds this rule only when both hash join and semi-join
+   * planning are enabled.
+   */
+  SemiJoinRule SEMI_JOIN_PROJECT_RULE = new SemiJoinRule.ProjectToSemiJoinRule(Project.class, Join.class, Aggregate.class,
+          DrillRelFactories.LOGICAL_BUILDER, "DrillSemiJoinRule:project") {
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+      // The operands are declared as (Project, Join, Aggregate), so rel(1) must be the join.
+      Preconditions.checkArgument(call.rel(1) instanceof Join,
+          "expected a Join as the second operand of the semi-join rule");
+      Join join = call.rel(1);
+      return !(join.getCondition().isAlwaysTrue() || join.getCondition().isAlwaysFalse());
+    }
+  };
+
   JoinPushExpressionsRule JOIN_PUSH_EXPRESSIONS_RULE =
       new JoinPushExpressionsRule(Join.class,
           DrillRelFactories.LOGICAL_BUILDER);
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java
index 434016f..cde49e4 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.planner.cost.DrillCostBase;
 import org.apache.drill.exec.physical.impl.join.JoinUtils;
 import org.apache.drill.exec.physical.impl.join.JoinUtils.JoinCategory;
 import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
+import org.apache.drill.exec.planner.logical.DrillJoin;
 import org.apache.drill.exec.planner.physical.PrelUtil;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinRelType;
@@ -45,7 +46,7 @@ import 
org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 /**
  * Base class for logical and physical Joins implemented in Drill.
  */
-public abstract class DrillJoinRelBase extends Join implements DrillRelNode {
+public abstract class DrillJoinRelBase extends Join implements DrillJoin {
   protected List<Integer> leftKeys = Lists.newArrayList();
   protected List<Integer> rightKeys = Lists.newArrayList();
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoin.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoin.java
new file mode 100644
index 0000000..30067da
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoin.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.common.DrillRelNode;
+import java.util.List;
+
+/**
+ * Common view of Drill join relational expressions. It is implemented by
+ * {@code DrillJoinRelBase} (regular logical and physical joins) and by
+ * {@code DrillSemiJoinRel}, so planning rules can handle either kind of join
+ * through a single type.
+ */
+public interface DrillJoin extends DrillRelNode {
+
+  /** Returns the indexes, within the left input, of the columns that take part in the join condition. */
+  List<Integer> getLeftKeys();
+
+  /** Returns the indexes, within the right input, of the columns that take part in the join condition. */
+  List<Integer> getRightKeys();
+
+  /** Returns the join type of this join operation. */
+  JoinRelType getJoinType();
+
+  /** Returns the join condition of this join. */
+  RexNode getCondition();
+
+  /** Returns the left input of this join. */
+  RelNode getLeft();
+
+  /** Returns the right input of this join. */
+  RelNode getRight();
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
index 42f7e72..0126e74 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
@@ -104,7 +104,7 @@ public class DrillJoinRel extends DrillJoinRelBase 
implements DrillRel {
    * @return
    */
   private LogicalOperator implementInput(DrillImplementor implementor, int i, 
int offset, RelNode input) {
-    return implementInput(implementor, i, offset, input, this);
+    // Pass this join's own output field names as parentFields. The callee slices
+    // [offset, offset + input field count) out of them to get the names expected for this input.
+    return implementInput(implementor, i, offset, input, this, 
this.getRowType().getFieldNames());
   }
 
   /**
@@ -118,12 +118,12 @@ public class DrillJoinRel extends DrillJoinRelBase 
implements DrillRel {
    * @return
    */
   public static LogicalOperator implementInput(DrillImplementor implementor, 
int i, int offset,
-                                                RelNode input, DrillRel 
currentNode) {
+                                               RelNode input, DrillRel 
currentNode,
+                                               List<String> parentFields) {
     final LogicalOperator inputOp = implementor.visitChild(currentNode, i, 
input);
     assert uniqueFieldNames(input.getRowType());
-    final List<String> fields = currentNode.getRowType().getFieldNames();
     final List<String> inputFields = input.getRowType().getFieldNames();
-    final List<String> outputFields = fields.subList(offset, offset + 
inputFields.size());
+    final List<String> outputFields = parentFields.subList(offset, offset + 
inputFields.size());
     if (!outputFields.equals(inputFields)) {
       // Ensure that input field names are the same as output field names.
       // If there are duplicate field names on left and right, fields will get
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java
index 4356d49..ca03de1 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java
@@ -28,6 +28,7 @@ import org.apache.drill.common.logical.data.LateralJoin;
 import org.apache.drill.common.logical.data.LogicalOperator;
 import org.apache.drill.exec.planner.common.DrillLateralJoinRelBase;
 
+import java.util.ArrayList;
 import java.util.List;
 
 
@@ -48,12 +49,14 @@ public class DrillLateralJoinRel extends 
DrillLateralJoinRelBase implements Dril
 
   @Override
   public LogicalOperator implement(DrillImplementor implementor) {
-    final List<String> fields = getRowType().getFieldNames();
+    // Expected field names for both inputs, laid out as left fields followed by right
+    // fields. DrillJoinRel.implementInput slices this list by offset for each input.
+    // NOTE(review): built from the inputs rather than getRowType(), presumably because
+    // this node's row type is not guaranteed to be exactly left + right fields -- confirm.
+    List<String> fields = new ArrayList<>();
+    fields.addAll(getInput(0).getRowType().getFieldNames());
+    fields.addAll(getInput(1).getRowType().getFieldNames());
     assert DrillJoinRel.isUnique(fields);
     final int leftCount = getInputSize(0);

-    final LogicalOperator leftOp = DrillJoinRel.implementInput(implementor, 0, 
0, left, this);
-    final LogicalOperator rightOp = DrillJoinRel.implementInput(implementor, 
1, leftCount, right, this);
+    final LogicalOperator leftOp = DrillJoinRel.implementInput(implementor, 0, 
0, left, this, fields);
+    final LogicalOperator rightOp = DrillJoinRel.implementInput(implementor, 
1, leftCount, right, this, fields);

     return new LateralJoin(leftOp, rightOp);
   }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRelFactories.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRelFactories.java
index d5ff56b..a0b727d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRelFactories.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRelFactories.java
@@ -22,6 +22,7 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.JoinInfo;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.core.RelFactories;
 import org.apache.calcite.rel.type.RelDataType;
@@ -39,7 +40,6 @@ import static 
org.apache.calcite.rel.core.RelFactories.DEFAULT_FILTER_FACTORY;
 import static org.apache.calcite.rel.core.RelFactories.DEFAULT_JOIN_FACTORY;
 import static org.apache.calcite.rel.core.RelFactories.DEFAULT_MATCH_FACTORY;
 import static org.apache.calcite.rel.core.RelFactories.DEFAULT_PROJECT_FACTORY;
-import static 
org.apache.calcite.rel.core.RelFactories.DEFAULT_SEMI_JOIN_FACTORY;
 import static org.apache.calcite.rel.core.RelFactories.DEFAULT_SET_OP_FACTORY;
 import static org.apache.calcite.rel.core.RelFactories.DEFAULT_SORT_FACTORY;
 import static 
org.apache.calcite.rel.core.RelFactories.DEFAULT_TABLE_SCAN_FACTORY;
@@ -60,6 +60,17 @@ public class DrillRelFactories {
   public static final RelFactories.JoinFactory DRILL_LOGICAL_JOIN_FACTORY = 
new DrillJoinFactoryImpl();
 
   public static final RelFactories.AggregateFactory 
DRILL_LOGICAL_AGGREGATE_FACTORY = new DrillAggregateFactoryImpl();
+
+  /**
+   * Semi-join factory that produces {@link DrillSemiJoinRel} instances. It is registered
+   * in the logical RelBuilder context below, in place of Calcite's DEFAULT_SEMI_JOIN_FACTORY.
+   */
+  public static final RelFactories.SemiJoinFactory DRILL_SEMI_JOIN_FACTORY = new SemiJoinFactoryImpl();
+
+  /** Creates {@link DrillSemiJoinRel}s and derives the join keys from the condition. */
+  private static final class SemiJoinFactoryImpl implements RelFactories.SemiJoinFactory {
+    @Override
+    public RelNode createSemiJoin(RelNode left, RelNode right, RexNode condition) {
+      // JoinInfo splits the condition into left- and right-input key ordinals.
+      final JoinInfo joinInfo = JoinInfo.of(left, right, condition);
+      return DrillSemiJoinRel.create(left, right, condition, joinInfo.leftKeys, joinInfo.rightKeys);
+    }
+  }
   /**
    * A {@link RelBuilderFactory} that creates a {@link DrillRelBuilder} that 
will
    * create logical relational expressions for everything.
@@ -69,7 +80,7 @@ public class DrillRelFactories {
           Contexts.of(DEFAULT_PROJECT_FACTORY,
               DEFAULT_FILTER_FACTORY,
               DEFAULT_JOIN_FACTORY,
-              DEFAULT_SEMI_JOIN_FACTORY,
+              DRILL_SEMI_JOIN_FACTORY,
               DEFAULT_SORT_FACTORY,
               DEFAULT_AGGREGATE_FACTORY,
               DEFAULT_MATCH_FACTORY,
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java
new file mode 100644
index 0000000..09e4be9
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.SemiJoin;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Pair;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.logical.data.Join;
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.common.logical.data.LogicalSemiJoin;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Drill logical semi-join. Like Calcite's {@link SemiJoin}, its output row type
+ * carries only the columns of the left input. The condition is an equi-join
+ * described by {@code leftKeys}/{@code rightKeys}.
+ */
+public class DrillSemiJoinRel extends SemiJoin implements DrillJoin, DrillRel {
+
+  public DrillSemiJoinRel(
+          RelOptCluster cluster,
+          RelTraitSet traitSet,
+          RelNode left,
+          RelNode right,
+          RexNode condition,
+          ImmutableIntList leftKeys,
+          ImmutableIntList rightKeys) {
+    super(cluster, traitSet, left, right, condition, leftKeys, rightKeys);
+  }
+
+  /**
+   * Creates a semi-join in the {@link DrillRel#DRILL_LOGICAL} convention.
+   * The keys are taken as given (e.g. from {@code JoinInfo.of}) and are not
+   * re-validated against {@code condition}.
+   */
+  public static SemiJoin create(RelNode left, RelNode right, RexNode condition,
+                                ImmutableIntList leftKeys, ImmutableIntList rightKeys) {
+    final RelOptCluster cluster = left.getCluster();
+    return new DrillSemiJoinRel(cluster, cluster.traitSetOf(DrillRel.DRILL_LOGICAL), left,
+            right, condition, leftKeys, rightKeys);
+  }
+
+  /**
+   * Copies this semi-join, recomputing the keys from {@code condition}.
+   *
+   * @throws IllegalArgumentException if {@code joinType} is not INNER or
+   *         {@code condition} is not a pure equi-join
+   */
+  @Override
+  public SemiJoin copy(RelTraitSet traitSet, RexNode condition,
+                       RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) {
+    Preconditions.checkArgument(joinType == JoinRelType.INNER,
+        "semi-join must have join type INNER, got %s", joinType);
+    final JoinInfo joinInfo = JoinInfo.of(left, right, condition);
+    Preconditions.checkArgument(joinInfo.isEqui(),
+        "semi-join condition must be an equi-join: %s", condition);
+    return new DrillSemiJoinRel(getCluster(), traitSet, left, right, condition,
+            joinInfo.leftKeys, joinInfo.rightKeys);
+  }
+
+  @Override
+  public LogicalOperator implement(DrillImplementor implementor) {
+    // This rel's row type holds only the left input's columns, so the names expected
+    // for both inputs are assembled from the inputs themselves (left, then right).
+    List<String> fields = new ArrayList<>();
+    fields.addAll(getInput(0).getRowType().getFieldNames());
+    fields.addAll(getInput(1).getRowType().getFieldNames());
+    Preconditions.checkArgument(DrillJoinRel.isUnique(fields),
+        "semi-join input field names must be unique: %s", fields);
+    final int leftCount = left.getRowType().getFieldCount();
+    final List<String> leftFields = fields.subList(0, leftCount);
+    final List<String> rightFields = fields.subList(leftCount, leftCount + right.getRowType().getFieldCount());
+
+    final LogicalOperator leftOp = DrillJoinRel.implementInput(implementor, 0, 0, left, this, fields);
+    final LogicalOperator rightOp = DrillJoinRel.implementInput(implementor, 1, leftCount, right, this, fields);
+
+    // One equality condition per (left key, right key) pair; key ordinals are input-relative.
+    List<JoinCondition> conditions = Lists.newArrayList();
+    for (Pair<Integer, Integer> pair : Pair.zip(leftKeys, rightKeys)) {
+      conditions.add(new JoinCondition(DrillJoinRel.EQUALITY_CONDITION,
+              new FieldReference(leftFields.get(pair.left)), new FieldReference(rightFields.get(pair.right))));
+    }
+
+    return new LogicalSemiJoin(leftOp, rightOp, conditions, joinType);
+  }
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
index 0e1fc4e..6480f3d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
@@ -20,8 +20,13 @@ package org.apache.drill.exec.planner.physical;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.drill.common.logical.data.JoinCondition;
 
 import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -38,7 +43,7 @@ import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rex.RexNode;
-
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.exec.work.filter.RuntimeFilterDef;
 
@@ -50,14 +55,25 @@ public class HashJoinPrel  extends JoinPrel {
   private int joinControl;
 
   public HashJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, 
RelNode right, RexNode condition,
-                      JoinRelType joinType) throws InvalidRelException {
-    this(cluster, traits, left, right, condition, joinType, false, null, 
false, JoinControl.DEFAULT);
+                      JoinRelType joinType, boolean semiJoin) throws 
InvalidRelException {
+    // Not swapped, no runtime filter, not a row-key join, default join control.
+    this(cluster, traits, left, right, condition, joinType, false, null, 
false, JoinControl.DEFAULT, semiJoin);
+  }
+
+  /**
+   * Creates a hash join with {@code semiJoin = false}; all other arguments are
+   * forwarded unchanged.
+   */
+  public HashJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, 
RelNode right, RexNode condition,
+                      JoinRelType joinType, boolean swapped, RuntimeFilterDef 
runtimeFilterDef,
+                      boolean isRowKeyJoin, int joinControl) throws 
InvalidRelException {
+    this(cluster, traits, left, right, condition, joinType, swapped, 
runtimeFilterDef, isRowKeyJoin, joinControl, false);
   }
 
   public HashJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, 
RelNode right, RexNode condition,
       JoinRelType joinType, boolean swapped, RuntimeFilterDef runtimeFilterDef,
-      boolean isRowKeyJoin, int joinControl) throws InvalidRelException {
-    super(cluster, traits, left, right, condition, joinType);
+      boolean isRowKeyJoin, int joinControl, boolean semiJoin) throws 
InvalidRelException {
+    super(cluster, traits, left, right, condition, joinType, semiJoin);
+    Preconditions.checkArgument(isSemiJoin && !swapped || swapped && 
!isSemiJoin || (!swapped && !isSemiJoin));
+    if (isSemiJoin) {
+      Preconditions.checkArgument(!swapped, "swapping of inputs is not allowed 
for semi-joins");
+      Preconditions.checkArgument(validateTraits(traitSet, left, right));
+    }
     this.swapped = swapped;
     this.isRowKeyJoin = isRowKeyJoin;
     joincategory = JoinUtils.getJoinCategory(left, right, condition, leftKeys, 
rightKeys, filterNulls);
@@ -65,11 +81,34 @@ public class HashJoinPrel  extends JoinPrel {
     this.joinControl = joinControl;
   }
 
+  /**
+   * Checks that no collation or distribution trait in {@code traitSet} refers to a
+   * column of the right input, i.e. a field index in
+   * [left field count, left field count + right field count).
+   * Used for semi-joins.
+   *
+   * @return {@code true} if every collation/distribution field refers to the left input
+   */
+  private static boolean validateTraits(RelTraitSet traitSet, RelNode left, RelNode right) {
+    final int leftCount = left.getRowType().getFieldCount();
+    final ImmutableBitSet rightFields =
+        ImmutableBitSet.range(leftCount, leftCount + right.getRowType().getFieldCount());
+    for (RelTrait trait : traitSet) {
+      if (trait.getTraitDef().getTraitClass().equals(RelCollation.class)) {
+        RelCollation collationTrait = (RelCollation) trait;
+        for (RelFieldCollation field : collationTrait.getFieldCollations()) {
+          // Membership test: indexOf() returns the bit's position within the set
+          // (0 for the first right-input column), so "indexOf(..) > 0" misses it.
+          if (rightFields.get(field.getFieldIndex())) {
+            return false;
+          }
+        }
+      } else if (trait.getTraitDef().getTraitClass().equals(DrillDistributionTrait.class)) {
+        DrillDistributionTrait distributionTrait = (DrillDistributionTrait) trait;
+        for (DrillDistributionTrait.DistributionField field : distributionTrait.getFields()) {
+          if (rightFields.get(field.getFieldId())) {
+            return false;
+          }
+        }
+      }
+    }
+    return true;
+  }
+
   @Override
   public Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left, 
RelNode right, JoinRelType joinType, boolean semiJoinDone) {
     try {
       return new HashJoinPrel(this.getCluster(), traitSet, left, right, 
conditionExpr, joinType, this.swapped, this.runtimeFilterDef,
-          this.isRowKeyJoin, this.joinControl);
+          this.isRowKeyJoin, this.joinControl, this.isSemiJoin);
     }catch (InvalidRelException e) {
       throw new AssertionError(e);
     }
@@ -87,7 +126,7 @@ public class HashJoinPrel  extends JoinPrel {
   }
 
   @Override
-  public org.apache.drill.exec.physical.base.PhysicalOperator 
getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) 
throws IOException {
     // Depending on whether the left/right is swapped for hash inner join, 
pass in different
     // combinations of parameters.
     if (! swapped) {
@@ -150,4 +189,8 @@ public class HashJoinPrel  extends JoinPrel {
     return this.isRowKeyJoin;
   }
 
+  /**
+   * Extends the inherited EXPLAIN terms with this join's semi-join flag.
+   */
+  @Override
+  public RelWriter explainTerms(RelWriter pw) {
+    final RelWriter writer = super.explainTerms(pw);
+    return writer.item("semi-join: ", isSemiJoin);
+  }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
index d07cf51..0d7f5ca 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
@@ -17,7 +17,9 @@
  */
 package org.apache.drill.exec.planner.physical;
 
+import org.apache.drill.exec.planner.logical.DrillJoin;
 import org.apache.drill.exec.planner.logical.DrillJoinRel;
+import org.apache.drill.exec.planner.logical.DrillSemiJoinRel;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
 import org.apache.calcite.rel.InvalidRelException;
 import org.apache.calcite.rel.RelNode;
@@ -30,10 +32,14 @@ import org.slf4j.Logger;
 public class HashJoinPrule extends JoinPruleBase {
   public static final RelOptRule DIST_INSTANCE = new 
HashJoinPrule("Prel.HashJoinDistPrule", RelOptHelper.any(DrillJoinRel.class), 
true);
   public static final RelOptRule BROADCAST_INSTANCE = new 
HashJoinPrule("Prel.HashJoinBroadcastPrule", 
RelOptHelper.any(DrillJoinRel.class), false);
+  // Semi-join variants: the same rule logic, matching DrillSemiJoinRel instead of DrillJoinRel.
+  // PlannerPhase.getPhysicalRules registers them only when semi-join planning is enabled.
+  public static final RelOptRule SEMI_DIST_INSTANCE = new 
HashJoinPrule("Prel.HashSemiJoinDistPrule", 
RelOptHelper.any(DrillSemiJoinRel.class), true);
+  public static final RelOptRule SEMI_BROADCAST_INSTANCE = new 
HashJoinPrule("Prel.HashSemiJoinBroadcastPrule", 
RelOptHelper.any(DrillSemiJoinRel.class), false);
+
 
   protected static final Logger tracer = CalciteTrace.getPlannerTracer();
 
   private final boolean isDist;
+  private boolean isSemi = false;
   private HashJoinPrule(String name, RelOptRuleOperand operand, boolean 
isDist) {
     super(operand, name);
     this.isDist = isDist;
@@ -42,17 +48,18 @@ public class HashJoinPrule extends JoinPruleBase {
   @Override
   public boolean matches(RelOptRuleCall call) {
     PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
+    // NOTE(review): this stores per-call state in a field of a shared static rule instance,
+    // which onMatch() later reads. Each instance matches a single operand class
+    // (DrillJoinRel or DrillSemiJoinRel), so the flag could instead be derived from
+    // call.rel(0) inside onMatch() or fixed at construction -- confirm and refactor.
+    isSemi = call.rel(0) instanceof DrillSemiJoinRel;
     return settings.isMemoryEstimationEnabled() || 
settings.isHashJoinEnabled();
   }
 
   @Override
   public void onMatch(RelOptRuleCall call) {
     PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
-    if (!settings.isHashJoinEnabled()) {
+    if (!settings.isHashJoinEnabled() || isSemi && 
!settings.isSemiJoinEnabled()) {
       return;
     }
 
-    final DrillJoinRel join = call.rel(0);
+    final DrillJoin join = call.rel(0);
     final RelNode left = join.getLeft();
     final RelNode right = join.getRight();
 
@@ -66,11 +73,11 @@ public class HashJoinPrule extends JoinPruleBase {
 
       if(isDist){
         createDistBothPlan(call, join, PhysicalJoinType.HASH_JOIN,
-            left, right, null /* left collation */, null /* right collation 
*/, hashSingleKey);
+            left, right, null /* left collation */, null /* right collation 
*/, hashSingleKey, isSemi);
       }else{
         if (checkBroadcastConditions(call.getPlanner(), join, left, right)) {
           createBroadcastPlan(call, join, join.getCondition(), 
PhysicalJoinType.HASH_JOIN,
-              left, right, null /* left collation */, null /* right collation 
*/);
+              left, right, null /* left collation */, null /* right collation 
*/, isSemi);
         }
       }
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java
index c40eeaa..2581fa6 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java
@@ -17,9 +17,14 @@
  */
 package org.apache.drill.exec.planner.physical;
 
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.calcite.rex.RexChecker;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.calcite.util.Litmus;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.logical.data.JoinCondition;
@@ -37,7 +42,6 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.util.Pair;
-
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 /**
@@ -48,11 +52,18 @@ import 
org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 public abstract class JoinPrel extends DrillJoinRelBase implements Prel {
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(JoinPrel.class);
 
+  /** Whether this join was planned as a semi join; if so, its output row type holds only left-side fields. */
+  protected final boolean isSemiJoin;
   protected JoinUtils.JoinCategory joincategory;
 
+  /** Creates a regular (non-semi) join; equivalent to passing {@code isSemiJoin = false}. */
   public JoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, 
RelNode right, RexNode condition,
-      JoinRelType joinType) {
+                  JoinRelType joinType) {
+    this(cluster, traits, left, right, condition, joinType, false);
+  }
+
+  /**
+   * Creates a join.
+   *
+   * @param isSemiJoin if {@code true}, the join is a semi join and only the left
+   *                   input's fields appear in the output row type
+   */
+  public JoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, 
RelNode right, RexNode condition,
+      JoinRelType joinType, boolean isSemiJoin) {
     super(cluster, traits, left, right, condition, joinType);
+    this.isSemiJoin = isSemiJoin;
   }
 
   @Override
@@ -73,7 +84,12 @@ public abstract class JoinPrel extends DrillJoinRelBase 
implements Prel {
     assert uniqueFieldNames(input.getRowType());
     final List<String> fields = getRowType().getFieldNames();
     final List<String> inputFields = input.getRowType().getFieldNames();
-    final List<String> outputFields = fields.subList(offset, offset + 
inputFields.size());
+    final List<String> outputFields;
+    if (fields.size() > offset) {
+      outputFields = fields.subList(offset, offset + inputFields.size());
+    } else {
+      outputFields = new ArrayList<>();
+    }
     if (!outputFields.equals(inputFields)) {
       // Ensure that input field names are the same as output field names.
       // If there are duplicate field names on left and right, fields will get
@@ -86,6 +102,9 @@ public abstract class JoinPrel extends DrillJoinRelBase 
implements Prel {
   }
 
   private RelNode rename(RelNode input, List<RelDataTypeField> inputFields, 
List<String> outputFieldNames) {
+    if (outputFieldNames.size() == 0) {
+      return input;
+    }
     List<RexNode> exprs = Lists.newArrayList();
 
     for (RelDataTypeField field : inputFields) {
@@ -139,4 +158,62 @@ public abstract class JoinPrel extends DrillJoinRelBase 
implements Prel {
     }
   }
 
+  /**
+   * @return {@code true} if this join was planned as a semi join
+   */
+  public boolean isSemiJoin() {
+    return this.isSemiJoin;
+  }
+
+  /**
+   * Validates this join, accounting for semi joins.
+   *
+   * <p>A Drill semi-join physical rel has an output row type with fields from only the
+   * left input, whereas Calcite's join validation expects fields from both inputs. This
+   * method is overridden so that a semi join is validated against the left-only row
+   * type instead of failing the field-count check. Regular joins must additionally pass
+   * the superclass validation.
+   */
+  @Override public boolean isValid(Litmus litmus, Context context) {
+    if (!isSemiJoin && !super.isValid(litmus, context)) {
+      return false;
+    }
+    // A semi join contributes no right-side fields to its output row type.
+    final int rightFieldCount = isSemiJoin ? 0 : right.getRowType().getFieldCount();
+    final int expectedFieldCount = getSystemFieldList().size()
+        + left.getRowType().getFieldCount()
+        + rightFieldCount;
+    if (getRowType().getFieldCount() != expectedFieldCount) {
+      return litmus.fail("field count mismatch");
+    }
+    if (condition != null) {
+      if (condition.getType().getSqlTypeName() != SqlTypeName.BOOLEAN) {
+        return litmus.fail("condition must be boolean: {}",
+            condition.getType());
+      }
+      // Even for a semi join the condition references both inputs, so it is checked
+      // against a row type of system fields, left fields and right fields. This is like
+      // the output row type, except that fields have not yet been made nullable due to
+      // outer joins.
+      RexChecker checker = new RexChecker(
+          getCluster().getTypeFactory().builder()
+              .addAll(getSystemFieldList())
+              .addAll(getLeft().getRowType().getFieldList())
+              .addAll(getRight().getRowType().getFieldList())
+              .build(),
+          context, litmus);
+      condition.accept(checker);
+      if (checker.getFailureCount() > 0) {
+        return litmus.fail(checker.getFailureCount()
+            + " failures in condition " + condition);
+      }
+    }
+    return litmus.succeed();
+  }
+
+  /**
+   * Derives the output row type. A semi join produces only the left input's fields
+   * (plus any system fields); every other join uses the superclass derivation.
+   */
+  @Override public RelDataType deriveRowType() {
+    if (!isSemiJoin) {
+      return super.deriveRowType();
+    }
+    // A null right type leaves the right input's fields out of the result. Pass the same
+    // system field list that isValid() counts so the two methods cannot disagree.
+    // NOTE(review): confirm deriveJoinRowType tolerates a null right type for this
+    // join type (LEFT/FULL may try to make the right type nullable).
+    return SqlValidatorUtil.deriveJoinRowType(
+        left.getRowType(),
+        null,
+        this.joinType,
+        getCluster().getTypeFactory(),
+        null,
+        getSystemFieldList());
+  }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
index 7588e2c..3665401 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
@@ -22,7 +22,7 @@ import java.util.List;
 import org.apache.drill.exec.physical.impl.join.JoinUtils;
 import org.apache.drill.exec.physical.impl.join.JoinUtils.JoinCategory;
 import org.apache.drill.exec.planner.common.DrillJoinRelBase;
-import org.apache.drill.exec.planner.logical.DrillJoinRel;
+import org.apache.drill.exec.planner.logical.DrillJoin;
 import 
org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
 import org.apache.calcite.rel.InvalidRelException;
 import org.apache.calcite.rel.core.JoinRelType;
@@ -47,8 +47,8 @@ public abstract class JoinPruleBase extends Prule {
     super(operand, description);
   }
 
-  protected boolean checkPreconditions(DrillJoinRel join, RelNode left, 
RelNode right,
-      PlannerSettings settings) {
+  protected boolean checkPreconditions(DrillJoin join, RelNode left, RelNode 
right,
+                                       PlannerSettings settings) {
     List<Integer> leftKeys = Lists.newArrayList();
     List<Integer> rightKeys = Lists.newArrayList();
     List<Boolean> filterNulls = Lists.newArrayList();
@@ -66,7 +66,7 @@ public abstract class JoinPruleBase extends Prule {
     return distFields;
   }
 
-  protected boolean checkBroadcastConditions(RelOptPlanner planner, 
DrillJoinRel join, RelNode left, RelNode right) {
+  protected boolean checkBroadcastConditions(RelOptPlanner planner, DrillJoin 
join, RelNode left, RelNode right) {
 
     double estimatedRightRowCount = 
RelMetadataQuery.instance().getRowCount(right);
     if (estimatedRightRowCount < 
PrelUtil.getSettings(join.getCluster()).getBroadcastThreshold()
@@ -78,10 +78,11 @@ public abstract class JoinPruleBase extends Prule {
     return false;
   }
 
-  protected void createDistBothPlan(RelOptRuleCall call, DrillJoinRel join,
+  protected void createDistBothPlan(RelOptRuleCall call, DrillJoin join,
       PhysicalJoinType physicalJoinType,
       RelNode left, RelNode right,
-      RelCollation collationLeft, RelCollation collationRight, boolean 
hashSingleKey)throws InvalidRelException {
+      RelCollation collationLeft, RelCollation collationRight,
+      boolean hashSingleKey, boolean semiJoin)throws InvalidRelException {
 
     /* If join keys are  l1 = r1 and l2 = r2 and ... l_k = r_k, then consider 
the following options of plan:
      *   1) Plan1: distributed by (l1, l2, ..., l_k) for left side and by (r1, 
r2, ..., r_k) for right side.
@@ -93,10 +94,12 @@ public abstract class JoinPruleBase extends Prule {
      *   Whether enumerate plan 2, .., Plan_(k+1) depends on option : 
hashSingleKey.
      */
 
-    DrillDistributionTrait hashLeftPartition = new 
DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
 ImmutableList.copyOf(getDistributionField(join.getLeftKeys())));
-    DrillDistributionTrait hashRightPartition = new 
DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
 ImmutableList.copyOf(getDistributionField(join.getRightKeys())));
+    DrillDistributionTrait hashLeftPartition = new 
DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
+            ImmutableList.copyOf(getDistributionField(join.getLeftKeys())));
+    DrillDistributionTrait hashRightPartition = new 
DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
+            ImmutableList.copyOf(getDistributionField(join.getRightKeys())));
 
-    createDistBothPlan(call, join, physicalJoinType, left, right, 
collationLeft, collationRight, hashLeftPartition, hashRightPartition);
+    createDistBothPlan(call, join, physicalJoinType, left, right, 
collationLeft, collationRight, hashLeftPartition, hashRightPartition, semiJoin);
 
     assert (join.getLeftKeys().size() == join.getRightKeys().size());
 
@@ -110,7 +113,7 @@ public abstract class JoinPruleBase extends Prule {
         hashLeftPartition = new 
DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
 ImmutableList.copyOf(getDistributionField(join.getLeftKeys().subList(i, 
i+1))));
         hashRightPartition = new 
DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
 ImmutableList.copyOf(getDistributionField(join.getRightKeys().subList(i, 
i+1))));
 
-        createDistBothPlan(call, join, physicalJoinType, left, right, 
collationLeft, collationRight, hashLeftPartition, hashRightPartition);
+        createDistBothPlan(call, join, physicalJoinType, left, right, 
collationLeft, collationRight, hashLeftPartition, hashRightPartition, semiJoin);
       }
     }
   }
@@ -118,11 +121,11 @@ public abstract class JoinPruleBase extends Prule {
   // Create join plan with both left and right children hash distributed. If 
the physical join type
   // is MergeJoin, a collation must be provided for both left and right child 
and the plan will contain
   // sort converter if necessary to provide the collation.
-  private void createDistBothPlan(RelOptRuleCall call, DrillJoinRel join,
+  private void createDistBothPlan(RelOptRuleCall call, DrillJoin join,
       PhysicalJoinType physicalJoinType,
       RelNode left, RelNode right,
       RelCollation collationLeft, RelCollation collationRight,
-      DrillDistributionTrait hashLeftPartition, DrillDistributionTrait 
hashRightPartition) throws InvalidRelException {
+      DrillDistributionTrait hashLeftPartition, DrillDistributionTrait 
hashRightPartition, boolean isSemiJoin) throws InvalidRelException {
 
     RelTraitSet traitsLeft = null;
     RelTraitSet traitsRight = null;
@@ -145,7 +148,7 @@ public abstract class JoinPruleBase extends Prule {
       final RelTraitSet traitSet = PrelUtil.removeCollation(traitsLeft, call);
       newJoin = new HashJoinPrel(join.getCluster(), traitSet,
                                  convertedLeft, convertedRight, 
join.getCondition(),
-                                 join.getJoinType());
+                                 join.getJoinType(), isSemiJoin);
 
     } else if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
       newJoin = new MergeJoinPrel(join.getCluster(), traitsLeft,
@@ -158,11 +161,11 @@ public abstract class JoinPruleBase extends Prule {
   // Create join plan with left child ANY distributed and right child 
BROADCAST distributed. If the physical join type
   // is MergeJoin, a collation must be provided for both left and right child 
and the plan will contain sort converter
   // if necessary to provide the collation.
-  protected void createBroadcastPlan(final RelOptRuleCall call, final 
DrillJoinRel join,
+  protected void createBroadcastPlan(final RelOptRuleCall call, final 
DrillJoin join,
       final RexNode joinCondition,
       final PhysicalJoinType physicalJoinType,
       final RelNode left, final RelNode right,
-      final RelCollation collationLeft, final RelCollation collationRight) 
throws InvalidRelException {
+      final RelCollation collationLeft, final RelCollation collationRight, 
boolean semiJoin) throws InvalidRelException {
 
     DrillDistributionTrait distBroadcastRight = new 
DrillDistributionTrait(DrillDistributionTrait.DistributionType.BROADCAST_DISTRIBUTED);
     RelTraitSet traitsRight = null;
@@ -184,10 +187,10 @@ public abstract class JoinPruleBase extends Prule {
 
     if(traitProp){
       if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
-        new SubsetTransformer<DrillJoinRel, InvalidRelException>(call) {
+        new SubsetTransformer<DrillJoin, InvalidRelException>(call) {
 
           @Override
-          public RelNode convertChild(final DrillJoinRel join, final RelNode 
rel) throws InvalidRelException {
+          public RelNode convertChild(final DrillJoin join, final RelNode rel) 
throws InvalidRelException {
             DrillDistributionTrait toDist = 
rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
             RelTraitSet newTraitsLeft = newTraitSet(Prel.DRILL_PHYSICAL, 
collationLeft, toDist);
 
@@ -200,24 +203,24 @@ public abstract class JoinPruleBase extends Prule {
 
 
       } else if (physicalJoinType == PhysicalJoinType.HASH_JOIN) {
-        new SubsetTransformer<DrillJoinRel, InvalidRelException>(call) {
+        new SubsetTransformer<DrillJoin, InvalidRelException>(call) {
 
           @Override
-          public RelNode convertChild(final DrillJoinRel join,  final RelNode 
rel) throws InvalidRelException {
+          public RelNode convertChild(final DrillJoin join,  final RelNode 
rel) throws InvalidRelException {
             DrillDistributionTrait toDist = 
rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
             RelTraitSet newTraitsLeft = newTraitSet(Prel.DRILL_PHYSICAL, 
toDist);
             RelNode newLeft = convert(left, newTraitsLeft);
             return new HashJoinPrel(join.getCluster(), newTraitsLeft, newLeft, 
convertedRight, joinCondition,
-                                         join.getJoinType());
+                                         join.getJoinType(), semiJoin);
 
           }
 
         }.go(join, convertedLeft);
       } else if (physicalJoinType == PhysicalJoinType.NESTEDLOOP_JOIN) {
-        new SubsetTransformer<DrillJoinRel, InvalidRelException>(call) {
+        new SubsetTransformer<DrillJoin, InvalidRelException>(call) {
 
           @Override
-          public RelNode convertChild(final DrillJoinRel join,  final RelNode 
rel) throws InvalidRelException {
+          public RelNode convertChild(final DrillJoin join,  final RelNode 
rel) throws InvalidRelException {
             DrillDistributionTrait toDist = 
rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
             RelTraitSet newTraitsLeft = newTraitSet(Prel.DRILL_PHYSICAL, 
toDist);
             RelNode newLeft = convert(left, newTraitsLeft);
@@ -235,7 +238,7 @@ public abstract class JoinPruleBase extends Prule {
       } else if (physicalJoinType == PhysicalJoinType.HASH_JOIN) {
         final RelTraitSet traitSet = 
PrelUtil.removeCollation(convertedLeft.getTraitSet(), call);
         call.transformTo(new HashJoinPrel(join.getCluster(), traitSet, 
convertedLeft,
-            convertedRight, joinCondition, join.getJoinType()));
+            convertedRight, joinCondition, join.getJoinType(), semiJoin));
       } else if (physicalJoinType == PhysicalJoinType.NESTEDLOOP_JOIN) {
         call.transformTo(new NestedLoopJoinPrel(join.getCluster(), 
convertedLeft.getTraitSet(), convertedLeft,
             convertedRight, joinCondition, join.getJoinType()));
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
index f06b66d..0bd2568 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
@@ -70,11 +70,11 @@ public class MergeJoinPrule extends JoinPruleBase {
       RelCollation collationRight = getCollation(join.getRightKeys());
 
       if(isDist){
-        createDistBothPlan(call, join, PhysicalJoinType.MERGE_JOIN, left, 
right, collationLeft, collationRight, hashSingleKey);
+        createDistBothPlan(call, join, PhysicalJoinType.MERGE_JOIN, left, 
right, collationLeft, collationRight, hashSingleKey, false);
       }else{
         if (checkBroadcastConditions(call.getPlanner(), join, left, right)) {
           createBroadcastPlan(call, join, join.getCondition(), 
PhysicalJoinType.MERGE_JOIN,
-              left, right, collationLeft, collationRight);
+              left, right, collationLeft, collationRight, false);
         }
       }
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java
index 848c8a1..e7fc032 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java
@@ -21,6 +21,7 @@ import java.util.List;
 
 import org.apache.drill.exec.physical.impl.join.JoinUtils;
 import org.apache.drill.exec.physical.impl.join.JoinUtils.JoinCategory;
+import org.apache.drill.exec.planner.logical.DrillJoin;
 import org.apache.drill.exec.planner.logical.DrillJoinRel;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
 import org.apache.calcite.rel.InvalidRelException;
@@ -45,8 +46,8 @@ public class NestedLoopJoinPrule extends JoinPruleBase {
   }
 
   @Override
-  protected boolean checkPreconditions(DrillJoinRel join, RelNode left, 
RelNode right,
-      PlannerSettings settings) {
+  protected boolean checkPreconditions(DrillJoin join, RelNode left, RelNode 
right,
+                                       PlannerSettings settings) {
     JoinRelType type = join.getJoinType();
 
     if (!(type == JoinRelType.INNER || type == JoinRelType.LEFT)) {
@@ -93,7 +94,7 @@ public class NestedLoopJoinPrule extends JoinPruleBase {
 
       if (checkBroadcastConditions(call.getPlanner(), join, left, right)) {
         createBroadcastPlan(call, join, join.getCondition(), 
PhysicalJoinType.NESTEDLOOP_JOIN,
-            left, right, null /* left collation */, null /* right collation 
*/);
+            left, right, null /* left collation */, null /* right collation 
*/, false);
       }
 
     } catch (InvalidRelException e) {
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index 63f884c..7577cf9 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -64,6 +64,8 @@ public class PlannerSettings implements Context{
       new OptionDescription("Generates the topN plan for queries with the 
ORDER BY and LIMIT clauses."));
   public static final OptionValidator HASHJOIN = new 
BooleanValidator("planner.enable_hashjoin",
       new OptionDescription("Enable the memory hungry hash join. Drill assumes 
that a query will have adequate memory to complete and tries to use the fastest 
operations possible to complete the planned inner, left, right, or full outer 
joins using a hash table. Does not write to disk. Disabling hash join allows 
Drill to manage arbitrarily large data in a small memory footprint."));
+  // Semi-join planning is off by default: drill-module.conf sets planner.enable_semijoin to false.
+  public static final OptionValidator SEMIJOIN = new 
BooleanValidator("planner.enable_semijoin",
+          new OptionDescription("Enable the semi join optimization. Planner 
removes the distinct processing below the hash join and sets the semi join flag 
in hash join."));
   public static final OptionValidator MERGEJOIN = new 
BooleanValidator("planner.enable_mergejoin",
       new OptionDescription("Sort-based operation. A merge join is used for 
inner join, left and right outer joins. Inputs to the merge join must be 
sorted. It reads the sorted input streams from both sides and finds matching 
rows. Writes to disk."));
   public static final OptionValidator NESTEDLOOPJOIN = new 
BooleanValidator("planner.enable_nestedloopjoin",
@@ -273,6 +275,10 @@ public class PlannerSettings implements Context{
     return options.getOption(HASHJOIN.getOptionName()).bool_val;
   }
 
+  /**
+   * @return the current value of the {@code planner.enable_semijoin} option
+   */
+  public boolean isSemiJoinEnabled() {
+    final String optionName = SEMIJOIN.getOptionName();
+    return options.getOption(optionName).bool_val;
+  }
+
   public boolean isMergeJoinEnabled() {
     return options.getOption(MERGEJOIN.getOptionName()).bool_val;
   }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SwapHashJoinVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SwapHashJoinVisitor.java
index b7bc4bb..0fe0f92 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SwapHashJoinVisitor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SwapHashJoinVisitor.java
@@ -68,7 +68,7 @@ public class SwapHashJoinVisitor extends 
BasePrelVisitor<Prel, Double, RuntimeEx
       // Mark left/right is swapped, when INNER hash join's left row count < ( 
1+ margin factor) right row count.
       RelMetadataQuery mq = newJoin.getCluster().getMetadataQuery();
       if (newJoin.getLeft().estimateRowCount(mq) < (1 + value) * 
newJoin.getRight().estimateRowCount(mq) &&
-          newJoin.getJoinType() == JoinRelType.INNER) {
+          newJoin.getJoinType() == JoinRelType.INNER && !newJoin.isSemiJoin()) 
{
         ((HashJoinPrel) newJoin).setSwapped(true);
       }
     }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 23f35b5..a33d832 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -81,6 +81,7 @@ public class SystemOptionManager extends BaseOptionManager 
implements AutoClosea
       new OptionDefinition(PlannerSettings.STREAMAGG),
       new OptionDefinition(PlannerSettings.TOPN, new 
OptionMetaData(OptionValue.AccessibleScopes.ALL, false, true)),
       new OptionDefinition(PlannerSettings.HASHJOIN),
+      new OptionDefinition(PlannerSettings.SEMIJOIN),
       new OptionDefinition(PlannerSettings.MERGEJOIN),
       new OptionDefinition(PlannerSettings.NESTEDLOOPJOIN),
       new OptionDefinition(PlannerSettings.MULTIPHASE),
diff --git a/exec/java-exec/src/main/resources/drill-module.conf 
b/exec/java-exec/src/main/resources/drill-module.conf
index 76be050..f083c66 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -514,6 +514,7 @@ drill.exec.options: {
     planner.enable_hash_single_key: true,
     planner.enable_hashagg: true,
     planner.enable_hashjoin: true,
+    planner.enable_semijoin: false,
     planner.enable_hashjoin_swap: true,
     planner.enable_hep_opt: true,
     planner.enable_hep_partition_pruning: true,
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestSemiJoin.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestSemiJoin.java
new file mode 100644
index 0000000..a660fff
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestSemiJoin.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.test.ClientFixture;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@Category({SlowTest.class, OperatorTest.class})
+public class TestSemiJoin extends BaseTestQuery {
+
+  /** Text the EXPLAIN output is expected to contain when a semi join is planned. */
+  private static final String SEMI_JOIN_MARKER = "semi-join: =[true]";
+
+  /** Single-column IN subquery over the same table. */
+  private static final String IN_SUBQUERY_SQL =
+      "select employee_id, full_name from cp.`employee.json` where employee_id in (select employee_id from cp.`employee.json` )";
+
+  @Test
+  public void testInClauseToSemiJoin() throws Exception {
+    assertSemiJoinPlanned(IN_SUBQUERY_SQL, true, true);
+  }
+
+  @Test
+  public void testInClauseWithSemiJoinDisabled() throws Exception {
+    assertSemiJoinPlanned(IN_SUBQUERY_SQL, false, false);
+  }
+
+  @Test
+  public void testSmallInClauseToSemiJoin() throws Exception {
+    // A short literal IN list is expected to stay a filter rather than become a join
+    // (presumably it is below the IN-list-to-join conversion threshold).
+    String sql = "select employee_id, full_name from cp.`employee.json` " +
+        "where employee_id in (351, 352, 353, 451, 452, 453)";
+    assertSemiJoinPlanned(sql, true, false);
+  }
+
+  @Test
+  public void testLargeInClauseToSemiJoin() throws Exception {
+    String sql = "select employee_id, full_name from cp.`employee.json` " +
+        "where employee_id in (351, 352, 353, 451, 452, 453, 551, 552, 553, 651, 652, 653, 751, 752, 753, 851, 852, 853, 951, 952, 953)";
+    assertSemiJoinPlanned(sql, true, true);
+  }
+
+  @Test
+  public void testStarWithInClauseToSemiJoin() throws Exception {
+    String sql = "select * from cp.`employee.json` where employee_id in (select employee_id from cp.`employee.json` )";
+    assertSemiJoinPlanned(sql, true, true);
+  }
+
+  @Test
+  public void testMultiColumnInClauseWithSemiJoin() throws Exception {
+    String sql = "select * from cp.`employee.json` where (employee_id, full_name) in (select employee_id, full_name from cp.`employee.json` )";
+    assertSemiJoinPlanned(sql, true, true);
+  }
+
+  /**
+   * Plans {@code sql} with {@code planner.enable_semijoin} set to {@code semiJoinEnabled}
+   * and asserts whether the resulting plan contains a semi join.
+   *
+   * @param expected {@code true} if the plan must contain a semi join, {@code false} if it must not
+   */
+  private void assertSemiJoinPlanned(String sql, boolean semiJoinEnabled, boolean expected) throws Exception {
+    String plan = explainWithSemiJoinOption(sql, semiJoinEnabled);
+    if (expected) {
+      assertTrue("Expected a semi join in plan:\n" + plan, plan.contains(SEMI_JOIN_MARKER));
+    } else {
+      assertFalse("Unexpected semi join in plan:\n" + plan, plan.contains(SEMI_JOIN_MARKER));
+    }
+  }
+
+  /**
+   * Starts a dedicated cluster with the semi-join option defaulted to {@code semiJoinEnabled}
+   * and returns the EXPLAIN text for {@code sql}. The cluster and client are closed before returning.
+   */
+  private String explainWithSemiJoinOption(String sql, boolean semiJoinEnabled) throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+        .setOptionDefault(PlannerSettings.SEMIJOIN.getOptionName(), semiJoinEnabled);
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      return client.queryBuilder().sql(sql).explainText();
+    }
+  }
+}
diff --git 
a/logical/src/main/java/org/apache/drill/common/logical/data/LogicalSemiJoin.java
 
b/logical/src/main/java/org/apache/drill/common/logical/data/LogicalSemiJoin.java
new file mode 100644
index 0000000..a44ec9f
--- /dev/null
+++ 
b/logical/src/main/java/org/apache/drill/common/logical/data/LogicalSemiJoin.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.logical.data;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.shaded.guava.com.google.common.collect.Iterators;
+import org.apache.drill.common.logical.data.visitors.LogicalVisitor;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Logical semi-join operator, serialized under the JSON type name {@code "semi-join"}.
+ * It carries the same left/right inputs, join conditions and join type as {@link Join},
+ * and logical visitors handle it through {@code visitJoin}.
+ */
+@JsonTypeName("semi-join")
+public class LogicalSemiJoin extends Join {
+
+  @JsonCreator
+  public LogicalSemiJoin(@JsonProperty("left") LogicalOperator left,
+                         @JsonProperty("right") LogicalOperator right,
+                         @JsonProperty("conditions") List<JoinCondition> conditions,
+                         @JsonProperty("type") JoinRelType type) {
+    super(left, right, conditions, type);
+  }
+
+  /** Iterates over the two inputs: left first, then right. */
+  @Override
+  public Iterator<LogicalOperator> iterator() {
+    return Iterators.forArray(getLeft(), getRight());
+  }
+
+  /** Dispatches to {@link LogicalVisitor#visitJoin}, so a semi join is visited like any other join. */
+  @Override
+  public <T, X, E extends Throwable> T accept(LogicalVisitor<T, X, E> logicalVisitor, X value) throws E {
+    return logicalVisitor.visitJoin(this, value);
+  }
+}

Reply via email to