[ 
https://issues.apache.org/jira/browse/BEAM-6114?focusedWorklogId=170050&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-170050
 ]

ASF GitHub Bot logged work on BEAM-6114:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/Nov/18 23:23
            Start Date: 27/Nov/18 23:23
    Worklog Time Spent: 10m 
      Work Description: kennknowles closed pull request #7121: [BEAM-6114] Add 
isBounded() to BeamRelNode and BeamSqlTable, use for JOIN
URL: https://github.com/apache/beam/pull/7121
 
 
   

This PR was merged from a forked repository.
Because GitHub hides the original diff of such a pull request once it is
merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
index 531379d81599..14f1b80174f1 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
@@ -31,6 +31,9 @@
   /** create a {@code IO.write()} instance to write to target. */
   POutput buildIOWriter(PCollection<Row> input);
 
+  /** Whether this table is bounded (known to be finite) or unbounded (may or 
may not be finite). */
+  PCollection.IsBounded isBounded();
+
   /** Get the schema info of the table. */
   Schema getSchema();
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
index 12d1ebde3737..69694fe610f1 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
@@ -45,6 +45,11 @@ public BeamIOSourceRel(
     this.pipelineOptions = pipelineOptions;
   }
 
+  @Override
+  public PCollection.IsBounded isBounded() {
+    return sqlTable.isBounded();
+  }
+
   @Override
   public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
     return new Transform();
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
index 596c7025bb2f..4d7a242e3343 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
@@ -132,11 +132,43 @@ public Join copy(
   public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
     if (isSideInputLookupJoin()) {
       return new SideInputLookupJoin();
+    } else if (isSideInputJoin()) {
+      // if one of the sides is Bounded & the other is Unbounded
+      // then do a sideInput join
+      // when doing a sideInput join, the windowFn does not need to match
+      // Only support INNER JOIN & LEFT OUTER JOIN where left side of the join 
must be
+      // the unbounded
+      if (joinType == JoinRelType.FULL) {
+        throw new UnsupportedOperationException(
+            "FULL OUTER JOIN is not supported when join "
+                + "a bounded table with an unbounded table.");
+      }
+
+      BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
+      BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
+
+      if ((joinType == JoinRelType.LEFT && leftRelNode.isBounded() == 
PCollection.IsBounded.BOUNDED)
+          || (joinType == JoinRelType.RIGHT
+              && rightRelNode.isBounded() == PCollection.IsBounded.BOUNDED)) {
+        throw new UnsupportedOperationException(
+            "LEFT side of an OUTER JOIN must be Unbounded table.");
+      }
+
+      return new SideInputJoin();
     } else {
-      return new Transform();
+      return new StandardJoin();
     }
   }
 
+  private boolean isSideInputJoin() {
+    BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
+    BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
+    return (leftRelNode.isBounded() == PCollection.IsBounded.BOUNDED
+            && rightRelNode.isBounded() == UNBOUNDED)
+        || (leftRelNode.isBounded() == UNBOUNDED
+            && rightRelNode.isBounded() == PCollection.IsBounded.BOUNDED);
+  }
+
   private boolean isSideInputLookupJoin() {
     return seekableInputIndex().isPresent() && 
nonSeekableInputIndex().isPresent();
   }
@@ -200,10 +232,11 @@ private boolean isSideInputLookupJoin() {
     }
   }
 
-  private class Transform extends PTransform<PCollectionList<Row>, 
PCollection<Row>> {
+  private class ExtractJoinKeys
+      extends PTransform<PCollectionList<Row>, PCollectionList<KV<Row, Row>>> {
 
     @Override
-    public PCollection<Row> expand(PCollectionList<Row> pinput) {
+    public PCollectionList<KV<Row, Row>> expand(PCollectionList<Row> pinput) {
       BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
 
       Schema leftSchema = CalciteUtils.toSchema(left.getRowType());
@@ -213,9 +246,6 @@ private boolean isSideInputLookupJoin() {
       PCollection<Row> leftRows = pinput.get(0);
       PCollection<Row> rightRows = pinput.get(1);
 
-      WindowFn leftWinFn = leftRows.getWindowingStrategy().getWindowFn();
-      WindowFn rightWinFn = rightRows.getWindowingStrategy().getWindowFn();
-
       // extract the join fields
       List<Pair<Integer, Integer>> pairs =
           extractJoinColumns(leftRelNode.getRowType().getFieldCount());
@@ -247,52 +277,56 @@ private boolean isSideInputLookupJoin() {
                           false, pairs, extractKeySchemaRight)))
               .setCoder(KvCoder.of(extractKeyRowCoder, rightRows.getCoder()));
 
-      // a regular join
-      if ((leftRows.isBounded() == PCollection.IsBounded.BOUNDED
-              && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)
-          || (leftRows.isBounded() == UNBOUNDED && rightRows.isBounded() == 
UNBOUNDED)) {
-        verifySupportedTrigger(leftRows);
-        verifySupportedTrigger(rightRows);
-
-        try {
-          leftWinFn.verifyCompatibility(rightWinFn);
-        } catch (IncompatibleWindowException e) {
-          throw new IllegalArgumentException(
-              "WindowFns must match for a 
bounded-vs-bounded/unbounded-vs-unbounded join.", e);
-        }
+      return PCollectionList.of(extractedLeftRows).and(extractedRightRows);
+    }
+  }
 
-        return standardJoin(extractedLeftRows, extractedRightRows, leftSchema, 
rightSchema);
-      } else if ((leftRows.isBounded() == PCollection.IsBounded.BOUNDED
-              && rightRows.isBounded() == UNBOUNDED)
-          || (leftRows.isBounded() == UNBOUNDED
-              && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)) {
-        // if one of the sides is Bounded & the other is Unbounded
-        // then do a sideInput join
-        // when doing a sideInput join, the windowFn does not need to match
-        // Only support INNER JOIN & LEFT OUTER JOIN where left side of the 
join must be
-        // the unbounded
-        if (joinType == JoinRelType.FULL) {
-          throw new UnsupportedOperationException(
-              "FULL OUTER JOIN is not supported when join "
-                  + "a bounded table with an unbounded table.");
-        }
+  private class SideInputJoin extends PTransform<PCollectionList<Row>, 
PCollection<Row>> {
 
-        if ((joinType == JoinRelType.LEFT && leftRows.isBounded() == 
PCollection.IsBounded.BOUNDED)
-            || (joinType == JoinRelType.RIGHT
-                && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)) {
-          throw new UnsupportedOperationException(
-              "LEFT side of an OUTER JOIN must be Unbounded table.");
-        }
+    @Override
+    public PCollection<Row> expand(PCollectionList<Row> pinput) {
+      Schema leftSchema = CalciteUtils.toSchema(left.getRowType());
+      Schema rightSchema = CalciteUtils.toSchema(right.getRowType());
 
-        return sideInputJoin(extractedLeftRows, extractedRightRows, 
leftSchema, rightSchema);
-      } else {
-        throw new UnsupportedOperationException(
-            "The inputs to the JOIN have un-joinnable windowFns: " + leftWinFn 
+ ", " + rightWinFn);
+      PCollectionList<KV<Row, Row>> keyedInputs = pinput.apply(new 
ExtractJoinKeys());
+
+      PCollection<KV<Row, Row>> extractedLeftRows = keyedInputs.get(0);
+      PCollection<KV<Row, Row>> extractedRightRows = keyedInputs.get(1);
+
+      return sideInputJoin(extractedLeftRows, extractedRightRows, leftSchema, 
rightSchema);
+    }
+  }
+
+  private class StandardJoin extends PTransform<PCollectionList<Row>, 
PCollection<Row>> {
+
+    @Override
+    public PCollection<Row> expand(PCollectionList<Row> pinput) {
+      Schema leftSchema = CalciteUtils.toSchema(left.getRowType());
+      Schema rightSchema = CalciteUtils.toSchema(right.getRowType());
+
+      PCollectionList<KV<Row, Row>> keyedInputs = pinput.apply(new 
ExtractJoinKeys());
+
+      PCollection<KV<Row, Row>> extractedLeftRows = keyedInputs.get(0);
+      PCollection<KV<Row, Row>> extractedRightRows = keyedInputs.get(1);
+
+      WindowFn leftWinFn = 
extractedLeftRows.getWindowingStrategy().getWindowFn();
+      WindowFn rightWinFn = 
extractedRightRows.getWindowingStrategy().getWindowFn();
+
+      try {
+        leftWinFn.verifyCompatibility(rightWinFn);
+      } catch (IncompatibleWindowException e) {
+        throw new IllegalArgumentException(
+            "WindowFns must match for a 
bounded-vs-bounded/unbounded-vs-unbounded join.", e);
       }
+
+      verifySupportedTrigger(extractedLeftRows);
+      verifySupportedTrigger(extractedRightRows);
+
+      return standardJoin(extractedLeftRows, extractedRightRows, leftSchema, 
rightSchema);
     }
   }
 
-  private void verifySupportedTrigger(PCollection<Row> pCollection) {
+  private <T> void verifySupportedTrigger(PCollection<T> pCollection) {
     WindowingStrategy windowingStrategy = pCollection.getWindowingStrategy();
 
     if (UNBOUNDED.equals(pCollection.isBounded()) && 
!triggersOncePerWindow(windowingStrategy)) {
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
index 6ac89832f9fc..5f060ec5567a 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
@@ -28,6 +28,23 @@
 /** A {@link RelNode} that can also give a {@link PTransform} that implements 
the expression. */
 public interface BeamRelNode extends RelNode {
 
+  /**
+   * Whether the collection of rows represented by this relational expression 
is bounded (known to
+   * be finite) or unbounded (may or may not be finite).
+   *
+   * @return bounded if and only if all PCollection inputs are bounded
+   */
+  default PCollection.IsBounded isBounded() {
+    return getPCollectionInputs()
+            .stream()
+            .allMatch(
+                rel ->
+                    BeamSqlRelUtils.getBeamRelInput(rel).isBounded()
+                        == PCollection.IsBounded.BOUNDED)
+        ? PCollection.IsBounded.BOUNDED
+        : PCollection.IsBounded.UNBOUNDED;
+  }
+
   default List<RelNode> getPCollectionInputs() {
     return getInputs();
   };
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java
index 95b971e01e30..661aec933b58 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java
@@ -38,6 +38,11 @@ public BeamPCollectionTable(PCollection<InputT> upstream) {
     this.upstream = upstream;
   }
 
+  @Override
+  public PCollection.IsBounded isBounded() {
+    return upstream.isBounded();
+  }
+
   @Override
   public PCollection<Row> buildIOReader(PBegin begin) {
     assert begin.getPipeline() == upstream.getPipeline();
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQueryTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQueryTable.java
index f702d7d54fa5..fdbcea426a96 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQueryTable.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQueryTable.java
@@ -41,6 +41,11 @@ public BeamBigQueryTable(Schema beamSchema, String 
tableSpec) {
     this.tableSpec = tableSpec;
   }
 
+  @Override
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.BOUNDED;
+  }
+
   @Override
   public PCollection<Row> buildIOReader(PBegin begin) {
     // TODO: make this more generic.
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
index 0c17cdef75c3..e643dd5c1e66 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
@@ -67,6 +67,11 @@ public BeamKafkaTable updateConsumerProperties(Map<String, 
Object> configUpdates
     return this;
   }
 
+  @Override
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.UNBOUNDED;
+  }
+
   public abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>>
       getPTransformForInput();
 
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubIOJsonTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubIOJsonTable.java
index f22492bff267..a3d338a5942f 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubIOJsonTable.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubIOJsonTable.java
@@ -127,6 +127,11 @@ static Builder builder() {
     return new AutoValue_PubsubIOJsonTable.Builder();
   }
 
+  @Override
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.UNBOUNDED;
+  }
+
   /**
    * Table schema, describes Pubsub message schema.
    *
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestBoundedTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestBoundedTable.java
index bbee5f59bcd4..9a64791a29d9 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestBoundedTable.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestBoundedTable.java
@@ -44,6 +44,11 @@ public TestBoundedTable(Schema beamSchema) {
     super(beamSchema);
   }
 
+  @Override
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.BOUNDED;
+  }
+
   /**
    * Convenient way to build a mocked bounded table.
    *
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
index ea06fc5b0103..34240200b7ac 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
@@ -123,6 +123,11 @@ public TableWithRows(long tableProviderInstanceId, Table 
table) {
   private static class InMemoryTable implements BeamSqlTable {
     private TableWithRows tableWithRows;
 
+    @Override
+    public PCollection.IsBounded isBounded() {
+      return PCollection.IsBounded.BOUNDED;
+    }
+
     public InMemoryTable(TableWithRows tableWithRows) {
       this.tableWithRows = tableWithRows;
     }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestUnboundedTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestUnboundedTable.java
index 314738f5e94c..5d41fefda616 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestUnboundedTable.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestUnboundedTable.java
@@ -66,6 +66,11 @@ public TestUnboundedTable timestampColumnIndex(int idx) {
     return this;
   }
 
+  @Override
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.UNBOUNDED;
+  }
+
   /**
    * Add rows to the builder.
    *
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTable.java
index a7b93b86d228..94674de383c0 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTable.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTable.java
@@ -59,6 +59,11 @@ public String getFilePattern() {
     return filePattern;
   }
 
+  @Override
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.BOUNDED;
+  }
+
   @Override
   public PCollection<Row> buildIOReader(PBegin begin) {
     return begin
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java
index 34148ea6abb5..20988033669e 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java
@@ -100,6 +100,11 @@ public FakeTable() {
       super(null);
     }
 
+    @Override
+    public PCollection.IsBounded isBounded() {
+      return null;
+    }
+
     @Override
     public PCollection<Row> buildIOReader(PBegin begin) {
       return null;
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
index da447d510914..067af8ae8366 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
@@ -110,6 +110,11 @@ public SiteLookupTable(Schema schema) {
       super(schema);
     }
 
+    @Override
+    public PCollection.IsBounded isBounded() {
+      throw new UnsupportedOperationException();
+    }
+
     @Override
     public PCollection<Row> buildIOReader(PBegin begin) {
       throw new UnsupportedOperationException();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 170050)
    Time Spent: 50m  (was: 40m)

> SQL join selection should be done in planner, not in expansion to PTransform
> ----------------------------------------------------------------------------
>
>                 Key: BEAM-6114
>                 URL: https://issues.apache.org/jira/browse/BEAM-6114
>             Project: Beam
>          Issue Type: Improvement
>          Components: dsl-sql
>            Reporter: Kenneth Knowles
>            Assignee: Kenneth Knowles
>            Priority: Major
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> Currently, all Beam SQL joins go through a single physical operator, which 
> has a single PTransform that implements every join algorithm and chooses 
> among them based on properties of its input PCollections as well as the 
> relational algebra.
> A first step is to make the needed information part of the relational 
> algebra, so it can choose a PTransform based on that, and the PTransforms can 
> be simpler.
> A second step is to have separate (physical) relational operators for 
> different join algorithms.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to