[ 
https://issues.apache.org/jira/browse/BEAM-6098?focusedWorklogId=169957&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-169957
 ]

ASF GitHub Bot logged work on BEAM-6098:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/Nov/18 18:51
            Start Date: 27/Nov/18 18:51
    Worklog Time Spent: 10m 
      Work Description: kennknowles closed pull request #7118: [BEAM-6098] Support lookup join symmetric in left/right inputs
URL: https://github.com/apache/beam/pull/7118
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it
is merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
index 1b02506979a5..596c7025bb2f 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
@@ -17,11 +17,11 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.beam.sdk.schemas.Schema.toSchema;
 import static org.apache.beam.sdk.values.PCollection.IsBounded.UNBOUNDED;
 import static org.joda.time.Duration.ZERO;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
 import java.util.List;
@@ -120,21 +120,84 @@ public Join copy(
 
   @Override
   public List<RelNode> getPCollectionInputs() {
-    if (isSideInputJoin()) {
-      return ImmutableList.of(BeamSqlRelUtils.getBeamRelInput(left));
+    if (isSideInputLookupJoin()) {
+      return ImmutableList.of(
+          
BeamSqlRelUtils.getBeamRelInput(getInputs().get(nonSeekableInputIndex().get())));
+    } else {
+      return BeamRelNode.super.getPCollectionInputs();
     }
-    return BeamRelNode.super.getPCollectionInputs();
   }
 
   @Override
   public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
-    return new Transform();
+    if (isSideInputLookupJoin()) {
+      return new SideInputLookupJoin();
+    } else {
+      return new Transform();
+    }
+  }
+
+  private boolean isSideInputLookupJoin() {
+    return seekableInputIndex().isPresent() && 
nonSeekableInputIndex().isPresent();
   }
 
-  private boolean isSideInputJoin() {
+  private Optional<Integer> seekableInputIndex() {
     BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
     BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
-    return !seekable(leftRelNode) && seekable(rightRelNode);
+    return seekable(leftRelNode)
+        ? Optional.of(0)
+        : seekable(rightRelNode) ? Optional.of(1) : Optional.absent();
+  }
+
+  private Optional<Integer> nonSeekableInputIndex() {
+    BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
+    BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
+    return !seekable(leftRelNode)
+        ? Optional.of(0)
+        : !seekable(rightRelNode) ? Optional.of(1) : Optional.absent();
+  }
+
+  private class SideInputLookupJoin extends PTransform<PCollectionList<Row>, 
PCollection<Row>> {
+
+    @Override
+    public PCollection<Row> expand(PCollectionList<Row> pinput) {
+      Schema schema = CalciteUtils.toSchema(getRowType());
+
+      BeamRelNode seekableRel =
+          
BeamSqlRelUtils.getBeamRelInput(getInput(seekableInputIndex().get()));
+      BeamRelNode nonSeekableRel =
+          
BeamSqlRelUtils.getBeamRelInput(getInput(nonSeekableInputIndex().get()));
+
+      // Offset field references according to which table is on the left
+      int factColOffset =
+          nonSeekableInputIndex().get() == 0
+              ? 0
+              : 
CalciteUtils.toSchema(seekableRel.getRowType()).getFieldCount();
+      int lkpColOffset =
+          seekableInputIndex().get() == 0
+              ? 0
+              : 
CalciteUtils.toSchema(nonSeekableRel.getRowType()).getFieldCount();
+
+      // HACK: if the input is an immediate instance of a seekable IO, we can 
do lookups
+      // so we ignore the PCollection
+      BeamIOSourceRel seekableInput = (BeamIOSourceRel) seekableRel;
+      BeamSqlSeekableTable seekableTable = (BeamSqlSeekableTable) 
seekableInput.getBeamSqlTable();
+
+      // getPCollectionInputs() ensures that there is only one and it is the 
non-seekable input
+      PCollection<Row> nonSeekableInput = pinput.get(0);
+
+      return nonSeekableInput
+          .apply(
+              "join_as_lookup",
+              new BeamJoinTransforms.JoinAsLookup(
+                  condition,
+                  seekableTable,
+                  CalciteUtils.toSchema(seekableInput.getRowType()),
+                  schema,
+                  factColOffset,
+                  lkpColOffset))
+          .setRowSchema(schema);
+    }
   }
 
   private class Transform extends PTransform<PCollectionList<Row>, 
PCollection<Row>> {
@@ -142,13 +205,6 @@ private boolean isSideInputJoin() {
     @Override
     public PCollection<Row> expand(PCollectionList<Row> pinput) {
       BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
-      final BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
-
-      if (isSideInputJoin()) {
-        checkArgument(pinput.size() == 1, "More than one input received for 
side input join");
-        Schema schema = CalciteUtils.toSchema(getRowType());
-        return joinAsLookup(leftRelNode, rightRelNode, pinput.get(0), 
schema).setRowSchema(schema);
-      }
 
       Schema leftSchema = CalciteUtils.toSchema(left.getRowType());
       Schema rightSchema = CalciteUtils.toSchema(right.getRowType());
@@ -434,24 +490,6 @@ private Schema buildNullSchema(Schema schema) {
     return new Pair<>(leftIndex, rightIndex);
   }
 
-  private PCollection<Row> joinAsLookup(
-      BeamRelNode leftRelNode,
-      BeamRelNode rightRelNode,
-      PCollection<Row> factStream,
-      Schema outputSchema) {
-    BeamIOSourceRel srcRel = (BeamIOSourceRel) rightRelNode;
-    BeamSqlSeekableTable seekableTable = (BeamSqlSeekableTable) 
srcRel.getBeamSqlTable();
-
-    return factStream.apply(
-        "join_as_lookup",
-        new BeamJoinTransforms.JoinAsLookup(
-            condition,
-            seekableTable,
-            CalciteUtils.toSchema(rightRelNode.getRowType()),
-            outputSchema,
-            CalciteUtils.toSchema(leftRelNode.getRowType()).getFieldCount()));
-  }
-
   /** check if {@code BeamRelNode} implements {@code BeamSeekableTable}. */
   private boolean seekable(BeamRelNode relNode) {
     if (relNode instanceof BeamIOSourceRel) {
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
index 97d1a3d77a0f..da5bd6effba7 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
@@ -145,6 +145,7 @@ private static Row combineTwoRowsIntoOneHelper(Row leftRow, 
Row rightRow, Schema
   public static class JoinAsLookup extends PTransform<PCollection<Row>, 
PCollection<Row>> {
     private final BeamSqlSeekableTable seekableTable;
     private final Schema lkpSchema;
+    private final int factColOffset;
     private Schema joinSubsetType;
     private final Schema outputSchema;
     private List<Integer> factJoinIdx;
@@ -154,14 +155,16 @@ public JoinAsLookup(
         BeamSqlSeekableTable seekableTable,
         Schema lkpSchema,
         Schema outputSchema,
-        int factTableColSize) {
+        int factColOffset,
+        int lkpColOffset) {
       this.seekableTable = seekableTable;
       this.lkpSchema = lkpSchema;
       this.outputSchema = outputSchema;
-      joinFieldsMapping(joinCondition, factTableColSize);
+      this.factColOffset = factColOffset;
+      joinFieldsMapping(joinCondition, factColOffset, lkpColOffset);
     }
 
-    private void joinFieldsMapping(RexNode joinCondition, int 
factTableColSize) {
+    private void joinFieldsMapping(RexNode joinCondition, int factColOffset, 
int lkpColOffset) {
       factJoinIdx = new ArrayList<>();
       List<Schema.Field> lkpJoinFields = new ArrayList<>();
 
@@ -169,15 +172,15 @@ private void joinFieldsMapping(RexNode joinCondition, int 
factTableColSize) {
       if ("AND".equals(call.getOperator().getName())) {
         List<RexNode> operands = call.getOperands();
         for (RexNode rexNode : operands) {
-          factJoinIdx.add(((RexInputRef) ((RexCall) 
rexNode).getOperands().get(0)).getIndex());
+          factJoinIdx.add(
+              ((RexInputRef) ((RexCall) 
rexNode).getOperands().get(0)).getIndex() - factColOffset);
           int lkpJoinIdx =
-              ((RexInputRef) ((RexCall) 
rexNode).getOperands().get(1)).getIndex()
-                  - factTableColSize;
+              ((RexInputRef) ((RexCall) 
rexNode).getOperands().get(1)).getIndex() - lkpColOffset;
           lkpJoinFields.add(lkpSchema.getField(lkpJoinIdx));
         }
       } else if ("=".equals(call.getOperator().getName())) {
-        factJoinIdx.add(((RexInputRef) call.getOperands().get(0)).getIndex());
-        int lkpJoinIdx = ((RexInputRef) call.getOperands().get(1)).getIndex() 
- factTableColSize;
+        factJoinIdx.add(((RexInputRef) call.getOperands().get(0)).getIndex() - 
factColOffset);
+        int lkpJoinIdx = ((RexInputRef) call.getOperands().get(1)).getIndex() 
- lkpColOffset;
         lkpJoinFields.add(lkpSchema.getField(lkpJoinIdx));
       } else {
         throw new UnsupportedOperationException(
@@ -205,7 +208,8 @@ public void processElement(ProcessContext context) {
                       Row joinSubRow = extractJoinSubRow(factRow);
                       List<Row> lookupRows = seekableTable.seekRow(joinSubRow);
                       for (Row lr : lookupRows) {
-                        context.output(combineTwoRowsIntoOneHelper(factRow, 
lr, outputSchema));
+                        context.output(
+                            combineTwoRowsIntoOne(factRow, lr, factColOffset 
!= 0, outputSchema));
                       }
                     }
 
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
index 2a4eb319c78f..da447d510914 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
@@ -289,4 +289,24 @@ public void testJoinAsLookup() throws Exception {
                 .getStringRows());
     pipeline.run();
   }
+
+  @Test
+  public void testJoinAsLookupSwapped() throws Exception {
+    String sql =
+        "SELECT o1.order_id, o2.site_name FROM "
+            + " SITE_LKP o2 "
+            + " JOIN ORDER_DETAILS o1 "
+            + " on "
+            + " o1.site_id=o2.site_id "
+            + " WHERE o1.site_id=1";
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
+    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                    Schema.FieldType.INT32, "order_id",
+                    Schema.FieldType.STRING, "site_name")
+                .addRows(1, "SITE1")
+                .getStringRows());
+    pipeline.run();
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 169957)
    Time Spent: 1h 40m  (was: 1.5h)

> Support side input join on right or left
> ----------------------------------------
>
>                 Key: BEAM-6098
>                 URL: https://issues.apache.org/jira/browse/BEAM-6098
>             Project: Beam
>          Issue Type: Improvement
>          Components: dsl-sql
>            Reporter: Kenneth Knowles
>            Assignee: Kenneth Knowles
>            Priority: Major
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> Currently, side input join is hardcoded as to which of left vs right must be 
> seekable/non-seekable. Making this symmetric would be an incremental 
> improvement.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to