This is an automated email from the ASF dual-hosted git repository.

amaliujia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new cd3b2cb  [BEAM-6114] Calcite Rules to Select Type of Join in BeamSQL
     new 1efa67a  Merge pull request #9395 from rahul8383/beamsql-join-rules
cd3b2cb is described below

commit cd3b2cb97e096ef38c3bb9ccd51165ac7b2ad841
Author: parahul <[email protected]>
AuthorDate: Thu Aug 22 00:23:53 2019 +0530

    [BEAM-6114] Calcite Rules to Select Type of Join in BeamSQL
---
 .../extensions/sql/impl/planner/BeamRuleSets.java  |   8 +-
 .../extensions/sql/impl/rel/BeamCoGBKJoinRel.java  | 193 ++++++++++
 .../sdk/extensions/sql/impl/rel/BeamJoinRel.java   | 399 ++++-----------------
 .../sql/impl/rel/BeamSideInputJoinRel.java         | 157 ++++++++
 .../sql/impl/rel/BeamSideInputLookupJoinRel.java   | 110 ++++++
 .../sql/impl/rule/BeamCoGBKJoinRule.java           |  82 +++++
 .../sql/impl/rule/BeamSideInputJoinRule.java       |  82 +++++
 ...nRule.java => BeamSideInputLookupJoinRule.java} |  32 +-
 .../sdk/extensions/sql/BeamSqlDslJoinTest.java     |   4 +-
 .../sdk/extensions/sql/BeamSqlExplainTest.java     |   2 +-
 ...a => BeamCoGBKJoinRelBoundedVsBoundedTest.java} |  64 +++-
 ... BeamCoGBKJoinRelUnboundedVsUnboundedTest.java} |  10 +-
 ...eamSideInputJoinRelUnboundedVsBoundedTest.java} |  14 +-
 13 files changed, 805 insertions(+), 352 deletions(-)

diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
index 1a78e98..b2766d6 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
@@ -23,12 +23,14 @@ import 
org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
 import org.apache.beam.sdk.extensions.sql.impl.rule.BeamAggregationRule;
 import org.apache.beam.sdk.extensions.sql.impl.rule.BeamBasicAggregationRule;
 import org.apache.beam.sdk.extensions.sql.impl.rule.BeamCalcRule;
+import org.apache.beam.sdk.extensions.sql.impl.rule.BeamCoGBKJoinRule;
 import 
org.apache.beam.sdk.extensions.sql.impl.rule.BeamEnumerableConverterRule;
 import org.apache.beam.sdk.extensions.sql.impl.rule.BeamIntersectRule;
 import org.apache.beam.sdk.extensions.sql.impl.rule.BeamJoinAssociateRule;
 import 
org.apache.beam.sdk.extensions.sql.impl.rule.BeamJoinPushThroughJoinRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamJoinRule;
 import org.apache.beam.sdk.extensions.sql.impl.rule.BeamMinusRule;
+import org.apache.beam.sdk.extensions.sql.impl.rule.BeamSideInputJoinRule;
+import 
org.apache.beam.sdk.extensions.sql.impl.rule.BeamSideInputLookupJoinRule;
 import org.apache.beam.sdk.extensions.sql.impl.rule.BeamSortRule;
 import org.apache.beam.sdk.extensions.sql.impl.rule.BeamUncollectRule;
 import org.apache.beam.sdk.extensions.sql.impl.rule.BeamUnionRule;
@@ -151,7 +153,9 @@ public class BeamRuleSets {
           BeamUnionRule.INSTANCE,
           BeamUncollectRule.INSTANCE,
           BeamUnnestRule.INSTANCE,
-          BeamJoinRule.INSTANCE);
+          BeamSideInputJoinRule.INSTANCE,
+          BeamCoGBKJoinRule.INSTANCE,
+          BeamSideInputLookupJoinRule.INSTANCE);
 
   private static final List<RelOptRule> BEAM_TO_ENUMERABLE =
       ImmutableList.of(BeamEnumerableConverterRule.INSTANCE);
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRel.java
new file mode 100644
index 0000000..0e165b6
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRel.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import static org.apache.beam.sdk.values.PCollection.IsBounded.UNBOUNDED;
+import static org.joda.time.Duration.ZERO;
+
+import java.util.Set;
+import org.apache.beam.sdk.extensions.sql.impl.transform.BeamJoinTransforms;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rex.RexNode;
+
+public class BeamCoGBKJoinRel extends BeamJoinRel {
+
+  public BeamCoGBKJoinRel(
+      RelOptCluster cluster,
+      RelTraitSet traitSet,
+      RelNode left,
+      RelNode right,
+      RexNode condition,
+      Set<CorrelationId> variablesSet,
+      JoinRelType joinType) {
+    super(cluster, traitSet, left, right, condition, variablesSet, joinType);
+  }
+
+  @Override
+  public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
+    return new StandardJoin();
+  }
+
+  private class StandardJoin extends PTransform<PCollectionList<Row>, 
PCollection<Row>> {
+
+    @Override
+    public PCollection<Row> expand(PCollectionList<Row> pinput) {
+      Schema leftSchema = CalciteUtils.toSchema(left.getRowType());
+      Schema rightSchema = CalciteUtils.toSchema(right.getRowType());
+
+      PCollectionList<KV<Row, Row>> keyedInputs = pinput.apply(new 
ExtractJoinKeys());
+
+      PCollection<KV<Row, Row>> extractedLeftRows = keyedInputs.get(0);
+      PCollection<KV<Row, Row>> extractedRightRows = keyedInputs.get(1);
+
+      WindowFn leftWinFn = 
extractedLeftRows.getWindowingStrategy().getWindowFn();
+      WindowFn rightWinFn = 
extractedRightRows.getWindowingStrategy().getWindowFn();
+
+      try {
+        leftWinFn.verifyCompatibility(rightWinFn);
+      } catch (IncompatibleWindowException e) {
+        throw new IllegalArgumentException(
+            "WindowFns must match for a 
bounded-vs-bounded/unbounded-vs-unbounded join.", e);
+      }
+
+      verifySupportedTrigger(extractedLeftRows);
+      verifySupportedTrigger(extractedRightRows);
+
+      return standardJoin(extractedLeftRows, extractedRightRows, leftSchema, 
rightSchema);
+    }
+  }
+
+  private <T> void verifySupportedTrigger(PCollection<T> pCollection) {
+    WindowingStrategy windowingStrategy = pCollection.getWindowingStrategy();
+
+    if (UNBOUNDED.equals(pCollection.isBounded()) && 
!triggersOncePerWindow(windowingStrategy)) {
+      throw new UnsupportedOperationException(
+          "Joining unbounded PCollections is currently only supported for "
+              + "non-global windows with triggers that are known to produce 
output once per window,"
+              + "such as the default trigger with zero allowed lateness. "
+              + "In these cases Beam can guarantee it joins all input elements 
once per window. "
+              + windowingStrategy
+              + " is not supported");
+    }
+  }
+
+  private boolean triggersOncePerWindow(WindowingStrategy windowingStrategy) {
+    Trigger trigger = windowingStrategy.getTrigger();
+
+    return !(windowingStrategy.getWindowFn() instanceof GlobalWindows)
+        && trigger instanceof DefaultTrigger
+        && ZERO.equals(windowingStrategy.getAllowedLateness());
+  }
+
+  private PCollection<Row> standardJoin(
+      PCollection<KV<Row, Row>> extractedLeftRows,
+      PCollection<KV<Row, Row>> extractedRightRows,
+      Schema leftSchema,
+      Schema rightSchema) {
+    PCollection<KV<Row, KV<Row, Row>>> joinedRows = null;
+
+    switch (joinType) {
+      case LEFT:
+        {
+          Schema rigthNullSchema = buildNullSchema(rightSchema);
+          Row rightNullRow = Row.nullRow(rigthNullSchema);
+
+          extractedRightRows = setValueCoder(extractedRightRows, 
SchemaCoder.of(rigthNullSchema));
+
+          joinedRows =
+              org.apache.beam.sdk.extensions.joinlibrary.Join.leftOuterJoin(
+                  extractedLeftRows, extractedRightRows, rightNullRow);
+
+          break;
+        }
+      case RIGHT:
+        {
+          Schema leftNullSchema = buildNullSchema(leftSchema);
+          Row leftNullRow = Row.nullRow(leftNullSchema);
+
+          extractedLeftRows = setValueCoder(extractedLeftRows, 
SchemaCoder.of(leftNullSchema));
+
+          joinedRows =
+              org.apache.beam.sdk.extensions.joinlibrary.Join.rightOuterJoin(
+                  extractedLeftRows, extractedRightRows, leftNullRow);
+          break;
+        }
+      case FULL:
+        {
+          Schema leftNullSchema = buildNullSchema(leftSchema);
+          Schema rightNullSchema = buildNullSchema(rightSchema);
+
+          Row leftNullRow = Row.nullRow(leftNullSchema);
+          Row rightNullRow = Row.nullRow(rightNullSchema);
+
+          extractedLeftRows = setValueCoder(extractedLeftRows, 
SchemaCoder.of(leftNullSchema));
+          extractedRightRows = setValueCoder(extractedRightRows, 
SchemaCoder.of(rightNullSchema));
+
+          joinedRows =
+              org.apache.beam.sdk.extensions.joinlibrary.Join.fullOuterJoin(
+                  extractedLeftRows, extractedRightRows, leftNullRow, 
rightNullRow);
+          break;
+        }
+      case INNER:
+      default:
+        joinedRows =
+            org.apache.beam.sdk.extensions.joinlibrary.Join.innerJoin(
+                extractedLeftRows, extractedRightRows);
+        break;
+    }
+
+    Schema schema = CalciteUtils.toSchema(getRowType());
+    return joinedRows
+        .apply(
+            "JoinParts2WholeRow",
+            MapElements.via(new BeamJoinTransforms.JoinParts2WholeRow(schema)))
+        .setRowSchema(schema);
+  }
+
+  @Override
+  public Join copy(
+      RelTraitSet traitSet,
+      RexNode conditionExpr,
+      RelNode left,
+      RelNode right,
+      JoinRelType joinType,
+      boolean semiJoinDone) {
+    return new BeamCoGBKJoinRel(
+        getCluster(), traitSet, left, right, conditionExpr, variablesSet, 
joinType);
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
index 8950974..f2739e4 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
@@ -18,13 +18,10 @@
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import static org.apache.beam.sdk.schemas.Schema.toSchema;
-import static org.apache.beam.sdk.values.PCollection.IsBounded.UNBOUNDED;
-import static org.joda.time.Duration.ZERO;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.coders.Coder;
@@ -40,26 +37,18 @@ import org.apache.beam.sdk.schemas.Schema.Field;
 import org.apache.beam.sdk.schemas.SchemaCoder;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.RelSubset;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.Join;
@@ -102,9 +91,9 @@ import org.apache.calcite.util.Pair;
  *   <li>CROSS JOIN is not supported.
  * </ul>
  */
-public class BeamJoinRel extends Join implements BeamRelNode {
+public abstract class BeamJoinRel extends Join implements BeamRelNode {
 
-  public BeamJoinRel(
+  protected BeamJoinRel(
       RelOptCluster cluster,
       RelTraitSet traits,
       RelNode left,
@@ -116,18 +105,6 @@ public class BeamJoinRel extends Join implements 
BeamRelNode {
   }
 
   @Override
-  public Join copy(
-      RelTraitSet traitSet,
-      RexNode conditionExpr,
-      RelNode left,
-      RelNode right,
-      JoinRelType joinType,
-      boolean semiJoinDone) {
-    return new BeamJoinRel(
-        getCluster(), traitSet, left, right, conditionExpr, variablesSet, 
joinType);
-  }
-
-  @Override
   public List<RelNode> getPCollectionInputs() {
     if (isSideInputLookupJoin()) {
       return ImmutableList.of(
@@ -137,6 +114,38 @@ public class BeamJoinRel extends Join implements 
BeamRelNode {
     }
   }
 
+  protected boolean isSideInputLookupJoin() {
+    return seekableInputIndex().isPresent() && 
nonSeekableInputIndex().isPresent();
+  }
+
+  protected Optional<Integer> seekableInputIndex() {
+    BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
+    BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
+    return seekable(leftRelNode)
+        ? Optional.of(0)
+        : seekable(rightRelNode) ? Optional.of(1) : Optional.absent();
+  }
+
+  protected Optional<Integer> nonSeekableInputIndex() {
+    BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
+    BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
+    return !seekable(leftRelNode)
+        ? Optional.of(0)
+        : !seekable(rightRelNode) ? Optional.of(1) : Optional.absent();
+  }
+
+  /** Check if the {@code BeamRelNode} is backed by a {@code BeamSqlSeekableTable}. */
+  public static boolean seekable(BeamRelNode relNode) {
+    if (relNode instanceof BeamIOSourceRel) {
+      BeamIOSourceRel srcRel = (BeamIOSourceRel) relNode;
+      BeamSqlTable sourceTable = srcRel.getBeamSqlTable();
+      if (sourceTable instanceof BeamSqlSeekableTable) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   @Override
   public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, 
RelMetadataQuery mq) {
     NodeStats leftEstimates = BeamSqlRelUtils.getNodeStats(this.left, mq);
@@ -183,111 +192,7 @@ public class BeamJoinRel extends Join implements 
BeamRelNode {
     return true;
   }
 
-  @Override
-  public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
-    if (isSideInputLookupJoin()) {
-      return new SideInputLookupJoin();
-    } else if (isSideInputJoin()) {
-      // if one of the sides is Bounded & the other is Unbounded
-      // then do a sideInput join
-      // when doing a sideInput join, the windowFn does not need to match
-      // Only support INNER JOIN & LEFT OUTER JOIN where left side of the join 
must be
-      // the unbounded
-      if (joinType == JoinRelType.FULL) {
-        throw new UnsupportedOperationException(
-            "FULL OUTER JOIN is not supported when join "
-                + "a bounded table with an unbounded table.");
-      }
-
-      BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
-      BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
-
-      if ((joinType == JoinRelType.LEFT && leftRelNode.isBounded() == 
PCollection.IsBounded.BOUNDED)
-          || (joinType == JoinRelType.RIGHT
-              && rightRelNode.isBounded() == PCollection.IsBounded.BOUNDED)) {
-        throw new UnsupportedOperationException(
-            "LEFT side of an OUTER JOIN must be Unbounded table.");
-      }
-
-      return new SideInputJoin();
-    } else {
-      return new StandardJoin();
-    }
-  }
-
-  private boolean isSideInputJoin() {
-    BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
-    BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
-    return (leftRelNode.isBounded() == PCollection.IsBounded.BOUNDED
-            && rightRelNode.isBounded() == UNBOUNDED)
-        || (leftRelNode.isBounded() == UNBOUNDED
-            && rightRelNode.isBounded() == PCollection.IsBounded.BOUNDED);
-  }
-
-  private boolean isSideInputLookupJoin() {
-    return seekableInputIndex().isPresent() && 
nonSeekableInputIndex().isPresent();
-  }
-
-  private Optional<Integer> seekableInputIndex() {
-    BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
-    BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
-    return seekable(leftRelNode)
-        ? Optional.of(0)
-        : seekable(rightRelNode) ? Optional.of(1) : Optional.absent();
-  }
-
-  private Optional<Integer> nonSeekableInputIndex() {
-    BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
-    BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
-    return !seekable(leftRelNode)
-        ? Optional.of(0)
-        : !seekable(rightRelNode) ? Optional.of(1) : Optional.absent();
-  }
-
-  private class SideInputLookupJoin extends PTransform<PCollectionList<Row>, 
PCollection<Row>> {
-
-    @Override
-    public PCollection<Row> expand(PCollectionList<Row> pinput) {
-      Schema schema = CalciteUtils.toSchema(getRowType());
-
-      BeamRelNode seekableRel =
-          
BeamSqlRelUtils.getBeamRelInput(getInput(seekableInputIndex().get()));
-      BeamRelNode nonSeekableRel =
-          
BeamSqlRelUtils.getBeamRelInput(getInput(nonSeekableInputIndex().get()));
-
-      // Offset field references according to which table is on the left
-      int factColOffset =
-          nonSeekableInputIndex().get() == 0
-              ? 0
-              : 
CalciteUtils.toSchema(seekableRel.getRowType()).getFieldCount();
-      int lkpColOffset =
-          seekableInputIndex().get() == 0
-              ? 0
-              : 
CalciteUtils.toSchema(nonSeekableRel.getRowType()).getFieldCount();
-
-      // HACK: if the input is an immediate instance of a seekable IO, we can 
do lookups
-      // so we ignore the PCollection
-      BeamIOSourceRel seekableInput = (BeamIOSourceRel) seekableRel;
-      BeamSqlSeekableTable seekableTable = (BeamSqlSeekableTable) 
seekableInput.getBeamSqlTable();
-
-      // getPCollectionInputs() ensures that there is only one and it is the 
non-seekable input
-      PCollection<Row> nonSeekableInput = pinput.get(0);
-
-      return nonSeekableInput
-          .apply(
-              "join_as_lookup",
-              new BeamJoinTransforms.JoinAsLookup(
-                  condition,
-                  seekableTable,
-                  CalciteUtils.toSchema(seekableInput.getRowType()),
-                  schema,
-                  factColOffset,
-                  lkpColOffset))
-          .setRowSchema(schema);
-    }
-  }
-
-  private class ExtractJoinKeys
+  protected class ExtractJoinKeys
       extends PTransform<PCollectionList<Row>, PCollectionList<KV<Row, Row>>> {
 
     @Override
@@ -348,189 +253,7 @@ public class BeamJoinRel extends Join implements 
BeamRelNode {
     }
   }
 
-  private class SideInputJoin extends PTransform<PCollectionList<Row>, 
PCollection<Row>> {
-
-    @Override
-    public PCollection<Row> expand(PCollectionList<Row> pinput) {
-      Schema leftSchema = CalciteUtils.toSchema(left.getRowType());
-      Schema rightSchema = CalciteUtils.toSchema(right.getRowType());
-
-      PCollectionList<KV<Row, Row>> keyedInputs = pinput.apply(new 
ExtractJoinKeys());
-
-      PCollection<KV<Row, Row>> extractedLeftRows = keyedInputs.get(0);
-      PCollection<KV<Row, Row>> extractedRightRows = keyedInputs.get(1);
-
-      return sideInputJoin(extractedLeftRows, extractedRightRows, leftSchema, 
rightSchema);
-    }
-  }
-
-  private class StandardJoin extends PTransform<PCollectionList<Row>, 
PCollection<Row>> {
-
-    @Override
-    public PCollection<Row> expand(PCollectionList<Row> pinput) {
-      Schema leftSchema = CalciteUtils.toSchema(left.getRowType());
-      Schema rightSchema = CalciteUtils.toSchema(right.getRowType());
-
-      PCollectionList<KV<Row, Row>> keyedInputs = pinput.apply(new 
ExtractJoinKeys());
-
-      PCollection<KV<Row, Row>> extractedLeftRows = keyedInputs.get(0);
-      PCollection<KV<Row, Row>> extractedRightRows = keyedInputs.get(1);
-
-      WindowFn leftWinFn = 
extractedLeftRows.getWindowingStrategy().getWindowFn();
-      WindowFn rightWinFn = 
extractedRightRows.getWindowingStrategy().getWindowFn();
-
-      try {
-        leftWinFn.verifyCompatibility(rightWinFn);
-      } catch (IncompatibleWindowException e) {
-        throw new IllegalArgumentException(
-            "WindowFns must match for a 
bounded-vs-bounded/unbounded-vs-unbounded join.", e);
-      }
-
-      verifySupportedTrigger(extractedLeftRows);
-      verifySupportedTrigger(extractedRightRows);
-
-      return standardJoin(extractedLeftRows, extractedRightRows, leftSchema, 
rightSchema);
-    }
-  }
-
-  private <T> void verifySupportedTrigger(PCollection<T> pCollection) {
-    WindowingStrategy windowingStrategy = pCollection.getWindowingStrategy();
-
-    if (UNBOUNDED.equals(pCollection.isBounded()) && 
!triggersOncePerWindow(windowingStrategy)) {
-      throw new UnsupportedOperationException(
-          "Joining unbounded PCollections is currently only supported for "
-              + "non-global windows with triggers that are known to produce 
output once per window,"
-              + "such as the default trigger with zero allowed lateness. "
-              + "In these cases Beam can guarantee it joins all input elements 
once per window. "
-              + windowingStrategy
-              + " is not supported");
-    }
-  }
-
-  private boolean triggersOncePerWindow(WindowingStrategy windowingStrategy) {
-    Trigger trigger = windowingStrategy.getTrigger();
-
-    return !(windowingStrategy.getWindowFn() instanceof GlobalWindows)
-        && trigger instanceof DefaultTrigger
-        && ZERO.equals(windowingStrategy.getAllowedLateness());
-  }
-
-  private PCollection<Row> standardJoin(
-      PCollection<KV<Row, Row>> extractedLeftRows,
-      PCollection<KV<Row, Row>> extractedRightRows,
-      Schema leftSchema,
-      Schema rightSchema) {
-    PCollection<KV<Row, KV<Row, Row>>> joinedRows = null;
-
-    switch (joinType) {
-      case LEFT:
-        {
-          Schema rigthNullSchema = buildNullSchema(rightSchema);
-          Row rightNullRow = Row.nullRow(rigthNullSchema);
-
-          extractedRightRows = setValueCoder(extractedRightRows, 
SchemaCoder.of(rigthNullSchema));
-
-          joinedRows =
-              org.apache.beam.sdk.extensions.joinlibrary.Join.leftOuterJoin(
-                  extractedLeftRows, extractedRightRows, rightNullRow);
-
-          break;
-        }
-      case RIGHT:
-        {
-          Schema leftNullSchema = buildNullSchema(leftSchema);
-          Row leftNullRow = Row.nullRow(leftNullSchema);
-
-          extractedLeftRows = setValueCoder(extractedLeftRows, 
SchemaCoder.of(leftNullSchema));
-
-          joinedRows =
-              org.apache.beam.sdk.extensions.joinlibrary.Join.rightOuterJoin(
-                  extractedLeftRows, extractedRightRows, leftNullRow);
-          break;
-        }
-      case FULL:
-        {
-          Schema leftNullSchema = buildNullSchema(leftSchema);
-          Schema rightNullSchema = buildNullSchema(rightSchema);
-
-          Row leftNullRow = Row.nullRow(leftNullSchema);
-          Row rightNullRow = Row.nullRow(rightNullSchema);
-
-          extractedLeftRows = setValueCoder(extractedLeftRows, 
SchemaCoder.of(leftNullSchema));
-          extractedRightRows = setValueCoder(extractedRightRows, 
SchemaCoder.of(rightNullSchema));
-
-          joinedRows =
-              org.apache.beam.sdk.extensions.joinlibrary.Join.fullOuterJoin(
-                  extractedLeftRows, extractedRightRows, leftNullRow, 
rightNullRow);
-          break;
-        }
-      case INNER:
-      default:
-        joinedRows =
-            org.apache.beam.sdk.extensions.joinlibrary.Join.innerJoin(
-                extractedLeftRows, extractedRightRows);
-        break;
-    }
-
-    Schema schema = CalciteUtils.toSchema(getRowType());
-    return joinedRows
-        .apply(
-            "JoinParts2WholeRow",
-            MapElements.via(new BeamJoinTransforms.JoinParts2WholeRow(schema)))
-        .setRowSchema(schema);
-  }
-
-  public PCollection<Row> sideInputJoin(
-      PCollection<KV<Row, Row>> extractedLeftRows,
-      PCollection<KV<Row, Row>> extractedRightRows,
-      Schema leftSchema,
-      Schema rightSchema) {
-    // we always make the Unbounded table on the left to do the sideInput join
-    // (will convert the result accordingly before return)
-    boolean swapped = (extractedLeftRows.isBounded() == 
PCollection.IsBounded.BOUNDED);
-    JoinRelType realJoinType =
-        (swapped && joinType != JoinRelType.INNER) ? JoinRelType.LEFT : 
joinType;
-
-    PCollection<KV<Row, Row>> realLeftRows = swapped ? extractedRightRows : 
extractedLeftRows;
-    PCollection<KV<Row, Row>> realRightRows = swapped ? extractedLeftRows : 
extractedRightRows;
-
-    Row realRightNullRow;
-    if (swapped) {
-      Schema leftNullSchema = buildNullSchema(leftSchema);
-
-      realRightRows = setValueCoder(realRightRows, 
SchemaCoder.of(leftNullSchema));
-      realRightNullRow = Row.nullRow(leftNullSchema);
-    } else {
-      Schema rightNullSchema = buildNullSchema(rightSchema);
-
-      realRightRows = setValueCoder(realRightRows, 
SchemaCoder.of(rightNullSchema));
-      realRightNullRow = Row.nullRow(rightNullSchema);
-    }
-
-    // swapped still need to pass down because, we need to swap the result 
back.
-    return sideInputJoinHelper(
-        realJoinType, realLeftRows, realRightRows, realRightNullRow, swapped);
-  }
-
-  private PCollection<Row> sideInputJoinHelper(
-      JoinRelType joinType,
-      PCollection<KV<Row, Row>> leftRows,
-      PCollection<KV<Row, Row>> rightRows,
-      Row rightNullRow,
-      boolean swapped) {
-    final PCollectionView<Map<Row, Iterable<Row>>> rowsView = 
rightRows.apply(View.asMultimap());
-
-    Schema schema = CalciteUtils.toSchema(getRowType());
-    return leftRows
-        .apply(
-            ParDo.of(
-                    new BeamJoinTransforms.SideInputJoinDoFn(
-                        joinType, rightNullRow, rowsView, swapped, schema))
-                .withSideInputs(rowsView))
-        .setRowSchema(schema);
-  }
-
-  private Schema buildNullSchema(Schema schema) {
+  protected Schema buildNullSchema(Schema schema) {
     Schema.Builder builder = Schema.builder();
 
     builder.addFields(
@@ -539,7 +262,7 @@ public class BeamJoinRel extends Join implements 
BeamRelNode {
     return builder.build();
   }
 
-  private static <K, V> PCollection<KV<K, V>> setValueCoder(
+  protected static <K, V> PCollection<KV<K, V>> setValueCoder(
       PCollection<KV<K, V>> kvs, Coder<V> valueCoder) {
     // safe case because PCollection of KV always has KvCoder
     KvCoder<K, V> coder = (KvCoder<K, V>) kvs.getCoder();
@@ -651,15 +374,49 @@ public class BeamJoinRel extends Join implements 
BeamRelNode {
     throw new UnsupportedOperationException("Cannot get column index from " + 
rexNode.getType());
   }
 
-  /** check if {@code BeamRelNode} implements {@code BeamSeekableTable}. */
-  private boolean seekable(BeamRelNode relNode) {
-    if (relNode instanceof BeamIOSourceRel) {
-      BeamIOSourceRel srcRel = (BeamIOSourceRel) relNode;
-      BeamSqlTable sourceTable = srcRel.getBeamSqlTable();
-      if (sourceTable instanceof BeamSqlSeekableTable) {
+  // The Volcano planner works in a top-down fashion. It starts by transforming
+  // the root and moves towards the leaves of the plan. Because of this, when a
+  // logical join is transformed, its inputs are still in the logical convention.
+  // So, recursively visit the inputs of the RelNode until a BeamRelNode (such as
+  // BeamIOSourceRel) is encountered, and propagate the boundedness upwards.
+  public static PCollection.IsBounded getBoundednessOfRelNode(RelNode relNode) 
{
+    if (relNode instanceof BeamRelNode) {
+      return (((BeamRelNode) relNode).isBounded());
+    }
+    List<PCollection.IsBounded> boundednessOfInputs = new ArrayList<>();
+    for (RelNode inputRel : relNode.getInputs()) {
+      if (inputRel instanceof RelSubset) {
+        // Consider the RelNode with the best cost in the RelSubset. If the best-cost RelNode
+        // cannot be determined, fall back to the first RelNode in the RelSubset. (Is there a
+        // better way to do this?)
+        RelNode rel = ((RelSubset) inputRel).getBest();
+        if (rel == null) {
+          rel = ((RelSubset) inputRel).getRelList().get(0);
+        }
+        boundednessOfInputs.add(getBoundednessOfRelNode(rel));
+      } else {
+        boundednessOfInputs.add(getBoundednessOfRelNode(inputRel));
+      }
+    }
+    // If any of the inputs is unbounded, the result is unbounded.
+    return (boundednessOfInputs.contains(PCollection.IsBounded.UNBOUNDED)
+        ? PCollection.IsBounded.UNBOUNDED
+        : PCollection.IsBounded.BOUNDED);
+  }
+
+  public static boolean containsSeekableInput(RelNode relNode) {
+    for (RelNode relInput : relNode.getInputs()) {
+      if (relInput instanceof RelSubset) {
+        relInput = ((RelSubset) relInput).getBest();
+      }
+      // input is Seekable
+      if (relInput != null
+          && relInput instanceof BeamRelNode
+          && (BeamJoinRel.seekable((BeamRelNode) relInput))) {
         return true;
       }
     }
+    // None of the inputs are Seekable
     return false;
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputJoinRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputJoinRel.java
new file mode 100644
index 0000000..3e678e6
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputJoinRel.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.extensions.sql.impl.transform.BeamJoinTransforms;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rex.RexNode;
+
+/**
+ * A {@code BeamJoinRel} that performs the join through a side input: the keyed rows of one input
+ * are turned into a multimap view, and the keyed rows of the other input are joined against that
+ * view by {@code BeamJoinTransforms.SideInputJoinDoFn}.
+ *
+ * <p>{@code FULL} joins are rejected. For {@code LEFT} joins the left input must not be bounded,
+ * and for {@code RIGHT} joins the right input must not be bounded.
+ */
+public class BeamSideInputJoinRel extends BeamJoinRel {
+
+  public BeamSideInputJoinRel(
+      RelOptCluster cluster,
+      RelTraitSet traitSet,
+      RelNode left,
+      RelNode right,
+      RexNode condition,
+      Set<CorrelationId> variablesSet,
+      JoinRelType joinType) {
+    super(cluster, traitSet, left, right, condition, variablesSet, joinType);
+  }
+
+  /** Creates a new {@code BeamSideInputJoinRel} that reuses this node's cluster and variables. */
+  @Override
+  public Join copy(
+      RelTraitSet traitSet,
+      RexNode conditionExpr,
+      RelNode left,
+      RelNode right,
+      JoinRelType joinType,
+      boolean semiJoinDone) {
+    return new BeamSideInputJoinRel(
+        getCluster(), traitSet, left, right, conditionExpr, variablesSet, joinType);
+  }
+
+  /**
+   * Validates the join type against the boundedness of the inputs and returns the transform that
+   * carries out the side-input join.
+   *
+   * <p>The windowFn of the two inputs does not need to match for a side-input join. Supported
+   * combinations are INNER JOIN, LEFT OUTER JOIN with an unbounded left input, and RIGHT OUTER
+   * JOIN with an unbounded right input.
+   *
+   * @throws UnsupportedOperationException for a FULL join, or when the outer side of a LEFT or
+   *     RIGHT join is bounded
+   */
+  @Override
+  public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
+    if (joinType == JoinRelType.FULL) {
+      throw new UnsupportedOperationException(
+          "FULL OUTER JOIN is not supported when join "
+              + "a bounded table with an unbounded table.");
+    }
+
+    BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
+    BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
+
+    boolean leftOuterOnBounded =
+        joinType == JoinRelType.LEFT && leftRelNode.isBounded() == PCollection.IsBounded.BOUNDED;
+    boolean rightOuterOnBounded =
+        joinType == JoinRelType.RIGHT
+            && rightRelNode.isBounded() == PCollection.IsBounded.BOUNDED;
+    if (leftOuterOnBounded || rightOuterOnBounded) {
+      throw new UnsupportedOperationException(
+          String.format("%s side of an OUTER JOIN must be Unbounded table.", joinType.name()));
+    }
+
+    return new SideInputJoin();
+  }
+
+  /** Extracts the join keys of both inputs and delegates to {@link #sideInputJoin}. */
+  private class SideInputJoin extends PTransform<PCollectionList<Row>, PCollection<Row>> {
+
+    @Override
+    public PCollection<Row> expand(PCollectionList<Row> pinput) {
+      Schema leftSchema = CalciteUtils.toSchema(left.getRowType());
+      Schema rightSchema = CalciteUtils.toSchema(right.getRowType());
+
+      PCollectionList<KV<Row, Row>> keyedInputs = pinput.apply(new ExtractJoinKeys());
+
+      // Index 0 holds the keyed left rows, index 1 the keyed right rows.
+      return sideInputJoin(keyedInputs.get(0), keyedInputs.get(1), leftSchema, rightSchema);
+    }
+  }
+
+  /**
+   * Joins the keyed rows of both inputs, using one of them as a side input.
+   *
+   * <p>When the left rows are bounded the two sides are swapped, so that the bounded left rows
+   * become the side input; a non-INNER join is then executed as a LEFT join. The {@code swapped}
+   * flag is handed on so the result can be converted back.
+   *
+   * @param extractedLeftRows keyed rows of the left input
+   * @param extractedRightRows keyed rows of the right input
+   * @param leftSchema schema of the left input
+   * @param rightSchema schema of the right input
+   * @return the joined rows
+   */
+  public PCollection<Row> sideInputJoin(
+      PCollection<KV<Row, Row>> extractedLeftRows,
+      PCollection<KV<Row, Row>> extractedRightRows,
+      Schema leftSchema,
+      Schema rightSchema) {
+    boolean swapped = extractedLeftRows.isBounded() == PCollection.IsBounded.BOUNDED;
+
+    JoinRelType effectiveJoinType = joinType;
+    if (swapped && joinType != JoinRelType.INNER) {
+      effectiveJoinType = JoinRelType.LEFT;
+    }
+
+    PCollection<KV<Row, Row>> mainRows;
+    PCollection<KV<Row, Row>> sideRows;
+    Schema sideNullSchema;
+    if (swapped) {
+      mainRows = extractedRightRows;
+      sideRows = extractedLeftRows;
+      sideNullSchema = buildNullSchema(leftSchema);
+    } else {
+      mainRows = extractedLeftRows;
+      sideRows = extractedRightRows;
+      sideNullSchema = buildNullSchema(rightSchema);
+    }
+
+    // The side rows are encoded with the null schema, and a null row of that same schema is
+    // handed to the join.
+    sideRows = BeamJoinRel.setValueCoder(sideRows, SchemaCoder.of(sideNullSchema));
+    Row sideNullRow = Row.nullRow(sideNullSchema);
+
+    return sideInputJoinHelper(effectiveJoinType, mainRows, sideRows, sideNullRow, swapped);
+  }
+
+  /**
+   * Materializes {@code sideRows} as a multimap view and applies {@code SideInputJoinDoFn} to
+   * {@code mainRows} with that view as side input.
+   */
+  private PCollection<Row> sideInputJoinHelper(
+      JoinRelType effectiveJoinType,
+      PCollection<KV<Row, Row>> mainRows,
+      PCollection<KV<Row, Row>> sideRows,
+      Row sideNullRow,
+      boolean swapped) {
+    final PCollectionView<Map<Row, Iterable<Row>>> rowsView = sideRows.apply(View.asMultimap());
+
+    Schema schema = CalciteUtils.toSchema(getRowType());
+    PCollection<Row> joined =
+        mainRows.apply(
+            ParDo.of(
+                    new BeamJoinTransforms.SideInputJoinDoFn(
+                        effectiveJoinType, sideNullRow, rowsView, swapped, schema))
+                .withSideInputs(rowsView));
+    return joined.setRowSchema(schema);
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputLookupJoinRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputLookupJoinRel.java
new file mode 100644
index 0000000..58393c2
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputLookupJoinRel.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import java.util.Set;
+import org.apache.beam.sdk.extensions.sql.BeamSqlSeekableTable;
+import org.apache.beam.sdk.extensions.sql.impl.transform.BeamJoinTransforms;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rex.RexNode;
+
+/**
+ * A {@code BeamJoinRel} for joins in which one input is backed by a {@code BeamSqlSeekableTable}:
+ * the rows of the non-seekable input are joined by handing the seekable table to {@code
+ * BeamJoinTransforms.JoinAsLookup}.
+ */
+public class BeamSideInputLookupJoinRel extends BeamJoinRel {
+
+  public BeamSideInputLookupJoinRel(
+      RelOptCluster cluster,
+      RelTraitSet traitSet,
+      RelNode left,
+      RelNode right,
+      RexNode condition,
+      Set<CorrelationId> variablesSet,
+      JoinRelType joinType) {
+    super(cluster, traitSet, left, right, condition, variablesSet, joinType);
+  }
+
+  /** Creates a new {@code BeamSideInputLookupJoinRel} reusing this node's cluster and variables. */
+  @Override
+  public Join copy(
+      RelTraitSet traitSet,
+      RexNode conditionExpr,
+      RelNode left,
+      RelNode right,
+      JoinRelType joinType,
+      boolean semiJoinDone) {
+    return new BeamSideInputLookupJoinRel(
+        getCluster(), traitSet, left, right, conditionExpr, variablesSet, joinType);
+  }
+
+  /**
+   * Returns the lookup-join transform.
+   *
+   * <p>No join type is rejected here. Open question from the original author: should LEFT, RIGHT
+   * or FULL joins throw an exception?
+   */
+  @Override
+  public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
+    return new SideInputLookupJoin();
+  }
+
+  /** Applies {@code JoinAsLookup} to the non-seekable input, using the seekable table. */
+  private class SideInputLookupJoin extends PTransform<PCollectionList<Row>, PCollection<Row>> {
+
+    @Override
+    public PCollection<Row> expand(PCollectionList<Row> pinput) {
+      Schema schema = CalciteUtils.toSchema(getRowType());
+
+      BeamRelNode seekableRel =
+          BeamSqlRelUtils.getBeamRelInput(getInput(seekableInputIndex().get()));
+      BeamRelNode nonSeekableRel =
+          BeamSqlRelUtils.getBeamRelInput(getInput(nonSeekableInputIndex().get()));
+
+      // Field references are offset by the width of whichever table sits on the left.
+      int factColOffset = 0;
+      if (nonSeekableInputIndex().get() != 0) {
+        factColOffset = CalciteUtils.toSchema(seekableRel.getRowType()).getFieldCount();
+      }
+      int lkpColOffset = 0;
+      if (seekableInputIndex().get() != 0) {
+        lkpColOffset = CalciteUtils.toSchema(nonSeekableRel.getRowType()).getFieldCount();
+      }
+
+      // HACK: the seekable input is assumed to be a BeamIOSourceRel directly, so its table can be
+      // used for lookups and its PCollection is ignored.
+      BeamIOSourceRel seekableInput = (BeamIOSourceRel) seekableRel;
+      BeamSqlSeekableTable seekableTable = (BeamSqlSeekableTable) seekableInput.getBeamSqlTable();
+
+      // Per the original author's note, getPCollectionInputs() is expected to supply exactly one
+      // PCollection here: the non-seekable input.
+      PCollection<Row> nonSeekableInput = pinput.get(0);
+
+      PCollection<Row> joined =
+          nonSeekableInput.apply(
+              "join_as_lookup",
+              new BeamJoinTransforms.JoinAsLookup(
+                  condition,
+                  seekableTable,
+                  CalciteUtils.toSchema(seekableInput.getRowType()),
+                  schema,
+                  factColOffset,
+                  lkpColOffset));
+      return joined.setRowSchema(schema);
+    }
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamCoGBKJoinRule.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamCoGBKJoinRule.java
new file mode 100644
index 0000000..88ef48c
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamCoGBKJoinRule.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rule;
+
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCoGBKJoinRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.logical.LogicalJoin;
+
+/**
+ * Planner rule that rewrites a {@code LogicalJoin} node into a {@code BeamCoGBKJoinRel} node.
+ *
+ * <p>The rule applies when both inputs of the {@code LogicalJoin} have the same boundedness: both
+ * {@code PCollection.IsBounded.BOUNDED} or both {@code PCollection.IsBounded.UNBOUNDED}.
+ *
+ * <p>A join with a Seekable input never matches this rule, even when both inputs are bounded, so
+ * that it does not conflict with the conversion to {@code BeamSideInputLookupJoinRel}.
+ */
+public class BeamCoGBKJoinRule extends RelOptRule {
+  public static final BeamCoGBKJoinRule INSTANCE = new BeamCoGBKJoinRule();
+
+  private BeamCoGBKJoinRule() {
+    super(
+        operand(LogicalJoin.class, operand(RelNode.class, any()), operand(RelNode.class, any())),
+        RelFactories.LOGICAL_BUILDER,
+        "BeamCoGBKJoinRule");
+  }
+
+  /**
+   * Accepts the join when none of its inputs is Seekable and both inputs have equal boundedness.
+   */
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    boolean hasSeekableInput = BeamJoinRel.containsSeekableInput(call.rel(0));
+    if (hasSeekableInput) {
+      return false;
+    }
+
+    PCollection.IsBounded leftBoundedness = BeamJoinRel.getBoundednessOfRelNode(call.rel(1));
+    PCollection.IsBounded rightBoundedness = BeamJoinRel.getBoundednessOfRelNode(call.rel(2));
+    return leftBoundedness == rightBoundedness;
+  }
+
+  /** Replaces the matched join with a {@code BeamCoGBKJoinRel} over Beam-convention inputs. */
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    Join join = call.rel(0);
+
+    RelOptCluster cluster = join.getCluster();
+    RelTraitSet beamTraits = join.getTraitSet().replace(BeamLogicalConvention.INSTANCE);
+    RelNode beamLeft =
+        convert(
+            join.getLeft(), join.getLeft().getTraitSet().replace(BeamLogicalConvention.INSTANCE));
+    RelNode beamRight =
+        convert(
+            join.getRight(),
+            join.getRight().getTraitSet().replace(BeamLogicalConvention.INSTANCE));
+
+    call.transformTo(
+        new BeamCoGBKJoinRel(
+            cluster,
+            beamTraits,
+            beamLeft,
+            beamRight,
+            join.getCondition(),
+            join.getVariablesSet(),
+            join.getJoinType()));
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamSideInputJoinRule.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamSideInputJoinRule.java
new file mode 100644
index 0000000..44347c9
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamSideInputJoinRule.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rule;
+
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSideInputJoinRel;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.logical.LogicalJoin;
+
+/**
+ * Planner rule that rewrites a {@code LogicalJoin} node into a {@code BeamSideInputJoinRel} node.
+ *
+ * <p>The rule applies when one input of the {@code LogicalJoin} is {@code
+ * PCollection.IsBounded.BOUNDED} and the other one is {@code PCollection.IsBounded.UNBOUNDED}.
+ *
+ * <p>A join with a Seekable input never matches this rule, so that it does not conflict with the
+ * conversion to {@code BeamSideInputLookupJoinRel}.
+ */
+public class BeamSideInputJoinRule extends RelOptRule {
+  public static final BeamSideInputJoinRule INSTANCE = new BeamSideInputJoinRule();
+
+  private BeamSideInputJoinRule() {
+    super(
+        operand(LogicalJoin.class, operand(RelNode.class, any()), operand(RelNode.class, any())),
+        RelFactories.LOGICAL_BUILDER,
+        "BeamSideInputJoinRule");
+  }
+
+  /**
+   * Accepts the join when none of its inputs is Seekable and exactly one of the two inputs is
+   * Bounded while the other is Unbounded.
+   */
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    boolean hasSeekableInput = BeamJoinRel.containsSeekableInput(call.rel(0));
+    if (hasSeekableInput) {
+      return false;
+    }
+
+    PCollection.IsBounded leftBoundedness = BeamJoinRel.getBoundednessOfRelNode(call.rel(1));
+    PCollection.IsBounded rightBoundedness = BeamJoinRel.getBoundednessOfRelNode(call.rel(2));
+    if (leftBoundedness == PCollection.IsBounded.BOUNDED) {
+      // Bounded left needs an Unbounded right.
+      return rightBoundedness == PCollection.IsBounded.UNBOUNDED;
+    }
+    // Otherwise the right side has to be the Bounded one.
+    return rightBoundedness == PCollection.IsBounded.BOUNDED;
+  }
+
+  /** Replaces the matched join with a {@code BeamSideInputJoinRel} over Beam-convention inputs. */
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    Join join = call.rel(0);
+
+    RelOptCluster cluster = join.getCluster();
+    RelTraitSet beamTraits = join.getTraitSet().replace(BeamLogicalConvention.INSTANCE);
+    RelNode beamLeft =
+        convert(
+            join.getLeft(), join.getLeft().getTraitSet().replace(BeamLogicalConvention.INSTANCE));
+    RelNode beamRight =
+        convert(
+            join.getRight(),
+            join.getRight().getTraitSet().replace(BeamLogicalConvention.INSTANCE));
+
+    call.transformTo(
+        new BeamSideInputJoinRel(
+            cluster,
+            beamTraits,
+            beamLeft,
+            beamRight,
+            join.getCondition(),
+            join.getVariablesSet(),
+            join.getJoinType()));
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamJoinRule.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamSideInputLookupJoinRule.java
similarity index 63%
rename from 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamJoinRule.java
rename to 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamSideInputLookupJoinRule.java
index c94dd67..2e233d5 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamJoinRule.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamSideInputLookupJoinRule.java
@@ -19,25 +19,43 @@ package org.apache.beam.sdk.extensions.sql.impl.rule;
 
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSideInputLookupJoinRel;
 import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.convert.ConverterRule;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.logical.LogicalJoin;
 
-/** {@code ConverterRule} to replace {@code Join} with {@code BeamJoinRel}. */
-public class BeamJoinRule extends ConverterRule {
-  public static final BeamJoinRule INSTANCE = new BeamJoinRule();
+/**
+ * Rule to convert {@code LogicalJoin} node to {@code 
BeamSideInputLookupJoinRel} node.
+ *
+ * <p>This rule is matched when any of the inputs to {@code LogicalJoin} node 
are Seekable
+ */
+public class BeamSideInputLookupJoinRule extends ConverterRule {
+  public static final BeamSideInputLookupJoinRule INSTANCE = new 
BeamSideInputLookupJoinRule();
 
-  private BeamJoinRule() {
-    super(LogicalJoin.class, Convention.NONE, BeamLogicalConvention.INSTANCE, 
"BeamJoinRule");
+  public BeamSideInputLookupJoinRule() {
+    super(
+        LogicalJoin.class,
+        Convention.NONE,
+        BeamLogicalConvention.INSTANCE,
+        "BeamSideInputLookupJoinRule");
+  }
+
+  // The Rule is Matched when any of the inputs are Seekable
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    RelNode joinRel = call.rel(0);
+    boolean matches = BeamJoinRel.containsSeekableInput(joinRel);
+    return (matches);
   }
 
   @Override
   public RelNode convert(RelNode rel) {
     Join join = (Join) rel;
 
-    return new BeamJoinRel(
+    return (new BeamSideInputLookupJoinRel(
         join.getCluster(),
         join.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
         convert(
@@ -46,6 +64,6 @@ public class BeamJoinRule extends ConverterRule {
             join.getRight(), 
join.getRight().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
         join.getCondition(),
         join.getVariablesSet(),
-        join.getJoinType());
+        join.getJoinType()));
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
index d643391..b210bb0 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
@@ -18,8 +18,8 @@
 package org.apache.beam.sdk.extensions.sql;
 
 import static org.apache.beam.sdk.extensions.sql.TestUtils.tuple;
-import static 
org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS1;
-import static 
org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS2;
+import static 
org.apache.beam.sdk.extensions.sql.impl.rel.BeamCoGBKJoinRelBoundedVsBoundedTest.ORDER_DETAILS1;
+import static 
org.apache.beam.sdk.extensions.sql.impl.rel.BeamCoGBKJoinRelBoundedVsBoundedTest.ORDER_DETAILS2;
 import static 
org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithoutTimeZone;
 import static org.hamcrest.Matchers.stringContainsInOrder;
 
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlExplainTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlExplainTest.java
index 2a476ce..3f5a8f0 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlExplainTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlExplainTest.java
@@ -68,7 +68,7 @@ public class BeamSqlExplainTest {
 
     assertEquals(
         "BeamCalcRel(expr#0..3=[{inputs}], c1=[$t0], c2=[$t3])\n"
-            + "  BeamJoinRel(condition=[=($0, $3)], joinType=[inner])\n"
+            + "  BeamCoGBKJoinRel(condition=[=($0, $3)], joinType=[inner])\n"
             + "    BeamCalcRel(expr#0..1=[{inputs}], expr#2=[0], 
expr#3=[>($t0, $t2)],"
             + " proj#0..1=[{exprs}], $condition=[$t3])\n"
             + "      BeamIOSourceRel(table=[[beam, A]])\n"
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelBoundedVsBoundedTest.java
similarity index 85%
rename from 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
rename to 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelBoundedVsBoundedTest.java
index f208ecc..ca65e31 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelBoundedVsBoundedTest.java
@@ -17,12 +17,15 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
+import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.TestUtils;
 import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
+import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableUtils;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
 import org.apache.calcite.rel.RelNode;
@@ -33,8 +36,8 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
-/** Bounded + Bounded Test for {@code BeamJoinRel}. */
-public class BeamJoinRelBoundedVsBoundedTest extends BaseRelTest {
+/** Bounded + Bounded Test for {@code BeamCoGBKJoinRel}. */
+public class BeamCoGBKJoinRelBoundedVsBoundedTest extends BaseRelTest {
   @Rule public final TestPipeline pipeline = TestPipeline.create();
   @Rule public ExpectedException thrown = ExpectedException.none();
 
@@ -52,10 +55,17 @@ public class BeamJoinRelBoundedVsBoundedTest extends 
BaseRelTest {
               Schema.FieldType.INT32, "price")
           .addRows(1, 2, 3, 2, 3, 3, 3, 4, 5);
 
+  public static final BeamSqlTable SITE_LKP =
+      new BeamSideInputJoinRelUnboundedVsBoundedTest.SiteLookupTable(
+          TestTableUtils.buildBeamSqlSchema(
+              Schema.FieldType.INT32, "order_id",
+              Schema.FieldType.STRING, "site_name"));
+
   @BeforeClass
   public static void prepare() {
     registerTable("ORDER_DETAILS1", ORDER_DETAILS1);
     registerTable("ORDER_DETAILS2", ORDER_DETAILS2);
+    registerTable("SITE_LKP", SITE_LKP);
   }
 
   @Test
@@ -95,17 +105,17 @@ public class BeamJoinRelBoundedVsBoundedTest extends 
BaseRelTest {
 
     RelNode root = env.parseQuery(sql);
 
-    while (!(root instanceof BeamJoinRel)) {
+    while (!(root instanceof BeamCoGBKJoinRel)) {
       root = root.getInput(0);
     }
 
     NodeStats estimate = BeamSqlRelUtils.getNodeStats(root, 
root.getCluster().getMetadataQuery());
     NodeStats leftEstimate =
         BeamSqlRelUtils.getNodeStats(
-            ((BeamJoinRel) root).getLeft(), 
root.getCluster().getMetadataQuery());
+            ((BeamCoGBKJoinRel) root).getLeft(), 
root.getCluster().getMetadataQuery());
     NodeStats rightEstimate =
         BeamSqlRelUtils.getNodeStats(
-            ((BeamJoinRel) root).getRight(), 
root.getCluster().getMetadataQuery());
+            ((BeamCoGBKJoinRel) root).getRight(), 
root.getCluster().getMetadataQuery());
 
     Assert.assertFalse(estimate.isUnknown());
     Assert.assertEquals(0d, estimate.getRate(), 0.01);
@@ -136,13 +146,13 @@ public class BeamJoinRelBoundedVsBoundedTest extends 
BaseRelTest {
 
     RelNode root1 = env.parseQuery(sql1);
 
-    while (!(root1 instanceof BeamJoinRel)) {
+    while (!(root1 instanceof BeamCoGBKJoinRel)) {
       root1 = root1.getInput(0);
     }
 
     RelNode root2 = env.parseQuery(sql2);
 
-    while (!(root2 instanceof BeamJoinRel)) {
+    while (!(root2 instanceof BeamCoGBKJoinRel)) {
       root2 = root2.getInput(0);
     }
 
@@ -375,4 +385,44 @@ public class BeamJoinRelBoundedVsBoundedTest extends 
BaseRelTest {
     compilePipeline(sql, pipeline);
     pipeline.run();
   }
+
+  /**
+   * Joins ORDER_DETAILS1 (left) with the SITE_LKP lookup table (right) on order_id, restricted to
+   * order_id 1, and expects the single row (1, "SITE1").
+   */
+  @Test
+  public void testBoundedVsLookupTableJoin() throws Exception {
+    String sql =
+        "SELECT o1.order_id, o2.site_name FROM "
+            + " ORDER_DETAILS1 o1 "
+            + " JOIN SITE_LKP o2 "
+            + " on "
+            + " o1.order_id=o2.order_id "
+            + " WHERE o1.order_id=1";
+
+    // Rows are compared in their string form.
+    PCollection<String> actualRows =
+        compilePipeline(sql, pipeline).apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()));
+
+    PAssert.that(actualRows)
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                    Schema.FieldType.INT32, "order_id",
+                    Schema.FieldType.STRING, "site_name")
+                .addRows(1, "SITE1")
+                .getStringRows());
+    pipeline.run();
+  }
+
+  /**
+   * Joins the SITE_LKP lookup table (left) with ORDER_DETAILS1 (right) on order_id, restricted to
+   * order_id 1, and expects the single row (1, "SITE1").
+   */
+  @Test
+  public void testLookupTableVsBoundedJoin() throws Exception {
+    String sql =
+        "SELECT o1.order_id, o2.site_name FROM "
+            + " SITE_LKP o2 "
+            + " JOIN ORDER_DETAILS1 o1 "
+            + " on "
+            + " o1.order_id=o2.order_id "
+            + " WHERE o1.order_id=1";
+
+    // Rows are compared in their string form.
+    PCollection<String> actualRows =
+        compilePipeline(sql, pipeline).apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()));
+
+    PAssert.that(actualRows)
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                    Schema.FieldType.INT32, "order_id",
+                    Schema.FieldType.STRING, "site_name")
+                .addRows(1, "SITE1")
+                .getStringRows());
+    pipeline.run();
+  }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelUnboundedVsUnboundedTest.java
similarity index 96%
rename from 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
rename to 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelUnboundedVsUnboundedTest.java
index 1f19cf8..f9a5fb4 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelUnboundedVsUnboundedTest.java
@@ -36,8 +36,8 @@ import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 
-/** Unbounded + Unbounded Test for {@code BeamJoinRel}. */
-public class BeamJoinRelUnboundedVsUnboundedTest extends BaseRelTest {
+/** Unbounded + Unbounded Test for {@code BeamCoGBKJoinRel}. */
+public class BeamCoGBKJoinRelUnboundedVsUnboundedTest extends BaseRelTest {
   @Rule public final TestPipeline pipeline = TestPipeline.create();
   private static final DateTime FIRST_DATE = new DateTime(1);
   private static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000);
@@ -121,17 +121,17 @@ public class BeamJoinRelUnboundedVsUnboundedTest extends 
BaseRelTest {
 
     RelNode root = env.parseQuery(sql);
 
-    while (!(root instanceof BeamJoinRel)) {
+    while (!(root instanceof BeamCoGBKJoinRel)) {
       root = root.getInput(0);
     }
 
     NodeStats estimate = BeamSqlRelUtils.getNodeStats(root, 
root.getCluster().getMetadataQuery());
     NodeStats leftEstimate =
         BeamSqlRelUtils.getNodeStats(
-            ((BeamJoinRel) root).getLeft(), 
root.getCluster().getMetadataQuery());
+            ((BeamCoGBKJoinRel) root).getLeft(), 
root.getCluster().getMetadataQuery());
     NodeStats rightEstimate =
         BeamSqlRelUtils.getNodeStats(
-            ((BeamJoinRel) root).getRight(), 
root.getCluster().getMetadataQuery());
+            ((BeamCoGBKJoinRel) root).getRight(), 
root.getCluster().getMetadataQuery());
 
     Assert.assertFalse(estimate.isUnknown());
     Assert.assertEquals(0d, estimate.getRowCount(), 0.01);
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputJoinRelUnboundedVsBoundedTest.java
similarity index 96%
rename from 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
rename to 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputJoinRelUnboundedVsBoundedTest.java
index d19f5e0..45881a8 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputJoinRelUnboundedVsBoundedTest.java
@@ -45,8 +45,8 @@ import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 
-/** Unbounded + Unbounded Test for {@code BeamJoinRel}. */
-public class BeamJoinRelUnboundedVsBoundedTest extends BaseRelTest {
+/** Unbounded + Bounded Test for {@code BeamSideInputJoinRel}. */
+public class BeamSideInputJoinRelUnboundedVsBoundedTest extends BaseRelTest {
   @Rule public final TestPipeline pipeline = TestPipeline.create();
   public static final DateTime FIRST_DATE = new DateTime(1);
   public static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000);
@@ -200,17 +200,17 @@ public class BeamJoinRelUnboundedVsBoundedTest extends 
BaseRelTest {
 
     RelNode root = env.parseQuery(sql);
 
-    while (!(root instanceof BeamJoinRel)) {
+    while (!(root instanceof BeamSideInputJoinRel)) {
       root = root.getInput(0);
     }
 
     NodeStats estimate = BeamSqlRelUtils.getNodeStats(root, 
root.getCluster().getMetadataQuery());
     NodeStats leftEstimate =
         BeamSqlRelUtils.getNodeStats(
-            ((BeamJoinRel) root).getLeft(), 
root.getCluster().getMetadataQuery());
+            ((BeamSideInputJoinRel) root).getLeft(), 
root.getCluster().getMetadataQuery());
     NodeStats rightEstimate =
         BeamSqlRelUtils.getNodeStats(
-            ((BeamJoinRel) root).getRight(), 
root.getCluster().getMetadataQuery());
+            ((BeamSideInputJoinRel) root).getRight(), 
root.getCluster().getMetadataQuery());
 
     Assert.assertFalse(estimate.isUnknown());
     Assert.assertEquals(0d, estimate.getRowCount(), 0.01);
@@ -324,7 +324,7 @@ public class BeamJoinRelUnboundedVsBoundedTest extends 
BaseRelTest {
   }
 
   @Test
-  public void testJoinAsLookup() throws Exception {
+  public void testUnboundedVsLookupTableJoin() throws Exception {
     String sql =
         "SELECT o1.order_id, o2.site_name FROM "
             + " ORDER_DETAILS o1 "
@@ -344,7 +344,7 @@ public class BeamJoinRelUnboundedVsBoundedTest extends 
BaseRelTest {
   }
 
   @Test
-  public void testJoinAsLookupSwapped() throws Exception {
+  public void testLookupTableVsUnboundedJoin() throws Exception {
     String sql =
         "SELECT o1.order_id, o2.site_name FROM "
             + " SITE_LKP o2 "

Reply via email to