This is an automated email from the ASF dual-hosted git repository.
amaliujia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new cd3b2cb [BEAM-6114] Calcite Rules to Select Type of Join in BeamSQL
new 1efa67a Merge pull request #9395 from rahul8383/beamsql-join-rules
cd3b2cb is described below
commit cd3b2cb97e096ef38c3bb9ccd51165ac7b2ad841
Author: parahul <[email protected]>
AuthorDate: Thu Aug 22 00:23:53 2019 +0530
[BEAM-6114] Calcite Rules to Select Type of Join in BeamSQL
---
.../extensions/sql/impl/planner/BeamRuleSets.java | 8 +-
.../extensions/sql/impl/rel/BeamCoGBKJoinRel.java | 193 ++++++++++
.../sdk/extensions/sql/impl/rel/BeamJoinRel.java | 399 ++++-----------------
.../sql/impl/rel/BeamSideInputJoinRel.java | 157 ++++++++
.../sql/impl/rel/BeamSideInputLookupJoinRel.java | 110 ++++++
.../sql/impl/rule/BeamCoGBKJoinRule.java | 82 +++++
.../sql/impl/rule/BeamSideInputJoinRule.java | 82 +++++
...nRule.java => BeamSideInputLookupJoinRule.java} | 32 +-
.../sdk/extensions/sql/BeamSqlDslJoinTest.java | 4 +-
.../sdk/extensions/sql/BeamSqlExplainTest.java | 2 +-
...a => BeamCoGBKJoinRelBoundedVsBoundedTest.java} | 64 +++-
... BeamCoGBKJoinRelUnboundedVsUnboundedTest.java} | 10 +-
...eamSideInputJoinRelUnboundedVsBoundedTest.java} | 14 +-
13 files changed, 805 insertions(+), 352 deletions(-)
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
index 1a78e98..b2766d6 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
@@ -23,12 +23,14 @@ import
org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rule.BeamAggregationRule;
import org.apache.beam.sdk.extensions.sql.impl.rule.BeamBasicAggregationRule;
import org.apache.beam.sdk.extensions.sql.impl.rule.BeamCalcRule;
+import org.apache.beam.sdk.extensions.sql.impl.rule.BeamCoGBKJoinRule;
import
org.apache.beam.sdk.extensions.sql.impl.rule.BeamEnumerableConverterRule;
import org.apache.beam.sdk.extensions.sql.impl.rule.BeamIntersectRule;
import org.apache.beam.sdk.extensions.sql.impl.rule.BeamJoinAssociateRule;
import
org.apache.beam.sdk.extensions.sql.impl.rule.BeamJoinPushThroughJoinRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamJoinRule;
import org.apache.beam.sdk.extensions.sql.impl.rule.BeamMinusRule;
+import org.apache.beam.sdk.extensions.sql.impl.rule.BeamSideInputJoinRule;
+import
org.apache.beam.sdk.extensions.sql.impl.rule.BeamSideInputLookupJoinRule;
import org.apache.beam.sdk.extensions.sql.impl.rule.BeamSortRule;
import org.apache.beam.sdk.extensions.sql.impl.rule.BeamUncollectRule;
import org.apache.beam.sdk.extensions.sql.impl.rule.BeamUnionRule;
@@ -151,7 +153,9 @@ public class BeamRuleSets {
BeamUnionRule.INSTANCE,
BeamUncollectRule.INSTANCE,
BeamUnnestRule.INSTANCE,
- BeamJoinRule.INSTANCE);
+ BeamSideInputJoinRule.INSTANCE,
+ BeamCoGBKJoinRule.INSTANCE,
+ BeamSideInputLookupJoinRule.INSTANCE);
private static final List<RelOptRule> BEAM_TO_ENUMERABLE =
ImmutableList.of(BeamEnumerableConverterRule.INSTANCE);
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRel.java
new file mode 100644
index 0000000..0e165b6
--- /dev/null
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRel.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import static org.apache.beam.sdk.values.PCollection.IsBounded.UNBOUNDED;
+import static org.joda.time.Duration.ZERO;
+
+import java.util.Set;
+import org.apache.beam.sdk.extensions.sql.impl.transform.BeamJoinTransforms;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rex.RexNode;
+
+/**
+ * A {@link BeamJoinRel} whose transform joins its two inputs with the Beam join library
+ * ({@code org.apache.beam.sdk.extensions.joinlibrary.Join}).
+ *
+ * <p>Both inputs are first turned into {@code KV<Row, Row>} collections by {@code
+ * ExtractJoinKeys}. The transform then requires the two inputs to have compatible {@code
+ * WindowFn}s and, for every unbounded input, a windowing strategy that uses a non-global window,
+ * the default trigger and zero allowed lateness.
+ */
+public class BeamCoGBKJoinRel extends BeamJoinRel {
+
+  /** Creates the node; every argument is forwarded unchanged to {@link BeamJoinRel}. */
+  public BeamCoGBKJoinRel(
+      RelOptCluster cluster,
+      RelTraitSet traitSet,
+      RelNode left,
+      RelNode right,
+      RexNode condition,
+      Set<CorrelationId> variablesSet,
+      JoinRelType joinType) {
+    super(cluster, traitSet, left, right, condition, variablesSet, joinType);
+  }
+
+  /** Returns a new {@code StandardJoin} transform bound to this node. */
+  @Override
+  public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
+    return new StandardJoin();
+  }
+
+  /**
+   * Transform that validates the windowing of both inputs and then delegates to {@link
+   * #standardJoin}. It is an inner (non-static) class because it reads {@code left}, {@code
+   * right} and {@code joinType} from the enclosing node.
+   */
+  private class StandardJoin extends PTransform<PCollectionList<Row>, PCollection<Row>> {
+
+    /**
+     * Joins the two collections in {@code pinput} (index 0 = left, index 1 = right).
+     *
+     * @throws IllegalArgumentException if the window functions of the two inputs are incompatible
+     * @throws UnsupportedOperationException if an unbounded input has an unsupported windowing
+     *     strategy
+     */
+    @Override
+    public PCollection<Row> expand(PCollectionList<Row> pinput) {
+      Schema leftSchema = CalciteUtils.toSchema(left.getRowType());
+      Schema rightSchema = CalciteUtils.toSchema(right.getRowType());
+
+      PCollectionList<KV<Row, Row>> keyedInputs = pinput.apply(new ExtractJoinKeys());
+
+      PCollection<KV<Row, Row>> extractedLeftRows = keyedInputs.get(0);
+      PCollection<KV<Row, Row>> extractedRightRows = keyedInputs.get(1);
+
+      WindowFn<?, ?> leftWinFn = extractedLeftRows.getWindowingStrategy().getWindowFn();
+      WindowFn<?, ?> rightWinFn = extractedRightRows.getWindowingStrategy().getWindowFn();
+
+      try {
+        leftWinFn.verifyCompatibility(rightWinFn);
+      } catch (IncompatibleWindowException e) {
+        // Keep the original exception as the cause so the incompatibility details are not lost.
+        throw new IllegalArgumentException(
+            "WindowFns must match for a bounded-vs-bounded/unbounded-vs-unbounded join.", e);
+      }
+
+      verifySupportedTrigger(extractedLeftRows);
+      verifySupportedTrigger(extractedRightRows);
+
+      return standardJoin(extractedLeftRows, extractedRightRows, leftSchema, rightSchema);
+    }
+  }
+
+  /**
+   * Rejects unbounded collections whose windowing strategy does not satisfy {@link
+   * #triggersOncePerWindow}. Bounded collections are always accepted.
+   *
+   * @throws UnsupportedOperationException if {@code pCollection} is unbounded and its windowing
+   *     strategy is not supported
+   */
+  private <T> void verifySupportedTrigger(PCollection<T> pCollection) {
+    WindowingStrategy<?, ?> windowingStrategy = pCollection.getWindowingStrategy();
+
+    if (UNBOUNDED.equals(pCollection.isBounded()) && !triggersOncePerWindow(windowingStrategy)) {
+      throw new UnsupportedOperationException(
+          "Joining unbounded PCollections is currently only supported for "
+              + "non-global windows with triggers that are known to produce output once per window, "
+              + "such as the default trigger with zero allowed lateness. "
+              + "In these cases Beam can guarantee it joins all input elements once per window. "
+              + windowingStrategy
+              + " is not supported");
+    }
+  }
+
+  /**
+   * Returns {@code true} only when the strategy uses a non-global window function, the default
+   * trigger and zero allowed lateness.
+   */
+  private boolean triggersOncePerWindow(WindowingStrategy<?, ?> windowingStrategy) {
+    Trigger trigger = windowingStrategy.getTrigger();
+
+    return !(windowingStrategy.getWindowFn() instanceof GlobalWindows)
+        && trigger instanceof DefaultTrigger
+        && ZERO.equals(windowingStrategy.getAllowedLateness());
+  }
+
+  /**
+   * Runs the join-library join that matches {@code joinType} and maps each joined pair to a single
+   * output row with this node's row schema.
+   *
+   * <p>For outer joins the side that may be missing gets a null placeholder row built from {@code
+   * buildNullSchema}, and that side's value coder is replaced with the coder of the same schema
+   * (presumably so placeholder rows and real rows share one coder; confirm against {@code
+   * buildNullSchema}). Any join type other than LEFT, RIGHT or FULL is executed as an inner join.
+   */
+  private PCollection<Row> standardJoin(
+      PCollection<KV<Row, Row>> extractedLeftRows,
+      PCollection<KV<Row, Row>> extractedRightRows,
+      Schema leftSchema,
+      Schema rightSchema) {
+    PCollection<KV<Row, KV<Row, Row>>> joinedRows;
+
+    switch (joinType) {
+      case LEFT:
+        {
+          Schema rightNullSchema = buildNullSchema(rightSchema);
+          Row rightNullRow = Row.nullRow(rightNullSchema);
+
+          extractedRightRows = setValueCoder(extractedRightRows, SchemaCoder.of(rightNullSchema));
+
+          joinedRows =
+              org.apache.beam.sdk.extensions.joinlibrary.Join.leftOuterJoin(
+                  extractedLeftRows, extractedRightRows, rightNullRow);
+
+          break;
+        }
+      case RIGHT:
+        {
+          Schema leftNullSchema = buildNullSchema(leftSchema);
+          Row leftNullRow = Row.nullRow(leftNullSchema);
+
+          extractedLeftRows = setValueCoder(extractedLeftRows, SchemaCoder.of(leftNullSchema));
+
+          joinedRows =
+              org.apache.beam.sdk.extensions.joinlibrary.Join.rightOuterJoin(
+                  extractedLeftRows, extractedRightRows, leftNullRow);
+          break;
+        }
+      case FULL:
+        {
+          Schema leftNullSchema = buildNullSchema(leftSchema);
+          Schema rightNullSchema = buildNullSchema(rightSchema);
+
+          Row leftNullRow = Row.nullRow(leftNullSchema);
+          Row rightNullRow = Row.nullRow(rightNullSchema);
+
+          extractedLeftRows = setValueCoder(extractedLeftRows, SchemaCoder.of(leftNullSchema));
+          extractedRightRows = setValueCoder(extractedRightRows, SchemaCoder.of(rightNullSchema));
+
+          joinedRows =
+              org.apache.beam.sdk.extensions.joinlibrary.Join.fullOuterJoin(
+                  extractedLeftRows, extractedRightRows, leftNullRow, rightNullRow);
+          break;
+        }
+      case INNER:
+      default:
+        joinedRows =
+            org.apache.beam.sdk.extensions.joinlibrary.Join.innerJoin(
+                extractedLeftRows, extractedRightRows);
+        break;
+    }
+
+    Schema schema = CalciteUtils.toSchema(getRowType());
+    return joinedRows
+        .apply(
+            "JoinParts2WholeRow",
+            MapElements.via(new BeamJoinTransforms.JoinParts2WholeRow(schema)))
+        .setRowSchema(schema);
+  }
+
+  /**
+   * Creates a copy of this node with the given traits, condition, inputs and join type. The
+   * cluster and {@code variablesSet} are taken from this node; {@code semiJoinDone} is not used.
+   */
+  @Override
+  public Join copy(
+      RelTraitSet traitSet,
+      RexNode conditionExpr,
+      RelNode left,
+      RelNode right,
+      JoinRelType joinType,
+      boolean semiJoinDone) {
+    return new BeamCoGBKJoinRel(
+        getCluster(), traitSet, left, right, conditionExpr, variablesSet, joinType);
+  }
+}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
index 8950974..f2739e4 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
@@ -18,13 +18,10 @@
package org.apache.beam.sdk.extensions.sql.impl.rel;
import static org.apache.beam.sdk.schemas.Schema.toSchema;
-import static org.apache.beam.sdk.values.PCollection.IsBounded.UNBOUNDED;
-import static org.joda.time.Duration.ZERO;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.beam.sdk.coders.Coder;
@@ -40,26 +37,18 @@ import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.RelSubset;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.core.Join;
@@ -102,9 +91,9 @@ import org.apache.calcite.util.Pair;
* <li>CROSS JOIN is not supported.
* </ul>
*/
-public class BeamJoinRel extends Join implements BeamRelNode {
+public abstract class BeamJoinRel extends Join implements BeamRelNode {
- public BeamJoinRel(
+ protected BeamJoinRel(
RelOptCluster cluster,
RelTraitSet traits,
RelNode left,
@@ -116,18 +105,6 @@ public class BeamJoinRel extends Join implements
BeamRelNode {
}
@Override
- public Join copy(
- RelTraitSet traitSet,
- RexNode conditionExpr,
- RelNode left,
- RelNode right,
- JoinRelType joinType,
- boolean semiJoinDone) {
- return new BeamJoinRel(
- getCluster(), traitSet, left, right, conditionExpr, variablesSet,
joinType);
- }
-
- @Override
public List<RelNode> getPCollectionInputs() {
if (isSideInputLookupJoin()) {
return ImmutableList.of(
@@ -137,6 +114,38 @@ public class BeamJoinRel extends Join implements
BeamRelNode {
}
}
+  /**
+   * Tells whether this join has both a seekable input and a non-seekable input, as reported by
+   * {@code seekableInputIndex()} and {@code nonSeekableInputIndex()}.
+   */
+  protected boolean isSideInputLookupJoin() {
+    boolean hasSeekableInput = seekableInputIndex().isPresent();
+    // The non-seekable lookup is only evaluated when a seekable input exists.
+    return hasSeekableInput && nonSeekableInputIndex().isPresent();
+  }
+
+  /**
+   * Finds the first seekable input of this join.
+   *
+   * @return {@code 0} if the left input is seekable, otherwise {@code 1} if the right input is
+   *     seekable, otherwise an absent {@code Optional}
+   */
+  protected Optional<Integer> seekableInputIndex() {
+    BeamRelNode leftInput = BeamSqlRelUtils.getBeamRelInput(left);
+    BeamRelNode rightInput = BeamSqlRelUtils.getBeamRelInput(right);
+    if (seekable(leftInput)) {
+      return Optional.of(0);
+    }
+    if (seekable(rightInput)) {
+      return Optional.of(1);
+    }
+    return Optional.absent();
+  }
+
+  /**
+   * Finds the first input of this join that is not seekable.
+   *
+   * @return {@code 0} if the left input is not seekable, otherwise {@code 1} if the right input
+   *     is not seekable, otherwise an absent {@code Optional}
+   */
+  protected Optional<Integer> nonSeekableInputIndex() {
+    BeamRelNode leftInput = BeamSqlRelUtils.getBeamRelInput(left);
+    BeamRelNode rightInput = BeamSqlRelUtils.getBeamRelInput(right);
+    if (!seekable(leftInput)) {
+      return Optional.of(0);
+    }
+    if (!seekable(rightInput)) {
+      return Optional.of(1);
+    }
+    return Optional.absent();
+  }
+
+  /**
+   * Checks whether {@code relNode} is a {@code BeamIOSourceRel} whose table implements {@code
+   * BeamSqlSeekableTable}.
+   *
+   * @param relNode node to inspect; any node that is not a {@code BeamIOSourceRel} yields {@code
+   *     false}
+   * @return {@code true} only for a source node backed by a seekable table
+   */
+  public static boolean seekable(BeamRelNode relNode) {
+    if (!(relNode instanceof BeamIOSourceRel)) {
+      return false;
+    }
+    BeamSqlTable table = ((BeamIOSourceRel) relNode).getBeamSqlTable();
+    return table instanceof BeamSqlSeekableTable;
+  }
+
@Override
public BeamCostModel beamComputeSelfCost(RelOptPlanner planner,
RelMetadataQuery mq) {
NodeStats leftEstimates = BeamSqlRelUtils.getNodeStats(this.left, mq);
@@ -183,111 +192,7 @@ public class BeamJoinRel extends Join implements
BeamRelNode {
return true;
}
- @Override
- public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
- if (isSideInputLookupJoin()) {
- return new SideInputLookupJoin();
- } else if (isSideInputJoin()) {
- // if one of the sides is Bounded & the other is Unbounded
- // then do a sideInput join
- // when doing a sideInput join, the windowFn does not need to match
- // Only support INNER JOIN & LEFT OUTER JOIN where left side of the join
must be
- // the unbounded
- if (joinType == JoinRelType.FULL) {
- throw new UnsupportedOperationException(
- "FULL OUTER JOIN is not supported when join "
- + "a bounded table with an unbounded table.");
- }
-
- BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
- BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
-
- if ((joinType == JoinRelType.LEFT && leftRelNode.isBounded() ==
PCollection.IsBounded.BOUNDED)
- || (joinType == JoinRelType.RIGHT
- && rightRelNode.isBounded() == PCollection.IsBounded.BOUNDED)) {
- throw new UnsupportedOperationException(
- "LEFT side of an OUTER JOIN must be Unbounded table.");
- }
-
- return new SideInputJoin();
- } else {
- return new StandardJoin();
- }
- }
-
- private boolean isSideInputJoin() {
- BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
- BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
- return (leftRelNode.isBounded() == PCollection.IsBounded.BOUNDED
- && rightRelNode.isBounded() == UNBOUNDED)
- || (leftRelNode.isBounded() == UNBOUNDED
- && rightRelNode.isBounded() == PCollection.IsBounded.BOUNDED);
- }
-
- private boolean isSideInputLookupJoin() {
- return seekableInputIndex().isPresent() &&
nonSeekableInputIndex().isPresent();
- }
-
- private Optional<Integer> seekableInputIndex() {
- BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
- BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
- return seekable(leftRelNode)
- ? Optional.of(0)
- : seekable(rightRelNode) ? Optional.of(1) : Optional.absent();
- }
-
- private Optional<Integer> nonSeekableInputIndex() {
- BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
- BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
- return !seekable(leftRelNode)
- ? Optional.of(0)
- : !seekable(rightRelNode) ? Optional.of(1) : Optional.absent();
- }
-
- private class SideInputLookupJoin extends PTransform<PCollectionList<Row>,
PCollection<Row>> {
-
- @Override
- public PCollection<Row> expand(PCollectionList<Row> pinput) {
- Schema schema = CalciteUtils.toSchema(getRowType());
-
- BeamRelNode seekableRel =
-
BeamSqlRelUtils.getBeamRelInput(getInput(seekableInputIndex().get()));
- BeamRelNode nonSeekableRel =
-
BeamSqlRelUtils.getBeamRelInput(getInput(nonSeekableInputIndex().get()));
-
- // Offset field references according to which table is on the left
- int factColOffset =
- nonSeekableInputIndex().get() == 0
- ? 0
- :
CalciteUtils.toSchema(seekableRel.getRowType()).getFieldCount();
- int lkpColOffset =
- seekableInputIndex().get() == 0
- ? 0
- :
CalciteUtils.toSchema(nonSeekableRel.getRowType()).getFieldCount();
-
- // HACK: if the input is an immediate instance of a seekable IO, we can
do lookups
- // so we ignore the PCollection
- BeamIOSourceRel seekableInput = (BeamIOSourceRel) seekableRel;
- BeamSqlSeekableTable seekableTable = (BeamSqlSeekableTable)
seekableInput.getBeamSqlTable();
-
- // getPCollectionInputs() ensures that there is only one and it is the
non-seekable input
- PCollection<Row> nonSeekableInput = pinput.get(0);
-
- return nonSeekableInput
- .apply(
- "join_as_lookup",
- new BeamJoinTransforms.JoinAsLookup(
- condition,
- seekableTable,
- CalciteUtils.toSchema(seekableInput.getRowType()),
- schema,
- factColOffset,
- lkpColOffset))
- .setRowSchema(schema);
- }
- }
-
- private class ExtractJoinKeys
+ protected class ExtractJoinKeys
extends PTransform<PCollectionList<Row>, PCollectionList<KV<Row, Row>>> {
@Override
@@ -348,189 +253,7 @@ public class BeamJoinRel extends Join implements
BeamRelNode {
}
}
- private class SideInputJoin extends PTransform<PCollectionList<Row>,
PCollection<Row>> {
-
- @Override
- public PCollection<Row> expand(PCollectionList<Row> pinput) {
- Schema leftSchema = CalciteUtils.toSchema(left.getRowType());
- Schema rightSchema = CalciteUtils.toSchema(right.getRowType());
-
- PCollectionList<KV<Row, Row>> keyedInputs = pinput.apply(new
ExtractJoinKeys());
-
- PCollection<KV<Row, Row>> extractedLeftRows = keyedInputs.get(0);
- PCollection<KV<Row, Row>> extractedRightRows = keyedInputs.get(1);
-
- return sideInputJoin(extractedLeftRows, extractedRightRows, leftSchema,
rightSchema);
- }
- }
-
- private class StandardJoin extends PTransform<PCollectionList<Row>,
PCollection<Row>> {
-
- @Override
- public PCollection<Row> expand(PCollectionList<Row> pinput) {
- Schema leftSchema = CalciteUtils.toSchema(left.getRowType());
- Schema rightSchema = CalciteUtils.toSchema(right.getRowType());
-
- PCollectionList<KV<Row, Row>> keyedInputs = pinput.apply(new
ExtractJoinKeys());
-
- PCollection<KV<Row, Row>> extractedLeftRows = keyedInputs.get(0);
- PCollection<KV<Row, Row>> extractedRightRows = keyedInputs.get(1);
-
- WindowFn leftWinFn =
extractedLeftRows.getWindowingStrategy().getWindowFn();
- WindowFn rightWinFn =
extractedRightRows.getWindowingStrategy().getWindowFn();
-
- try {
- leftWinFn.verifyCompatibility(rightWinFn);
- } catch (IncompatibleWindowException e) {
- throw new IllegalArgumentException(
- "WindowFns must match for a
bounded-vs-bounded/unbounded-vs-unbounded join.", e);
- }
-
- verifySupportedTrigger(extractedLeftRows);
- verifySupportedTrigger(extractedRightRows);
-
- return standardJoin(extractedLeftRows, extractedRightRows, leftSchema,
rightSchema);
- }
- }
-
- private <T> void verifySupportedTrigger(PCollection<T> pCollection) {
- WindowingStrategy windowingStrategy = pCollection.getWindowingStrategy();
-
- if (UNBOUNDED.equals(pCollection.isBounded()) &&
!triggersOncePerWindow(windowingStrategy)) {
- throw new UnsupportedOperationException(
- "Joining unbounded PCollections is currently only supported for "
- + "non-global windows with triggers that are known to produce
output once per window,"
- + "such as the default trigger with zero allowed lateness. "
- + "In these cases Beam can guarantee it joins all input elements
once per window. "
- + windowingStrategy
- + " is not supported");
- }
- }
-
- private boolean triggersOncePerWindow(WindowingStrategy windowingStrategy) {
- Trigger trigger = windowingStrategy.getTrigger();
-
- return !(windowingStrategy.getWindowFn() instanceof GlobalWindows)
- && trigger instanceof DefaultTrigger
- && ZERO.equals(windowingStrategy.getAllowedLateness());
- }
-
- private PCollection<Row> standardJoin(
- PCollection<KV<Row, Row>> extractedLeftRows,
- PCollection<KV<Row, Row>> extractedRightRows,
- Schema leftSchema,
- Schema rightSchema) {
- PCollection<KV<Row, KV<Row, Row>>> joinedRows = null;
-
- switch (joinType) {
- case LEFT:
- {
- Schema rigthNullSchema = buildNullSchema(rightSchema);
- Row rightNullRow = Row.nullRow(rigthNullSchema);
-
- extractedRightRows = setValueCoder(extractedRightRows,
SchemaCoder.of(rigthNullSchema));
-
- joinedRows =
- org.apache.beam.sdk.extensions.joinlibrary.Join.leftOuterJoin(
- extractedLeftRows, extractedRightRows, rightNullRow);
-
- break;
- }
- case RIGHT:
- {
- Schema leftNullSchema = buildNullSchema(leftSchema);
- Row leftNullRow = Row.nullRow(leftNullSchema);
-
- extractedLeftRows = setValueCoder(extractedLeftRows,
SchemaCoder.of(leftNullSchema));
-
- joinedRows =
- org.apache.beam.sdk.extensions.joinlibrary.Join.rightOuterJoin(
- extractedLeftRows, extractedRightRows, leftNullRow);
- break;
- }
- case FULL:
- {
- Schema leftNullSchema = buildNullSchema(leftSchema);
- Schema rightNullSchema = buildNullSchema(rightSchema);
-
- Row leftNullRow = Row.nullRow(leftNullSchema);
- Row rightNullRow = Row.nullRow(rightNullSchema);
-
- extractedLeftRows = setValueCoder(extractedLeftRows,
SchemaCoder.of(leftNullSchema));
- extractedRightRows = setValueCoder(extractedRightRows,
SchemaCoder.of(rightNullSchema));
-
- joinedRows =
- org.apache.beam.sdk.extensions.joinlibrary.Join.fullOuterJoin(
- extractedLeftRows, extractedRightRows, leftNullRow,
rightNullRow);
- break;
- }
- case INNER:
- default:
- joinedRows =
- org.apache.beam.sdk.extensions.joinlibrary.Join.innerJoin(
- extractedLeftRows, extractedRightRows);
- break;
- }
-
- Schema schema = CalciteUtils.toSchema(getRowType());
- return joinedRows
- .apply(
- "JoinParts2WholeRow",
- MapElements.via(new BeamJoinTransforms.JoinParts2WholeRow(schema)))
- .setRowSchema(schema);
- }
-
- public PCollection<Row> sideInputJoin(
- PCollection<KV<Row, Row>> extractedLeftRows,
- PCollection<KV<Row, Row>> extractedRightRows,
- Schema leftSchema,
- Schema rightSchema) {
- // we always make the Unbounded table on the left to do the sideInput join
- // (will convert the result accordingly before return)
- boolean swapped = (extractedLeftRows.isBounded() ==
PCollection.IsBounded.BOUNDED);
- JoinRelType realJoinType =
- (swapped && joinType != JoinRelType.INNER) ? JoinRelType.LEFT :
joinType;
-
- PCollection<KV<Row, Row>> realLeftRows = swapped ? extractedRightRows :
extractedLeftRows;
- PCollection<KV<Row, Row>> realRightRows = swapped ? extractedLeftRows :
extractedRightRows;
-
- Row realRightNullRow;
- if (swapped) {
- Schema leftNullSchema = buildNullSchema(leftSchema);
-
- realRightRows = setValueCoder(realRightRows,
SchemaCoder.of(leftNullSchema));
- realRightNullRow = Row.nullRow(leftNullSchema);
- } else {
- Schema rightNullSchema = buildNullSchema(rightSchema);
-
- realRightRows = setValueCoder(realRightRows,
SchemaCoder.of(rightNullSchema));
- realRightNullRow = Row.nullRow(rightNullSchema);
- }
-
- // swapped still need to pass down because, we need to swap the result
back.
- return sideInputJoinHelper(
- realJoinType, realLeftRows, realRightRows, realRightNullRow, swapped);
- }
-
- private PCollection<Row> sideInputJoinHelper(
- JoinRelType joinType,
- PCollection<KV<Row, Row>> leftRows,
- PCollection<KV<Row, Row>> rightRows,
- Row rightNullRow,
- boolean swapped) {
- final PCollectionView<Map<Row, Iterable<Row>>> rowsView =
rightRows.apply(View.asMultimap());
-
- Schema schema = CalciteUtils.toSchema(getRowType());
- return leftRows
- .apply(
- ParDo.of(
- new BeamJoinTransforms.SideInputJoinDoFn(
- joinType, rightNullRow, rowsView, swapped, schema))
- .withSideInputs(rowsView))
- .setRowSchema(schema);
- }
-
- private Schema buildNullSchema(Schema schema) {
+ protected Schema buildNullSchema(Schema schema) {
Schema.Builder builder = Schema.builder();
builder.addFields(
@@ -539,7 +262,7 @@ public class BeamJoinRel extends Join implements
BeamRelNode {
return builder.build();
}
- private static <K, V> PCollection<KV<K, V>> setValueCoder(
+ protected static <K, V> PCollection<KV<K, V>> setValueCoder(
PCollection<KV<K, V>> kvs, Coder<V> valueCoder) {
// safe case because PCollection of KV always has KvCoder
KvCoder<K, V> coder = (KvCoder<K, V>) kvs.getCoder();
@@ -651,15 +374,49 @@ public class BeamJoinRel extends Join implements
BeamRelNode {
throw new UnsupportedOperationException("Cannot get column index from " +
rexNode.getType());
}
- /** check if {@code BeamRelNode} implements {@code BeamSeekableTable}. */
- private boolean seekable(BeamRelNode relNode) {
- if (relNode instanceof BeamIOSourceRel) {
- BeamIOSourceRel srcRel = (BeamIOSourceRel) relNode;
- BeamSqlTable sourceTable = srcRel.getBeamSqlTable();
- if (sourceTable instanceof BeamSqlSeekableTable) {
+  /**
+   * Determines the boundedness of {@code relNode}, including nodes that are not Beam nodes yet.
+   *
+   * <p>The Volcano planner works in a top-down fashion: it starts by transforming the root and
+   * moves towards the leaves of the plan. Because of this, when a logical join is transformed its
+   * inputs are still in the logical convention. This method therefore visits the inputs
+   * recursively until a {@code BeamRelNode} (for example a {@code BeamIOSourceRel}) is reached
+   * and propagates the boundedness upwards.
+   *
+   * @param relNode the node to inspect
+   * @return the node's own boundedness if it is a {@code BeamRelNode}; otherwise {@code UNBOUNDED}
+   *     if any input is unbounded, else {@code BOUNDED}
+   */
+  public static PCollection.IsBounded getBoundednessOfRelNode(RelNode relNode) {
+    if (relNode instanceof BeamRelNode) {
+      return ((BeamRelNode) relNode).isBounded();
+    }
+    boolean anyInputUnbounded = false;
+    for (RelNode inputRel : relNode.getInputs()) {
+      RelNode resolvedInput = inputRel;
+      if (inputRel instanceof RelSubset) {
+        // Represent the subset by its best-cost rel. If no best rel has been determined yet, fall
+        // back to the first rel of the subset (is there a better way to do this?).
+        // NOTE(review): get(0) assumes the subset's rel list is never empty here - confirm.
+        RelSubset subset = (RelSubset) inputRel;
+        resolvedInput = subset.getBest();
+        if (resolvedInput == null) {
+          resolvedInput = subset.getRelList().get(0);
+        }
+      }
+      // |= does not short-circuit, so every input is still visited.
+      anyInputUnbounded |=
+          PCollection.IsBounded.UNBOUNDED.equals(getBoundednessOfRelNode(resolvedInput));
+    }
+    // If one of the inputs is unbounded, the result is unbounded.
+    return anyInputUnbounded ? PCollection.IsBounded.UNBOUNDED : PCollection.IsBounded.BOUNDED;
+  }
+
+  /**
+   * Reports whether at least one direct input of {@code relNode} is a seekable Beam source.
+   *
+   * <p>A {@code RelSubset} input is represented by its current best rel. When the subset has no
+   * best rel ({@code getBest()} returns {@code null}) that input is treated as not seekable.
+   *
+   * @param relNode node whose direct inputs are inspected; inputs are not searched recursively
+   * @return {@code true} if some input is a {@code BeamRelNode} accepted by {@code seekable},
+   *     otherwise {@code false}
+   */
+  public static boolean containsSeekableInput(RelNode relNode) {
+    for (RelNode relInput : relNode.getInputs()) {
+      if (relInput instanceof RelSubset) {
+        relInput = ((RelSubset) relInput).getBest();
+      }
+      // instanceof is false for null, so a missing best rel needs no separate null check.
+      if (relInput instanceof BeamRelNode && BeamJoinRel.seekable((BeamRelNode) relInput)) {
         return true;
       }
     }
+    // None of the inputs are Seekable
     return false;
   }
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputJoinRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputJoinRel.java
new file mode 100644
index 0000000..3e678e6
--- /dev/null
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputJoinRel.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.extensions.sql.impl.transform.BeamJoinTransforms;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rex.RexNode;
+
+public class BeamSideInputJoinRel extends BeamJoinRel {
+
+ public BeamSideInputJoinRel(
+ RelOptCluster cluster,
+ RelTraitSet traitSet,
+ RelNode left,
+ RelNode right,
+ RexNode condition,
+ Set<CorrelationId> variablesSet,
+ JoinRelType joinType) {
+ super(cluster, traitSet, left, right, condition, variablesSet, joinType);
+ }
+
+ @Override
+ public Join copy(
+ RelTraitSet traitSet,
+ RexNode conditionExpr,
+ RelNode left,
+ RelNode right,
+ JoinRelType joinType,
+ boolean semiJoinDone) {
+ return new BeamSideInputJoinRel(
+ getCluster(), traitSet, left, right, conditionExpr, variablesSet,
joinType);
+ }
+
+ @Override
+ public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
+ // if one of the sides is Bounded & the other is Unbounded
+ // then do a sideInput join.
+ // When doing a sideInput join, the windowFn does not need to match.
+ // Only support INNER JOIN & LEFT OUTER JOIN where left side of the join
must be
+ // the unbounded & RIGHT OUTER JOIN where right side of the join must be
the unbounded
+ if (joinType == JoinRelType.FULL) {
+ throw new UnsupportedOperationException(
+ "FULL OUTER JOIN is not supported when join "
+ + "a bounded table with an unbounded table.");
+ }
+
+ BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
+ BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
+
+ if ((joinType == JoinRelType.LEFT && leftRelNode.isBounded() ==
PCollection.IsBounded.BOUNDED)
+ || (joinType == JoinRelType.RIGHT
+ && rightRelNode.isBounded() == PCollection.IsBounded.BOUNDED)) {
+ throw new UnsupportedOperationException(
+ String.format("%s side of an OUTER JOIN must be Unbounded table.",
joinType.name()));
+ }
+ return new SideInputJoin();
+ }
+
+ private class SideInputJoin extends PTransform<PCollectionList<Row>,
PCollection<Row>> {
+
+ @Override
+ public PCollection<Row> expand(PCollectionList<Row> pinput) {
+ Schema leftSchema = CalciteUtils.toSchema(left.getRowType());
+ Schema rightSchema = CalciteUtils.toSchema(right.getRowType());
+
+ PCollectionList<KV<Row, Row>> keyedInputs = pinput.apply(new
ExtractJoinKeys());
+
+ PCollection<KV<Row, Row>> extractedLeftRows = keyedInputs.get(0);
+ PCollection<KV<Row, Row>> extractedRightRows = keyedInputs.get(1);
+
+ return sideInputJoin(extractedLeftRows, extractedRightRows, leftSchema,
rightSchema);
+ }
+ }
+
+ public PCollection<Row> sideInputJoin(
+ PCollection<KV<Row, Row>> extractedLeftRows,
+ PCollection<KV<Row, Row>> extractedRightRows,
+ Schema leftSchema,
+ Schema rightSchema) {
+ // we always make the Unbounded table on the left to do the sideInput join
+ // (will convert the result accordingly before return)
+ boolean swapped = (extractedLeftRows.isBounded() ==
PCollection.IsBounded.BOUNDED);
+ JoinRelType realJoinType =
+ (swapped && joinType != JoinRelType.INNER) ? JoinRelType.LEFT :
joinType;
+
+ PCollection<KV<Row, Row>> realLeftRows = swapped ? extractedRightRows :
extractedLeftRows;
+ PCollection<KV<Row, Row>> realRightRows = swapped ? extractedLeftRows :
extractedRightRows;
+
+ Row realRightNullRow;
+ if (swapped) {
+ Schema leftNullSchema = buildNullSchema(leftSchema);
+
+ realRightRows = BeamJoinRel.setValueCoder(realRightRows,
SchemaCoder.of(leftNullSchema));
+ realRightNullRow = Row.nullRow(leftNullSchema);
+ } else {
+ Schema rightNullSchema = buildNullSchema(rightSchema);
+
+ realRightRows = BeamJoinRel.setValueCoder(realRightRows,
SchemaCoder.of(rightNullSchema));
+ realRightNullRow = Row.nullRow(rightNullSchema);
+ }
+
+ // swapped still need to pass down because, we need to swap the result
back.
+ return sideInputJoinHelper(
+ realJoinType, realLeftRows, realRightRows, realRightNullRow, swapped);
+ }
+
+ private PCollection<Row> sideInputJoinHelper(
+ JoinRelType joinType,
+ PCollection<KV<Row, Row>> leftRows,
+ PCollection<KV<Row, Row>> rightRows,
+ Row rightNullRow,
+ boolean swapped) {
+ final PCollectionView<Map<Row, Iterable<Row>>> rowsView =
rightRows.apply(View.asMultimap());
+
+ Schema schema = CalciteUtils.toSchema(getRowType());
+ return leftRows
+ .apply(
+ ParDo.of(
+ new BeamJoinTransforms.SideInputJoinDoFn(
+ joinType, rightNullRow, rowsView, swapped, schema))
+ .withSideInputs(rowsView))
+ .setRowSchema(schema);
+ }
+}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputLookupJoinRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputLookupJoinRel.java
new file mode 100644
index 0000000..58393c2
--- /dev/null
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputLookupJoinRel.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import java.util.Set;
+import org.apache.beam.sdk.extensions.sql.BeamSqlSeekableTable;
+import org.apache.beam.sdk.extensions.sql.impl.transform.BeamJoinTransforms;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rex.RexNode;
+
+/**
+ * A {@code BeamJoinRel} that joins a non-seekable input against a seekable table by looking rows
+ * up in the {@code BeamSqlSeekableTable} rather than reading the table as a PCollection.
+ */
+public class BeamSideInputLookupJoinRel extends BeamJoinRel {
+
+  public BeamSideInputLookupJoinRel(
+      RelOptCluster cluster,
+      RelTraitSet traitSet,
+      RelNode left,
+      RelNode right,
+      RexNode condition,
+      Set<CorrelationId> variablesSet,
+      JoinRelType joinType) {
+    super(cluster, traitSet, left, right, condition, variablesSet, joinType);
+  }
+
+  /** Returns a new {@code BeamSideInputLookupJoinRel} over the given inputs and condition. */
+  @Override
+  public Join copy(
+      RelTraitSet traitSet,
+      RexNode conditionExpr,
+      RelNode left,
+      RelNode right,
+      JoinRelType joinType,
+      boolean semiJoinDone) {
+    return new BeamSideInputLookupJoinRel(
+        getCluster(), traitSet, left, right, conditionExpr, variablesSet, joinType);
+  }
+
+  @Override
+  public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
+    // Open question: should LEFT, RIGHT or FULL join types be rejected here with an exception?
+    return new SideInputLookupJoin();
+  }
+
+  /** Applies {@code JoinAsLookup} to the non-seekable input, using the seekable table. */
+  private class SideInputLookupJoin extends PTransform<PCollectionList<Row>, PCollection<Row>> {
+
+    @Override
+    public PCollection<Row> expand(PCollectionList<Row> pinput) {
+      Schema outputSchema = CalciteUtils.toSchema(getRowType());
+
+      int seekableIndex = seekableInputIndex().get();
+      BeamRelNode seekableRel = BeamSqlRelUtils.getBeamRelInput(getInput(seekableIndex));
+      int nonSeekableIndex = nonSeekableInputIndex().get();
+      BeamRelNode nonSeekableRel = BeamSqlRelUtils.getBeamRelInput(getInput(nonSeekableIndex));
+
+      // Field references are offset by the width of whichever table sits on the left.
+      int factColOffset = 0;
+      if (nonSeekableIndex != 0) {
+        factColOffset = CalciteUtils.toSchema(seekableRel.getRowType()).getFieldCount();
+      }
+      int lkpColOffset = 0;
+      if (seekableIndex != 0) {
+        lkpColOffset = CalciteUtils.toSchema(nonSeekableRel.getRowType()).getFieldCount();
+      }
+
+      // HACK: the seekable input is expected to be an immediate seekable IO source, so rows are
+      // looked up in its table and its PCollection is ignored.
+      BeamIOSourceRel seekableInput = (BeamIOSourceRel) seekableRel;
+      BeamSqlSeekableTable seekableTable = (BeamSqlSeekableTable) seekableInput.getBeamSqlTable();
+
+      // getPCollectionInputs() ensures that there is only one input and that it is the
+      // non-seekable one.
+      PCollection<Row> nonSeekableInput = pinput.get(0);
+
+      return nonSeekableInput
+          .apply(
+              "join_as_lookup",
+              new BeamJoinTransforms.JoinAsLookup(
+                  condition,
+                  seekableTable,
+                  CalciteUtils.toSchema(seekableInput.getRowType()),
+                  outputSchema,
+                  factColOffset,
+                  lkpColOffset))
+          .setRowSchema(outputSchema);
+    }
+  }
+}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamCoGBKJoinRule.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamCoGBKJoinRule.java
new file mode 100644
index 0000000..88ef48c
--- /dev/null
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamCoGBKJoinRule.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rule;
+
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCoGBKJoinRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.logical.LogicalJoin;
+
+/**
+ * Planner rule that rewrites a {@code LogicalJoin} node into a {@code BeamCoGBKJoinRel} node.
+ *
+ * <p>The rule matches when both inputs of the {@code LogicalJoin} have the same boundedness,
+ * i.e. both are {@code PCollection.IsBounded.BOUNDED} or both are {@code
+ * PCollection.IsBounded.UNBOUNDED}.
+ *
+ * <p>{@code BeamSideInputLookupJoinRule} can also match when both inputs are bounded. To avoid
+ * a conflict between the two rules, this rule does not match when any input of the {@code
+ * LogicalJoin} is seekable.
+ */
+public class BeamCoGBKJoinRule extends RelOptRule {
+  public static final BeamCoGBKJoinRule INSTANCE = new BeamCoGBKJoinRule();
+
+  private BeamCoGBKJoinRule() {
+    super(
+        operand(LogicalJoin.class, operand(RelNode.class, any()), operand(RelNode.class, any())),
+        RelFactories.LOGICAL_BUILDER,
+        "BeamCoGBKJoinRule");
+  }
+
+  /** Matches joins without a seekable input whose two inputs agree in boundedness. */
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    // Joins with a seekable input are left to the lookup join rule.
+    if (BeamJoinRel.containsSeekableInput(call.rel(0))) {
+      return false;
+    }
+    PCollection.IsBounded leftBoundedness = BeamJoinRel.getBoundednessOfRelNode(call.rel(1));
+    PCollection.IsBounded rightBoundedness = BeamJoinRel.getBoundednessOfRelNode(call.rel(2));
+    return leftBoundedness == rightBoundedness;
+  }
+
+  /** Replaces the matched join with a {@code BeamCoGBKJoinRel} in the Beam logical convention. */
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    Join join = (Join) call.rel(0);
+
+    RelNode beamLeft =
+        convert(
+            join.getLeft(), join.getLeft().getTraitSet().replace(BeamLogicalConvention.INSTANCE));
+    RelNode beamRight =
+        convert(
+            join.getRight(),
+            join.getRight().getTraitSet().replace(BeamLogicalConvention.INSTANCE));
+
+    call.transformTo(
+        new BeamCoGBKJoinRel(
+            join.getCluster(),
+            join.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+            beamLeft,
+            beamRight,
+            join.getCondition(),
+            join.getVariablesSet(),
+            join.getJoinType()));
+  }
+}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamSideInputJoinRule.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamSideInputJoinRule.java
new file mode 100644
index 0000000..44347c9
--- /dev/null
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamSideInputJoinRule.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rule;
+
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSideInputJoinRel;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.logical.LogicalJoin;
+
+/**
+ * Planner rule that rewrites a {@code LogicalJoin} node into a {@code BeamSideInputJoinRel}
+ * node.
+ *
+ * <p>The rule matches when one input of the {@code LogicalJoin} is {@code
+ * PCollection.IsBounded.BOUNDED} and the other is {@code PCollection.IsBounded.UNBOUNDED}.
+ *
+ * <p>{@code BeamSideInputLookupJoinRule} can also match in that situation. To avoid a conflict
+ * between the two rules, this rule does not match when any input of the {@code LogicalJoin} is
+ * seekable.
+ */
+public class BeamSideInputJoinRule extends RelOptRule {
+  public static final BeamSideInputJoinRule INSTANCE = new BeamSideInputJoinRule();
+
+  private BeamSideInputJoinRule() {
+    super(
+        operand(LogicalJoin.class, operand(RelNode.class, any()), operand(RelNode.class, any())),
+        RelFactories.LOGICAL_BUILDER,
+        "BeamSideInputJoinRule");
+  }
+
+  /** Matches joins without a seekable input where exactly one side is bounded. */
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    // Joins with a seekable input are left to the lookup join rule.
+    if (BeamJoinRel.containsSeekableInput(call.rel(0))) {
+      return false;
+    }
+    PCollection.IsBounded leftBoundedness = BeamJoinRel.getBoundednessOfRelNode(call.rel(1));
+    PCollection.IsBounded rightBoundedness = BeamJoinRel.getBoundednessOfRelNode(call.rel(2));
+    if (leftBoundedness == PCollection.IsBounded.BOUNDED) {
+      return rightBoundedness == PCollection.IsBounded.UNBOUNDED;
+    }
+    return rightBoundedness == PCollection.IsBounded.BOUNDED;
+  }
+
+  /** Replaces the matched join with a {@code BeamSideInputJoinRel} in the Beam convention. */
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    Join join = (Join) call.rel(0);
+
+    RelNode beamLeft =
+        convert(
+            join.getLeft(), join.getLeft().getTraitSet().replace(BeamLogicalConvention.INSTANCE));
+    RelNode beamRight =
+        convert(
+            join.getRight(),
+            join.getRight().getTraitSet().replace(BeamLogicalConvention.INSTANCE));
+
+    call.transformTo(
+        new BeamSideInputJoinRel(
+            join.getCluster(),
+            join.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+            beamLeft,
+            beamRight,
+            join.getCondition(),
+            join.getVariablesSet(),
+            join.getJoinType()));
+  }
+}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamJoinRule.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamSideInputLookupJoinRule.java
similarity index 63%
rename from
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamJoinRule.java
rename to
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamSideInputLookupJoinRule.java
index c94dd67..2e233d5 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamJoinRule.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamSideInputLookupJoinRule.java
@@ -19,25 +19,43 @@ package org.apache.beam.sdk.extensions.sql.impl.rule;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSideInputLookupJoinRel;
import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.convert.ConverterRule;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.logical.LogicalJoin;
-/** {@code ConverterRule} to replace {@code Join} with {@code BeamJoinRel}. */
-public class BeamJoinRule extends ConverterRule {
- public static final BeamJoinRule INSTANCE = new BeamJoinRule();
+/**
+ * Rule to convert {@code LogicalJoin} node to {@code
BeamSideInputLookupJoinRel} node.
+ *
+ * <p>This rule is matched when any of the inputs to {@code LogicalJoin} node
are Seekable
+ */
+public class BeamSideInputLookupJoinRule extends ConverterRule {
+ public static final BeamSideInputLookupJoinRule INSTANCE = new
BeamSideInputLookupJoinRule();
- private BeamJoinRule() {
- super(LogicalJoin.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
"BeamJoinRule");
+ /**
+  * Creates the rule by passing {@code LogicalJoin}, {@code Convention.NONE}, the Beam logical
+  * convention and the rule description to the {@code ConverterRule} constructor.
+  *
+  * <p>NOTE(review): this constructor is public although a shared {@code INSTANCE} exists and
+  * the sibling join rules keep their constructors private; confirm whether that is intended.
+  */
+ public BeamSideInputLookupJoinRule() {
+ super(
+ LogicalJoin.class,
+ Convention.NONE,
+ BeamLogicalConvention.INSTANCE,
+ "BeamSideInputLookupJoinRule");
+ }
+
+ /** The rule matches only when at least one input of the join is seekable. */
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+   return BeamJoinRel.containsSeekableInput(call.rel(0));
}
@Override
public RelNode convert(RelNode rel) {
Join join = (Join) rel;
- return new BeamJoinRel(
+ return (new BeamSideInputLookupJoinRel(
join.getCluster(),
join.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
convert(
@@ -46,6 +64,6 @@ public class BeamJoinRule extends ConverterRule {
join.getRight(),
join.getRight().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
join.getCondition(),
join.getVariablesSet(),
- join.getJoinType());
+ join.getJoinType()));
}
}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
index d643391..b210bb0 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
@@ -18,8 +18,8 @@
package org.apache.beam.sdk.extensions.sql;
import static org.apache.beam.sdk.extensions.sql.TestUtils.tuple;
-import static
org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS1;
-import static
org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS2;
+import static
org.apache.beam.sdk.extensions.sql.impl.rel.BeamCoGBKJoinRelBoundedVsBoundedTest.ORDER_DETAILS1;
+import static
org.apache.beam.sdk.extensions.sql.impl.rel.BeamCoGBKJoinRelBoundedVsBoundedTest.ORDER_DETAILS2;
import static
org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithoutTimeZone;
import static org.hamcrest.Matchers.stringContainsInOrder;
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlExplainTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlExplainTest.java
index 2a476ce..3f5a8f0 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlExplainTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlExplainTest.java
@@ -68,7 +68,7 @@ public class BeamSqlExplainTest {
assertEquals(
"BeamCalcRel(expr#0..3=[{inputs}], c1=[$t0], c2=[$t3])\n"
- + " BeamJoinRel(condition=[=($0, $3)], joinType=[inner])\n"
+ + " BeamCoGBKJoinRel(condition=[=($0, $3)], joinType=[inner])\n"
+ " BeamCalcRel(expr#0..1=[{inputs}], expr#2=[0],
expr#3=[>($t0, $t2)],"
+ " proj#0..1=[{exprs}], $condition=[$t3])\n"
+ " BeamIOSourceRel(table=[[beam, A]])\n"
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelBoundedVsBoundedTest.java
similarity index 85%
rename from
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
rename to
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelBoundedVsBoundedTest.java
index f208ecc..ca65e31 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelBoundedVsBoundedTest.java
@@ -17,12 +17,15 @@
*/
package org.apache.beam.sdk.extensions.sql.impl.rel;
+import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.TestUtils;
import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
+import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.calcite.rel.RelNode;
@@ -33,8 +36,8 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
-/** Bounded + Bounded Test for {@code BeamJoinRel}. */
-public class BeamJoinRelBoundedVsBoundedTest extends BaseRelTest {
+/** Bounded + Bounded Test for {@code BeamCoGBKJoinRel}. */
+public class BeamCoGBKJoinRelBoundedVsBoundedTest extends BaseRelTest {
@Rule public final TestPipeline pipeline = TestPipeline.create();
@Rule public ExpectedException thrown = ExpectedException.none();
@@ -52,10 +55,17 @@ public class BeamJoinRelBoundedVsBoundedTest extends
BaseRelTest {
Schema.FieldType.INT32, "price")
.addRows(1, 2, 3, 2, 3, 3, 3, 4, 5);
+ public static final BeamSqlTable SITE_LKP =
+ new BeamSideInputJoinRelUnboundedVsBoundedTest.SiteLookupTable(
+ TestTableUtils.buildBeamSqlSchema(
+ Schema.FieldType.INT32, "order_id",
+ Schema.FieldType.STRING, "site_name"));
+
@BeforeClass
public static void prepare() {
registerTable("ORDER_DETAILS1", ORDER_DETAILS1);
registerTable("ORDER_DETAILS2", ORDER_DETAILS2);
+ registerTable("SITE_LKP", SITE_LKP);
}
@Test
@@ -95,17 +105,17 @@ public class BeamJoinRelBoundedVsBoundedTest extends
BaseRelTest {
RelNode root = env.parseQuery(sql);
- while (!(root instanceof BeamJoinRel)) {
+ while (!(root instanceof BeamCoGBKJoinRel)) {
root = root.getInput(0);
}
NodeStats estimate = BeamSqlRelUtils.getNodeStats(root,
root.getCluster().getMetadataQuery());
NodeStats leftEstimate =
BeamSqlRelUtils.getNodeStats(
- ((BeamJoinRel) root).getLeft(),
root.getCluster().getMetadataQuery());
+ ((BeamCoGBKJoinRel) root).getLeft(),
root.getCluster().getMetadataQuery());
NodeStats rightEstimate =
BeamSqlRelUtils.getNodeStats(
- ((BeamJoinRel) root).getRight(),
root.getCluster().getMetadataQuery());
+ ((BeamCoGBKJoinRel) root).getRight(),
root.getCluster().getMetadataQuery());
Assert.assertFalse(estimate.isUnknown());
Assert.assertEquals(0d, estimate.getRate(), 0.01);
@@ -136,13 +146,13 @@ public class BeamJoinRelBoundedVsBoundedTest extends
BaseRelTest {
RelNode root1 = env.parseQuery(sql1);
- while (!(root1 instanceof BeamJoinRel)) {
+ while (!(root1 instanceof BeamCoGBKJoinRel)) {
root1 = root1.getInput(0);
}
RelNode root2 = env.parseQuery(sql2);
- while (!(root2 instanceof BeamJoinRel)) {
+ while (!(root2 instanceof BeamCoGBKJoinRel)) {
root2 = root2.getInput(0);
}
@@ -375,4 +385,44 @@ public class BeamJoinRelBoundedVsBoundedTest extends
BaseRelTest {
compilePipeline(sql, pipeline);
pipeline.run();
}
+
+ /** Joins ORDER_DETAILS1 (left) with the SITE_LKP lookup table (right) and checks the row. */
+ @Test
+ public void testBoundedVsLookupTableJoin() throws Exception {
+   String sql =
+       "SELECT o1.order_id, o2.site_name FROM "
+           + " ORDER_DETAILS1 o1 "
+           + " JOIN SITE_LKP o2 "
+           + " on "
+           + " o1.order_id=o2.order_id "
+           + " WHERE o1.order_id=1";
+
+   // Only order 1 passes the filter, so exactly one joined row is expected.
+   PAssert.that(
+           compilePipeline(sql, pipeline)
+               .apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+       .containsInAnyOrder(
+           TestUtils.RowsBuilder.of(
+                   Schema.FieldType.INT32, "order_id",
+                   Schema.FieldType.STRING, "site_name")
+               .addRows(1, "SITE1")
+               .getStringRows());
+   pipeline.run();
+ }
+
+ /** Same join as the bounded-vs-lookup test, with the SITE_LKP lookup table on the left. */
+ @Test
+ public void testLookupTableVsBoundedJoin() throws Exception {
+   String sql =
+       "SELECT o1.order_id, o2.site_name FROM "
+           + " SITE_LKP o2 "
+           + " JOIN ORDER_DETAILS1 o1 "
+           + " on "
+           + " o1.order_id=o2.order_id "
+           + " WHERE o1.order_id=1";
+
+   // Only order 1 passes the filter, so exactly one joined row is expected.
+   PAssert.that(
+           compilePipeline(sql, pipeline)
+               .apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+       .containsInAnyOrder(
+           TestUtils.RowsBuilder.of(
+                   Schema.FieldType.INT32, "order_id",
+                   Schema.FieldType.STRING, "site_name")
+               .addRows(1, "SITE1")
+               .getStringRows());
+   pipeline.run();
+ }
}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelUnboundedVsUnboundedTest.java
similarity index 96%
rename from
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
rename to
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelUnboundedVsUnboundedTest.java
index 1f19cf8..f9a5fb4 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelUnboundedVsUnboundedTest.java
@@ -36,8 +36,8 @@ import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
-/** Unbounded + Unbounded Test for {@code BeamJoinRel}. */
-public class BeamJoinRelUnboundedVsUnboundedTest extends BaseRelTest {
+/** Unbounded + Unbounded Test for {@code BeamCoGBKJoinRel}. */
+public class BeamCoGBKJoinRelUnboundedVsUnboundedTest extends BaseRelTest {
@Rule public final TestPipeline pipeline = TestPipeline.create();
private static final DateTime FIRST_DATE = new DateTime(1);
private static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000);
@@ -121,17 +121,17 @@ public class BeamJoinRelUnboundedVsUnboundedTest extends
BaseRelTest {
RelNode root = env.parseQuery(sql);
- while (!(root instanceof BeamJoinRel)) {
+ while (!(root instanceof BeamCoGBKJoinRel)) {
root = root.getInput(0);
}
NodeStats estimate = BeamSqlRelUtils.getNodeStats(root,
root.getCluster().getMetadataQuery());
NodeStats leftEstimate =
BeamSqlRelUtils.getNodeStats(
- ((BeamJoinRel) root).getLeft(),
root.getCluster().getMetadataQuery());
+ ((BeamCoGBKJoinRel) root).getLeft(),
root.getCluster().getMetadataQuery());
NodeStats rightEstimate =
BeamSqlRelUtils.getNodeStats(
- ((BeamJoinRel) root).getRight(),
root.getCluster().getMetadataQuery());
+ ((BeamCoGBKJoinRel) root).getRight(),
root.getCluster().getMetadataQuery());
Assert.assertFalse(estimate.isUnknown());
Assert.assertEquals(0d, estimate.getRowCount(), 0.01);
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputJoinRelUnboundedVsBoundedTest.java
similarity index 96%
rename from
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
rename to
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputJoinRelUnboundedVsBoundedTest.java
index d19f5e0..45881a8 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputJoinRelUnboundedVsBoundedTest.java
@@ -45,8 +45,8 @@ import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
-/** Unbounded + Unbounded Test for {@code BeamJoinRel}. */
-public class BeamJoinRelUnboundedVsBoundedTest extends BaseRelTest {
+/** Unbounded + Bounded Test for {@code BeamSideInputJoinRel}. */
+public class BeamSideInputJoinRelUnboundedVsBoundedTest extends BaseRelTest {
@Rule public final TestPipeline pipeline = TestPipeline.create();
public static final DateTime FIRST_DATE = new DateTime(1);
public static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000);
@@ -200,17 +200,17 @@ public class BeamJoinRelUnboundedVsBoundedTest extends
BaseRelTest {
RelNode root = env.parseQuery(sql);
- while (!(root instanceof BeamJoinRel)) {
+ while (!(root instanceof BeamSideInputJoinRel)) {
root = root.getInput(0);
}
NodeStats estimate = BeamSqlRelUtils.getNodeStats(root,
root.getCluster().getMetadataQuery());
NodeStats leftEstimate =
BeamSqlRelUtils.getNodeStats(
- ((BeamJoinRel) root).getLeft(),
root.getCluster().getMetadataQuery());
+ ((BeamSideInputJoinRel) root).getLeft(),
root.getCluster().getMetadataQuery());
NodeStats rightEstimate =
BeamSqlRelUtils.getNodeStats(
- ((BeamJoinRel) root).getRight(),
root.getCluster().getMetadataQuery());
+ ((BeamSideInputJoinRel) root).getRight(),
root.getCluster().getMetadataQuery());
Assert.assertFalse(estimate.isUnknown());
Assert.assertEquals(0d, estimate.getRowCount(), 0.01);
@@ -324,7 +324,7 @@ public class BeamJoinRelUnboundedVsBoundedTest extends
BaseRelTest {
}
@Test
- public void testJoinAsLookup() throws Exception {
+ public void testUnboundedVsLookupTableJoin() throws Exception {
String sql =
"SELECT o1.order_id, o2.site_name FROM "
+ " ORDER_DETAILS o1 "
@@ -344,7 +344,7 @@ public class BeamJoinRelUnboundedVsBoundedTest extends
BaseRelTest {
}
@Test
- public void testJoinAsLookupSwapped() throws Exception {
+ public void testLookupTableVsUnboundedJoin() throws Exception {
String sql =
"SELECT o1.order_id, o2.site_name FROM "
+ " SITE_LKP o2 "