This is an automated email from the ASF dual-hosted git repository.

mingmxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 14b17ad  [BEAM-3634] Refactor BeamRelNode to return a PTransform 
(#4705)
14b17ad is described below

commit 14b17ad574342a875c8f99278e18c605aa5b4bc3
Author: Kenn Knowles <k...@kennknowles.com>
AuthorDate: Fri Feb 23 16:39:21 2018 -0800

    [BEAM-3634] Refactor BeamRelNode to return a PTransform (#4705)
    
    * Refactor BeamRelNode to return a PTransform
    
    * Refactor SQL toPTransform to have no parameters
---
 .../beam/sdk/extensions/sql/QueryTransform.java    |   3 +-
 .../beam/sdk/extensions/sql/impl/BeamSqlEnv.java   |  13 +-
 .../sql/impl/planner/BeamQueryPlanner.java         |  21 +-
 .../extensions/sql/impl/planner/BeamRuleSets.java  |  54 ++--
 .../sql/impl/rel/BeamAggregationRel.java           |  19 +-
 .../sdk/extensions/sql/impl/rel/BeamFilterRel.java |  34 ++-
 .../sdk/extensions/sql/impl/rel/BeamIOSinkRel.java |  85 ++++--
 .../extensions/sql/impl/rel/BeamIOSourceRel.java   |  45 +--
 .../extensions/sql/impl/rel/BeamIntersectRel.java  |  33 ++-
 .../sdk/extensions/sql/impl/rel/BeamJoinRel.java   | 328 ++++++++++++---------
 .../sdk/extensions/sql/impl/rel/BeamMinusRel.java  |  16 +-
 .../extensions/sql/impl/rel/BeamProjectRel.java    |  37 ++-
 .../sdk/extensions/sql/impl/rel/BeamRelNode.java   |  15 +-
 .../sql/impl/rel/BeamSetOperatorRelBase.java       |  14 +-
 .../sdk/extensions/sql/impl/rel/BeamSortRel.java   |  71 +++--
 .../sdk/extensions/sql/impl/rel/BeamUnionRel.java  |  15 +-
 .../sdk/extensions/sql/impl/rel/BeamValuesRel.java |  40 +--
 .../extensions/sql/impl/rule/BeamIOSinkRule.java   |  58 ++--
 .../extensions/sql/impl/rule/BeamIOSourceRule.java |  30 +-
 .../sdk/extensions/sql/impl/rule/BeamJoinRule.java |  33 ++-
 .../extensions/sql/impl/rule/BeamProjectRule.java  |  37 +--
 .../sdk/extensions/sql/BeamSqlDslJoinTest.java     |   6 +-
 .../interpreter/BeamSqlFnExecutorTestBase.java     |  42 +--
 .../transform/agg/VarianceAccumulatorTest.java     |   6 +-
 .../sql/impl/transform/agg/VarianceFnTest.java     |   3 +-
 25 files changed, 599 insertions(+), 459 deletions(-)

diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/QueryTransform.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/QueryTransform.java
index 8a7335f..67ccbb1 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/QueryTransform.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/QueryTransform.java
@@ -67,10 +67,11 @@ public abstract class QueryTransform extends 
PTransform<PInput, PCollection<Row>
 
     try {
       return
+          inputTuple.apply(
           sqlEnv
               .getPlanner()
               .convertToBeamRel(queryString())
-              .buildBeamPipeline(inputTuple, sqlEnv);
+              .toPTransform());
     } catch (Exception e) {
       throw new IllegalStateException(e);
     }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
index 6c4c9a4..e81b927 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
@@ -55,11 +55,10 @@ import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.tools.Frameworks;
 
 /**
- * {@link BeamSqlEnv} prepares the execution context for {@link BeamSql} and
- * {@link BeamSqlCli}.
+ * {@link BeamSqlEnv} prepares the execution context for {@link BeamSql} and 
{@link BeamSqlCli}.
  *
- * <p>It contains a {@link SchemaPlus} which holds the metadata of tables/UDF 
functions,
- * and a {@link BeamQueryPlanner} which parse/validate/optimize/translate 
input SQL queries.
+ * <p>It contains a {@link SchemaPlus} which holds the metadata of tables/UDF 
functions, and a
+ * {@link BeamQueryPlanner} which parse/validate/optimize/translate input SQL 
queries.
  */
 public class BeamSqlEnv implements Serializable {
   transient SchemaPlus schema;
@@ -69,7 +68,7 @@ public class BeamSqlEnv implements Serializable {
   public BeamSqlEnv() {
     tables = new HashMap<>(16);
     schema = Frameworks.createRootSchema(true);
-    planner = new BeamQueryPlanner(schema);
+    planner = new BeamQueryPlanner(this, schema);
   }
 
   /**
@@ -151,7 +150,7 @@ public class BeamSqlEnv implements Serializable {
         schema.add(tableName, new BeamCalciteTable(table.getRowType()));
       }
     }
-    planner = new BeamQueryPlanner(schema);
+    planner = new BeamQueryPlanner(this, schema);
   }
 
   /**
@@ -216,6 +215,6 @@ public class BeamSqlEnv implements Serializable {
 
     tables = new HashMap<String, BeamSqlTable>(16);
     schema = Frameworks.createRootSchema(true);
-    planner = new BeamQueryPlanner(schema);
+    planner = new BeamQueryPlanner(this, schema);
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
index b9b0fdb..7858c5c 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
@@ -74,7 +74,7 @@ public class BeamQueryPlanner {
   public static final JavaTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl(
       RelDataTypeSystem.DEFAULT);
 
-  public BeamQueryPlanner(SchemaPlus schema) {
+  public BeamQueryPlanner(BeamSqlEnv sqlEnv, SchemaPlus schema) {
     String defaultCharsetKey = "saffron.default.charset";
     if (System.getProperty(defaultCharsetKey) == null) {
       System.setProperty(defaultCharsetKey, 
ConversionUtil.NATIVE_UTF16_CHARSET_NAME);
@@ -94,12 +94,17 @@ public class BeamQueryPlanner {
         new CalciteCatalogReader(
             CalciteSchema.from(schema), Collections.emptyList(), TYPE_FACTORY, 
null));
 
-    FrameworkConfig config = Frameworks.newConfigBuilder()
-        
.parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build()).defaultSchema(schema)
-        
.traitDefs(traitDefs).context(Contexts.EMPTY_CONTEXT).ruleSets(BeamRuleSets.getRuleSets())
-        
.costFactory(null).typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM)
-        .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
-        .build();
+    FrameworkConfig config =
+        Frameworks.newConfigBuilder()
+            .parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build())
+            .defaultSchema(schema)
+            .traitDefs(traitDefs)
+            .context(Contexts.EMPTY_CONTEXT)
+            .ruleSets(BeamRuleSets.getRuleSets(sqlEnv))
+            .costFactory(null)
+            .typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM)
+            .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
+            .build();
     this.planner = Frameworks.getPlanner(config);
 
     for (String t : schema.getTableNames()) {
@@ -124,7 +129,7 @@ public class BeamQueryPlanner {
     BeamRelNode relNode = convertToBeamRel(sqlStatement);
 
     // the input PCollectionTuple is empty, and be rebuilt in BeamIOSourceRel.
-    return relNode.buildBeamPipeline(PCollectionTuple.empty(basePipeline), 
sqlEnv);
+    return PCollectionTuple.empty(basePipeline).apply(relNode.toPTransform());
   }
 
   /**
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
index d3c9871..1d10816 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
@@ -17,9 +17,7 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl.planner;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import java.util.Iterator;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
 import org.apache.beam.sdk.extensions.sql.impl.rule.BeamAggregationRule;
 import org.apache.beam.sdk.extensions.sql.impl.rule.BeamFilterRule;
@@ -32,44 +30,30 @@ import 
org.apache.beam.sdk.extensions.sql.impl.rule.BeamProjectRule;
 import org.apache.beam.sdk.extensions.sql.impl.rule.BeamSortRule;
 import org.apache.beam.sdk.extensions.sql.impl.rule.BeamUnionRule;
 import org.apache.beam.sdk.extensions.sql.impl.rule.BeamValuesRule;
-import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.tools.RuleSet;
+import org.apache.calcite.tools.RuleSets;
 
 /**
- * {@link RuleSet} used in {@link BeamQueryPlanner}. It translates a standard
- * Calcite {@link RelNode} tree, to represent with {@link BeamRelNode}
- *
+ * {@link RuleSet} used in {@link BeamQueryPlanner}. It translates a standard 
Calcite {@link
+ * RelNode} tree, to represent with {@link BeamRelNode}
  */
 public class BeamRuleSets {
-  private static final ImmutableSet<RelOptRule> calciteToBeamConversionRules = 
ImmutableSet
-      .<RelOptRule>builder().add(BeamIOSourceRule.INSTANCE, 
BeamProjectRule.INSTANCE,
-          BeamFilterRule.INSTANCE, BeamIOSinkRule.INSTANCE,
-          BeamAggregationRule.INSTANCE, BeamSortRule.INSTANCE, 
BeamValuesRule.INSTANCE,
-          BeamIntersectRule.INSTANCE, BeamMinusRule.INSTANCE, 
BeamUnionRule.INSTANCE,
-          BeamJoinRule.INSTANCE)
-      .build();
 
-  public static RuleSet[] getRuleSets() {
-    return new RuleSet[] { new BeamRuleSet(
-        
ImmutableSet.<RelOptRule>builder().addAll(calciteToBeamConversionRules).build())
 };
+  public static RuleSet[] getRuleSets(BeamSqlEnv sqlEnv) {
+    return new RuleSet[] {
+      RuleSets.ofList(
+          BeamIOSourceRule.forSqlEnv(sqlEnv),
+          BeamProjectRule.INSTANCE,
+          BeamFilterRule.INSTANCE,
+          BeamIOSinkRule.forSqlEnv(sqlEnv),
+          BeamAggregationRule.INSTANCE,
+          BeamSortRule.INSTANCE,
+          BeamValuesRule.INSTANCE,
+          BeamIntersectRule.INSTANCE,
+          BeamMinusRule.INSTANCE,
+          BeamUnionRule.INSTANCE,
+          BeamJoinRule.forSqlEnv(sqlEnv))
+    };
   }
-
-  private static class BeamRuleSet implements RuleSet {
-    final ImmutableSet<RelOptRule> rules;
-
-    public BeamRuleSet(ImmutableSet<RelOptRule> rules) {
-      this.rules = rules;
-    }
-
-    public BeamRuleSet(ImmutableList<RelOptRule> rules) {
-      this.rules = ImmutableSet.<RelOptRule>builder().addAll(rules).build();
-    }
-
-    @Override
-    public Iterator<RelOptRule> iterator() {
-      return rules.iterator();
-    }
-  }
-
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
index f1fb12d..a35e64c 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
@@ -24,12 +24,12 @@ import java.util.List;
 import java.util.Optional;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.RowCoder;
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.rule.AggregateWindowField;
 import 
org.apache.beam.sdk.extensions.sql.impl.transform.BeamAggregationTransforms;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.WithTimestamps;
@@ -77,15 +77,19 @@ public class BeamAggregationRel extends Aggregate 
implements BeamRelNode {
   }
 
   @Override
-  public PCollection<Row> buildBeamPipeline(
-      PCollectionTuple inputPCollections,
-      BeamSqlEnv sqlEnv) throws Exception {
+  public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
+    return new Transform();
+  }
+
+  private class Transform extends PTransform<PCollectionTuple, 
PCollection<Row>> {
+
+    public PCollection<Row> expand(PCollectionTuple inputPCollections) {
 
     RelNode input = getInput();
-    String stageName = BeamSqlRelUtils.getStageName(this) + "_";
+      String stageName = BeamSqlRelUtils.getStageName(BeamAggregationRel.this) 
+ "_";
 
-    PCollection<Row> upstream =
-        
BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, 
sqlEnv);
+      PCollection<Row> upstream =
+          
inputPCollections.apply(BeamSqlRelUtils.getBeamRelInput(input).toPTransform());
     if (windowField.isPresent()) {
       upstream = upstream.apply(stageName + "assignEventTimestamp", 
WithTimestamps
           .of(new 
BeamAggregationTransforms.WindowTimestampFn(windowFieldIndex))
@@ -188,6 +192,7 @@ public class BeamAggregationRel extends Aggregate 
implements BeamRelNode {
         RowType
             .newField(aggCall.name, 
CalciteUtils.toCoder(aggCall.type.getSqlTypeName()));
   }
+  }
 
   @Override
   public Aggregate copy(
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java
index d684636..ec21a9b 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamFilterRel.java
@@ -17,11 +17,11 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutor;
 import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlFilterFn;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -49,23 +49,27 @@ public class BeamFilterRel extends Filter implements 
BeamRelNode {
   }
 
   @Override
-  public PCollection<Row> buildBeamPipeline(PCollectionTuple inputPCollections
-      , BeamSqlEnv sqlEnv) throws Exception {
-    RelNode input = getInput();
-    String stageName = BeamSqlRelUtils.getStageName(this);
+  public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
+    return new Transform();
+  }
 
-    PCollection<Row> upstream =
-        
BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, 
sqlEnv);
+  private class Transform extends PTransform<PCollectionTuple, 
PCollection<Row>> {
 
-    BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this);
+    @Override
+    public PCollection<Row> expand(PCollectionTuple inputPCollections) {
+      RelNode input = getInput();
+      String stageName = BeamSqlRelUtils.getStageName(BeamFilterRel.this);
 
-    PCollection<Row> filterStream = upstream
-        .apply(
-            stageName,
-            ParDo.of(new BeamSqlFilterFn(getRelTypeName(), executor)));
-    
filterStream.setCoder(CalciteUtils.toBeamRowType(getRowType()).getRowCoder());
+      PCollection<Row> upstream =
+          
inputPCollections.apply(BeamSqlRelUtils.getBeamRelInput(input).toPTransform());
 
-    return filterStream;
-  }
+      BeamSqlExpressionExecutor executor = new 
BeamSqlFnExecutor(BeamFilterRel.this);
 
+      PCollection<Row> filterStream =
+          upstream.apply(stageName, ParDo.of(new 
BeamSqlFilterFn(getRelTypeName(), executor)));
+      
filterStream.setCoder(CalciteUtils.toBeamRowType(getRowType()).getRowCoder());
+
+      return filterStream;
+    }
+  }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
index d38b2ac..6afa8b1 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java
@@ -21,6 +21,7 @@ import com.google.common.base.Joiner;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.Row;
@@ -32,44 +33,76 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.TableModify;
 import org.apache.calcite.rex.RexNode;
 
-/**
- * BeamRelNode to replace a {@code TableModify} node.
- *
- */
+/** BeamRelNode to replace a {@code TableModify} node. */
 public class BeamIOSinkRel extends TableModify implements BeamRelNode {
-  public BeamIOSinkRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable 
table,
-      Prepare.CatalogReader catalogReader, RelNode child, Operation operation,
-      List<String> updateColumnList, List<RexNode> sourceExpressionList, 
boolean flattened) {
-    super(cluster, traits, table, catalogReader, child, operation, 
updateColumnList,
-        sourceExpressionList, flattened);
+
+  private final BeamSqlEnv sqlEnv;
+
+  public BeamIOSinkRel(
+      BeamSqlEnv sqlEnv,
+      RelOptCluster cluster,
+      RelTraitSet traits,
+      RelOptTable table,
+      Prepare.CatalogReader catalogReader,
+      RelNode child,
+      Operation operation,
+      List<String> updateColumnList,
+      List<RexNode> sourceExpressionList,
+      boolean flattened) {
+    super(
+        cluster,
+        traits,
+        table,
+        catalogReader,
+        child,
+        operation,
+        updateColumnList,
+        sourceExpressionList,
+        flattened);
+    this.sqlEnv = sqlEnv;
   }
 
   @Override
   public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
-    return new BeamIOSinkRel(getCluster(), traitSet, getTable(), 
getCatalogReader(), sole(inputs),
-        getOperation(), getUpdateColumnList(), getSourceExpressionList(), 
isFlattened());
+    return new BeamIOSinkRel(
+        sqlEnv,
+        getCluster(),
+        traitSet,
+        getTable(),
+        getCatalogReader(),
+        sole(inputs),
+        getOperation(),
+        getUpdateColumnList(),
+        getSourceExpressionList(),
+        isFlattened());
   }
 
-  /**
-   * Note that {@code BeamIOSinkRel} returns the input PCollection,
-   * which is the persisted PCollection.
-   */
   @Override
-  public PCollection<Row> buildBeamPipeline(PCollectionTuple inputPCollections
-      , BeamSqlEnv sqlEnv) throws Exception {
-    RelNode input = getInput();
-    String stageName = BeamSqlRelUtils.getStageName(this);
+  public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
+    return new Transform();
+  }
 
-    PCollection<Row> upstream =
-        
BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, 
sqlEnv);
+  private class Transform extends PTransform<PCollectionTuple, 
PCollection<Row>> {
 
-    String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
+    /**
+     * Note that {@code BeamIOSinkRel} returns the input PCollection, which is 
the persisted
+     * PCollection.
+     */
+    @Override
+    public PCollection<Row> expand(PCollectionTuple inputPCollections) {
+      RelNode input = getInput();
+      String stageName = BeamSqlRelUtils.getStageName(BeamIOSinkRel.this);
 
-    BeamSqlTable targetTable = sqlEnv.findTable(sourceName);
+      PCollection<Row> upstream =
+          
inputPCollections.apply(BeamSqlRelUtils.getBeamRelInput(input).toPTransform());
 
-    upstream.apply(stageName, targetTable.buildIOWriter());
+      String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
 
-    return upstream;
-  }
+      BeamSqlTable targetTable = sqlEnv.findTable(sourceName);
 
+      upstream.apply(stageName, targetTable.buildIOWriter());
+
+      return upstream;
+    }
+  }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
index 23f6a4f..64bfd26 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
@@ -21,6 +21,7 @@ import com.google.common.base.Joiner;
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.Row;
@@ -36,27 +37,37 @@ import org.apache.calcite.rel.core.TableScan;
  */
 public class BeamIOSourceRel extends TableScan implements BeamRelNode {
 
-  public BeamIOSourceRel(RelOptCluster cluster, RelTraitSet traitSet, 
RelOptTable table) {
+  private BeamSqlEnv sqlEnv;
+
+  public BeamIOSourceRel(
+      BeamSqlEnv sqlEnv, RelOptCluster cluster, RelTraitSet traitSet, 
RelOptTable table) {
     super(cluster, traitSet, table);
+    this.sqlEnv = sqlEnv;
   }
 
   @Override
-  public PCollection<Row> buildBeamPipeline(PCollectionTuple inputPCollections
-      , BeamSqlEnv sqlEnv) throws Exception {
-    String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
-
-    TupleTag<Row> sourceTupleTag = new TupleTag<>(sourceName);
-    if (inputPCollections.has(sourceTupleTag)) {
-      //choose PCollection from input PCollectionTuple if exists there.
-      PCollection<Row> sourceStream = inputPCollections
-          .get(new TupleTag<Row>(sourceName));
-      return sourceStream;
-    } else {
-      //If not, the source PColection is provided with 
BaseBeamTable.buildIOReader().
-      BeamSqlTable sourceTable = sqlEnv.findTable(sourceName);
-      return sourceTable.buildIOReader(inputPCollections.getPipeline())
-          .setCoder(CalciteUtils.toBeamRowType(getRowType()).getRowCoder());
-    }
+  public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
+    return new Transform();
   }
 
+  private class Transform extends PTransform<PCollectionTuple, 
PCollection<Row>> {
+
+    @Override
+    public PCollection<Row> expand(PCollectionTuple inputPCollections) {
+      String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
+
+      TupleTag<Row> sourceTupleTag = new TupleTag<>(sourceName);
+      if (inputPCollections.has(sourceTupleTag)) {
+        // choose PCollection from input PCollectionTuple if exists there.
+        PCollection<Row> sourceStream = inputPCollections.get(new 
TupleTag<Row>(sourceName));
+        return sourceStream;
+      } else {
+        // If not, the source PColection is provided with 
BaseBeamTable.buildIOReader().
+        BeamSqlTable sourceTable = sqlEnv.findTable(sourceName);
+        return sourceTable
+            .buildIOReader(inputPCollections.getPipeline())
+            .setCoder(CalciteUtils.toBeamRowType(getRowType()).getRowCoder());
+      }
+    }
+  }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
index 7c28ea7..b5002ea 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java
@@ -19,7 +19,7 @@
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.util.List;
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.Row;
@@ -32,27 +32,34 @@ import org.apache.calcite.rel.core.SetOp;
 /**
  * {@code BeamRelNode} to replace a {@code Intersect} node.
  *
- * <p>This is used to combine two SELECT statements, but returns rows only 
from the
- * first SELECT statement that are identical to a row in the second SELECT 
statement.
+ * <p>This is used to combine two SELECT statements, but returns rows only 
from the first SELECT
+ * statement that are identical to a row in the second SELECT statement.
  */
 public class BeamIntersectRel extends Intersect implements BeamRelNode {
   private BeamSetOperatorRelBase delegate;
+
   public BeamIntersectRel(
-      RelOptCluster cluster,
-      RelTraitSet traits,
-      List<RelNode> inputs,
-      boolean all) {
+      RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean 
all) {
     super(cluster, traits, inputs, all);
-    delegate = new BeamSetOperatorRelBase(this,
-        BeamSetOperatorRelBase.OpType.INTERSECT, inputs, all);
+    delegate =
+        new BeamSetOperatorRelBase(this, 
BeamSetOperatorRelBase.OpType.INTERSECT, inputs, all);
   }
 
-  @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, 
boolean all) {
+  @Override
+  public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
     return new BeamIntersectRel(getCluster(), traitSet, inputs, all);
   }
 
-  @Override public PCollection<Row> buildBeamPipeline(PCollectionTuple 
inputPCollections
-      , BeamSqlEnv sqlEnv) throws Exception {
-    return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
+  @Override
+  public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
+    return new Transform();
+  }
+
+  private class Transform extends PTransform<PCollectionTuple, 
PCollection<Row>> {
+
+    @Override
+    public PCollection<Row> expand(PCollectionTuple inputPCollections) {
+      return delegate.buildBeamPipeline(inputPCollections);
+    }
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
index 89196ef..615463b 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.transform.BeamJoinTransforms;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
@@ -92,128 +93,153 @@ import org.apache.calcite.util.Pair;
  * </ul>
  */
 public class BeamJoinRel extends Join implements BeamRelNode {
-  public BeamJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, 
RelNode right,
-      RexNode condition, Set<CorrelationId> variablesSet, JoinRelType 
joinType) {
+  private final BeamSqlEnv sqlEnv;
+
+  public BeamJoinRel(
+      BeamSqlEnv sqlEnv,
+      RelOptCluster cluster,
+      RelTraitSet traits,
+      RelNode left,
+      RelNode right,
+      RexNode condition,
+      Set<CorrelationId> variablesSet,
+      JoinRelType joinType) {
     super(cluster, traits, left, right, condition, variablesSet, joinType);
+    this.sqlEnv = sqlEnv;
   }
 
-  @Override public Join copy(RelTraitSet traitSet, RexNode conditionExpr, 
RelNode left,
-      RelNode right, JoinRelType joinType, boolean semiJoinDone) {
-    return new BeamJoinRel(getCluster(), traitSet, left, right, conditionExpr, 
variablesSet,
+  @Override
+  public Join copy(
+      RelTraitSet traitSet,
+      RexNode conditionExpr,
+      RelNode left,
+      RelNode right,
+      JoinRelType joinType,
+      boolean semiJoinDone) {
+    return new BeamJoinRel(sqlEnv, getCluster(), traitSet, left, right, 
conditionExpr, variablesSet,
         joinType);
   }
 
-  @Override public PCollection<Row> buildBeamPipeline(PCollectionTuple 
inputPCollections,
-                                                      BeamSqlEnv sqlEnv)
-      throws Exception {
-    BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
-    RowType leftRowType = CalciteUtils.toBeamRowType(left.getRowType());
-    final BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
+  @Override
+  public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
+    return new Transform();
+  }
 
-    if (!seekable(leftRelNode, sqlEnv) && seekable(rightRelNode, sqlEnv)) {
-      return joinAsLookup(leftRelNode, rightRelNode, inputPCollections, sqlEnv)
-              
.setCoder(CalciteUtils.toBeamRowType(getRowType()).getRowCoder());
-    }
+  private class Transform extends PTransform<PCollectionTuple, 
PCollection<Row>> {
 
-    PCollection<Row> leftRows = 
leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
-    PCollection<Row> rightRows = 
rightRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
-
-    verifySupportedTrigger(leftRows);
-    verifySupportedTrigger(rightRows);
-
-    String stageName = BeamSqlRelUtils.getStageName(this);
-    WindowFn leftWinFn = leftRows.getWindowingStrategy().getWindowFn();
-    WindowFn rightWinFn = rightRows.getWindowingStrategy().getWindowFn();
-
-    // extract the join fields
-    List<Pair<Integer, Integer>> pairs = extractJoinColumns(
-        leftRelNode.getRowType().getFieldCount());
-
-    // build the extract key type
-    // the name of the join field is not important
-    RowType extractKeyRowType =
-        pairs
-            .stream()
-            .map(pair ->
-                     RowType.newField(
-                         leftRowType.getFieldName(pair.getKey()),
-                         leftRowType.getFieldCoder(pair.getKey())))
-            .collect(toRowType());
-
-    Coder extractKeyRowCoder = extractKeyRowType.getRowCoder();
-
-    // BeamSqlRow -> KV<BeamSqlRow, BeamSqlRow>
-    PCollection<KV<Row, Row>> extractedLeftRows = leftRows
-        .apply(stageName + "_left_ExtractJoinFields",
-            MapElements.via(new BeamJoinTransforms.ExtractJoinFields(true, 
pairs)))
-        .setCoder(KvCoder.of(extractKeyRowCoder, leftRows.getCoder()));
-
-    PCollection<KV<Row, Row>> extractedRightRows = rightRows
-        .apply(stageName + "_right_ExtractJoinFields",
-            MapElements.via(new BeamJoinTransforms.ExtractJoinFields(false, 
pairs)))
-        .setCoder(KvCoder.of(extractKeyRowCoder, rightRows.getCoder()));
-
-    // prepare the NullRows
-    Row leftNullRow = buildNullRow(leftRelNode);
-    Row rightNullRow = buildNullRow(rightRelNode);
-
-    // a regular join
-    if ((leftRows.isBounded() == PCollection.IsBounded.BOUNDED
-            && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)
-           || (leftRows.isBounded() == UNBOUNDED
-                && rightRows.isBounded() == UNBOUNDED)) {
-      try {
-        leftWinFn.verifyCompatibility(rightWinFn);
-      } catch (IncompatibleWindowException e) {
-        throw new IllegalArgumentException(
-            "WindowFns must match for a 
bounded-vs-bounded/unbounded-vs-unbounded join.", e);
-      }
+    @Override
+    public PCollection<Row> expand(PCollectionTuple inputPCollections) {
+      BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
+      RowType leftRowType = CalciteUtils.toBeamRowType(left.getRowType());
+      final BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
 
-      return standardJoin(extractedLeftRows, extractedRightRows,
-          leftNullRow, rightNullRow, stageName);
-    } else if (
-        (leftRows.isBounded() == PCollection.IsBounded.BOUNDED
-        && rightRows.isBounded() == UNBOUNDED)
-        || (leftRows.isBounded() == UNBOUNDED
-            && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)
-        ) {
-      // if one of the sides is Bounded & the other is Unbounded
-      // then do a sideInput join
-      // when doing a sideInput join, the windowFn does not need to match
-      // Only support INNER JOIN & LEFT OUTER JOIN where left side of the join 
must be
-      // the unbounded
-      if (joinType == JoinRelType.FULL) {
-        throw new UnsupportedOperationException("FULL OUTER JOIN is not 
supported when join "
-            + "a bounded table with an unbounded table.");
+      if (!seekable(leftRelNode, sqlEnv) && seekable(rightRelNode, sqlEnv)) {
+        return joinAsLookup(leftRelNode, rightRelNode, inputPCollections, 
sqlEnv)
+            .setCoder(CalciteUtils.toBeamRowType(getRowType()).getRowCoder());
       }
 
-      if ((joinType == JoinRelType.LEFT
-          && leftRows.isBounded() == PCollection.IsBounded.BOUNDED)
-          || (joinType == JoinRelType.RIGHT
-          && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)) {
+      PCollection<Row> leftRows = inputPCollections.apply("left", 
leftRelNode.toPTransform());
+      PCollection<Row> rightRows =
+          inputPCollections.apply("right", rightRelNode.toPTransform());
+
+      verifySupportedTrigger(leftRows);
+      verifySupportedTrigger(rightRows);
+
+      String stageName = BeamSqlRelUtils.getStageName(BeamJoinRel.this);
+      WindowFn leftWinFn = leftRows.getWindowingStrategy().getWindowFn();
+      WindowFn rightWinFn = rightRows.getWindowingStrategy().getWindowFn();
+
+      // extract the join fields
+      List<Pair<Integer, Integer>> pairs =
+          extractJoinColumns(leftRelNode.getRowType().getFieldCount());
+
+      // build the extract key type
+      // the name of the join field is not important
+      RowType extractKeyRowType =
+          pairs
+              .stream()
+              .map(
+                  pair ->
+                      RowType.newField(
+                          leftRowType.getFieldName(pair.getKey()),
+                          leftRowType.getFieldCoder(pair.getKey())))
+              .collect(toRowType());
+
+      Coder extractKeyRowCoder = extractKeyRowType.getRowCoder();
+
+      // BeamSqlRow -> KV<BeamSqlRow, BeamSqlRow>
+      PCollection<KV<Row, Row>> extractedLeftRows =
+          leftRows
+              .apply(
+                  stageName + "_left_ExtractJoinFields",
+                  MapElements.via(new 
BeamJoinTransforms.ExtractJoinFields(true, pairs)))
+              .setCoder(KvCoder.of(extractKeyRowCoder, leftRows.getCoder()));
+
+      PCollection<KV<Row, Row>> extractedRightRows =
+          rightRows
+              .apply(
+                  stageName + "_right_ExtractJoinFields",
+                  MapElements.via(new 
BeamJoinTransforms.ExtractJoinFields(false, pairs)))
+              .setCoder(KvCoder.of(extractKeyRowCoder, rightRows.getCoder()));
+
+      // prepare the NullRows
+      Row leftNullRow = buildNullRow(leftRelNode);
+      Row rightNullRow = buildNullRow(rightRelNode);
+
+      // a regular join
+      if ((leftRows.isBounded() == PCollection.IsBounded.BOUNDED
+              && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)
+          || (leftRows.isBounded() == UNBOUNDED && rightRows.isBounded() == 
UNBOUNDED)) {
+        try {
+          leftWinFn.verifyCompatibility(rightWinFn);
+        } catch (IncompatibleWindowException e) {
+          throw new IllegalArgumentException(
+              "WindowFns must match for a 
bounded-vs-bounded/unbounded-vs-unbounded join.", e);
+        }
+
+        return standardJoin(
+            extractedLeftRows, extractedRightRows, leftNullRow, rightNullRow, 
stageName);
+      } else if ((leftRows.isBounded() == PCollection.IsBounded.BOUNDED
+              && rightRows.isBounded() == UNBOUNDED)
+          || (leftRows.isBounded() == UNBOUNDED
+              && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)) {
+        // if one of the sides is Bounded & the other is Unbounded
+        // then do a sideInput join
+        // when doing a sideInput join, the windowFn does not need to match
+        // Only support INNER JOIN & LEFT OUTER JOIN where left side of the 
join must be
+        // the unbounded
+        if (joinType == JoinRelType.FULL) {
+          throw new UnsupportedOperationException(
+              "FULL OUTER JOIN is not supported when join "
+                  + "a bounded table with an unbounded table.");
+        }
+
+        if ((joinType == JoinRelType.LEFT && leftRows.isBounded() == 
PCollection.IsBounded.BOUNDED)
+            || (joinType == JoinRelType.RIGHT
+                && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)) {
+          throw new UnsupportedOperationException(
+              "LEFT side of an OUTER JOIN must be Unbounded table.");
+        }
+
+        return sideInputJoin(extractedLeftRows, extractedRightRows, 
leftNullRow, rightNullRow);
+      } else {
         throw new UnsupportedOperationException(
-            "LEFT side of an OUTER JOIN must be Unbounded table.");
+            "The inputs to the JOIN have un-joinnable windowFns: " + leftWinFn 
+ ", " + rightWinFn);
       }
-
-      return sideInputJoin(extractedLeftRows, extractedRightRows,
-          leftNullRow, rightNullRow);
-    } else {
-      throw new UnsupportedOperationException(
-          "The inputs to the JOIN have un-joinnable windowFns: " + leftWinFn + 
", " + rightWinFn);
     }
   }
 
   private void verifySupportedTrigger(PCollection<Row> pCollection) {
     WindowingStrategy windowingStrategy = pCollection.getWindowingStrategy();
 
-    if (UNBOUNDED.equals(pCollection.isBounded())
-        && !triggersOncePerWindow(windowingStrategy)) {
+    if (UNBOUNDED.equals(pCollection.isBounded()) && 
!triggersOncePerWindow(windowingStrategy)) {
       throw new UnsupportedOperationException(
           "Joining unbounded PCollections is currently only supported for "
               + "non-global windows with triggers that are known to produce 
output once per window,"
               + "such as the default trigger with zero allowed lateness. "
               + "In these cases Beam can guarantee it joins all input elements 
once per window. "
-              + windowingStrategy + " is not supported");
+              + windowingStrategy
+              + " is not supported");
     }
   }
 
@@ -228,69 +254,79 @@ public class BeamJoinRel extends Join implements 
BeamRelNode {
   private PCollection<Row> standardJoin(
       PCollection<KV<Row, Row>> extractedLeftRows,
       PCollection<KV<Row, Row>> extractedRightRows,
-      Row leftNullRow, Row rightNullRow, String stageName) {
+      Row leftNullRow,
+      Row rightNullRow,
+      String stageName) {
     PCollection<KV<Row, KV<Row, Row>>> joinedRows = null;
     switch (joinType) {
       case LEFT:
-        joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
-            .leftOuterJoin(extractedLeftRows, extractedRightRows, 
rightNullRow);
+        joinedRows =
+            org.apache.beam.sdk.extensions.joinlibrary.Join.leftOuterJoin(
+                extractedLeftRows, extractedRightRows, rightNullRow);
         break;
       case RIGHT:
-        joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
-            .rightOuterJoin(extractedLeftRows, extractedRightRows, 
leftNullRow);
+        joinedRows =
+            org.apache.beam.sdk.extensions.joinlibrary.Join.rightOuterJoin(
+                extractedLeftRows, extractedRightRows, leftNullRow);
         break;
       case FULL:
-        joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
-            .fullOuterJoin(extractedLeftRows, extractedRightRows, leftNullRow,
-            rightNullRow);
+        joinedRows =
+            org.apache.beam.sdk.extensions.joinlibrary.Join.fullOuterJoin(
+                extractedLeftRows, extractedRightRows, leftNullRow, 
rightNullRow);
         break;
       case INNER:
       default:
-        joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
-            .innerJoin(extractedLeftRows, extractedRightRows);
+        joinedRows =
+            org.apache.beam.sdk.extensions.joinlibrary.Join.innerJoin(
+                extractedLeftRows, extractedRightRows);
         break;
     }
 
-    PCollection<Row> ret = joinedRows
-        .apply(stageName + "_JoinParts2WholeRow",
-            MapElements.via(new BeamJoinTransforms.JoinParts2WholeRow()))
-        .setCoder(CalciteUtils.toBeamRowType(getRowType()).getRowCoder());
+    PCollection<Row> ret =
+        joinedRows
+            .apply(
+                stageName + "_JoinParts2WholeRow",
+                MapElements.via(new BeamJoinTransforms.JoinParts2WholeRow()))
+            .setCoder(CalciteUtils.toBeamRowType(getRowType()).getRowCoder());
     return ret;
   }
 
   public PCollection<Row> sideInputJoin(
       PCollection<KV<Row, Row>> extractedLeftRows,
       PCollection<KV<Row, Row>> extractedRightRows,
-      Row leftNullRow, Row rightNullRow) {
+      Row leftNullRow,
+      Row rightNullRow) {
     // we always make the Unbounded table on the left to do the sideInput join
     // (will convert the result accordingly before return)
     boolean swapped = (extractedLeftRows.isBounded() == 
PCollection.IsBounded.BOUNDED);
     JoinRelType realJoinType =
         (swapped && joinType != JoinRelType.INNER) ? JoinRelType.LEFT : 
joinType;
 
-    PCollection<KV<Row, Row>> realLeftRows =
-        swapped ? extractedRightRows : extractedLeftRows;
-    PCollection<KV<Row, Row>> realRightRows =
-        swapped ? extractedLeftRows : extractedRightRows;
+    PCollection<KV<Row, Row>> realLeftRows = swapped ? extractedRightRows : 
extractedLeftRows;
+    PCollection<KV<Row, Row>> realRightRows = swapped ? extractedLeftRows : 
extractedRightRows;
     Row realRightNullRow = swapped ? leftNullRow : rightNullRow;
 
     // swapped still need to pass down because, we need to swap the result 
back.
-    return sideInputJoinHelper(realJoinType, realLeftRows, realRightRows,
-        realRightNullRow, swapped);
+    return sideInputJoinHelper(
+        realJoinType, realLeftRows, realRightRows, realRightNullRow, swapped);
   }
 
   private PCollection<Row> sideInputJoinHelper(
       JoinRelType joinType,
       PCollection<KV<Row, Row>> leftRows,
       PCollection<KV<Row, Row>> rightRows,
-      Row rightNullRow, boolean swapped) {
-    final PCollectionView<Map<Row, Iterable<Row>>> rowsView =
-        rightRows.apply(View.asMultimap());
-
-    PCollection<Row> ret = leftRows
-        .apply(ParDo.of(new BeamJoinTransforms.SideInputJoinDoFn(
-            joinType, rightNullRow, rowsView, 
swapped)).withSideInputs(rowsView))
-        .setCoder(CalciteUtils.toBeamRowType(getRowType()).getRowCoder());
+      Row rightNullRow,
+      boolean swapped) {
+    final PCollectionView<Map<Row, Iterable<Row>>> rowsView = 
rightRows.apply(View.asMultimap());
+
+    PCollection<Row> ret =
+        leftRows
+            .apply(
+                ParDo.of(
+                        new BeamJoinTransforms.SideInputJoinDoFn(
+                            joinType, rightNullRow, rowsView, swapped))
+                    .withSideInputs(rowsView))
+            .setCoder(CalciteUtils.toBeamRowType(getRowType()).getRowCoder());
 
     return ret;
   }
@@ -324,28 +360,34 @@ public class BeamJoinRel extends Join implements 
BeamRelNode {
     return pairs;
   }
 
-  private Pair<Integer, Integer> extractOneJoinColumn(RexCall oneCondition,
-      int leftRowColumnCount) {
+  private Pair<Integer, Integer> extractOneJoinColumn(
+      RexCall oneCondition, int leftRowColumnCount) {
     List<RexNode> operands = oneCondition.getOperands();
-    final int leftIndex = Math.min(((RexInputRef) operands.get(0)).getIndex(),
-        ((RexInputRef) operands.get(1)).getIndex());
+    final int leftIndex =
+        Math.min(
+            ((RexInputRef) operands.get(0)).getIndex(), ((RexInputRef) 
operands.get(1)).getIndex());
 
-    final int rightIndex1 = Math.max(((RexInputRef) 
operands.get(0)).getIndex(),
-        ((RexInputRef) operands.get(1)).getIndex());
+    final int rightIndex1 =
+        Math.max(
+            ((RexInputRef) operands.get(0)).getIndex(), ((RexInputRef) 
operands.get(1)).getIndex());
     final int rightIndex = rightIndex1 - leftRowColumnCount;
 
     return new Pair<>(leftIndex, rightIndex);
   }
 
-  private PCollection<Row> joinAsLookup(BeamRelNode leftRelNode,
-                                        BeamRelNode rightRelNode,
-                                        PCollectionTuple inputPCollections,
-                                        BeamSqlEnv sqlEnv) throws Exception {
-    PCollection<Row> factStream = 
leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
+  private PCollection<Row> joinAsLookup(
+      BeamRelNode leftRelNode,
+      BeamRelNode rightRelNode,
+      PCollectionTuple inputPCollections,
+      BeamSqlEnv sqlEnv) {
+    PCollection<Row> factStream = 
inputPCollections.apply(leftRelNode.toPTransform());
     BeamSqlSeekableTable seekableTable = 
getSeekableTableFromRelNode(rightRelNode, sqlEnv);
 
-    return factStream.apply("join_as_lookup",
-        new BeamJoinTransforms.JoinAsLookup(condition, seekableTable,
+    return factStream.apply(
+        "join_as_lookup",
+        new BeamJoinTransforms.JoinAsLookup(
+            condition,
+            seekableTable,
             CalciteUtils.toBeamRowType(rightRelNode.getRowType()),
             
CalciteUtils.toBeamRowType(leftRelNode.getRowType()).getFieldCount()));
   }
@@ -357,9 +399,7 @@ public class BeamJoinRel extends Join implements 
BeamRelNode {
     return (BeamSqlSeekableTable) sourceTable;
   }
 
-  /**
-   * check if {@code BeamRelNode} implements {@code BeamSeekableTable}.
-   */
+  /** check if {@code BeamRelNode} implements {@code BeamSeekableTable}. */
   private boolean seekable(BeamRelNode relNode, BeamSqlEnv sqlEnv) {
     if (relNode instanceof BeamIOSourceRel) {
       BeamIOSourceRel srcRel = (BeamIOSourceRel) relNode;
@@ -370,5 +410,5 @@ public class BeamJoinRel extends Join implements 
BeamRelNode {
       }
     }
     return false;
-}
+  }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
index 9fdafda..7f4fc2c 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java
@@ -19,7 +19,7 @@
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.util.List;
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.Row;
@@ -49,8 +49,16 @@ public class BeamMinusRel extends Minus implements 
BeamRelNode {
     return new BeamMinusRel(getCluster(), traitSet, inputs, all);
   }
 
-  @Override public PCollection<Row> buildBeamPipeline(PCollectionTuple 
inputPCollections
-      , BeamSqlEnv sqlEnv) throws Exception {
-    return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
+  @Override
+  public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
+    return new Transform();
+  }
+
+  private class Transform extends PTransform<PCollectionTuple, 
PCollection<Row>> {
+
+    @Override
+    public PCollection<Row> expand(PCollectionTuple inputPCollections) {
+      return delegate.buildBeamPipeline(inputPCollections);
+    }
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java
index ea89874..b8bf644 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamProjectRel.java
@@ -18,11 +18,11 @@
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.util.List;
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutor;
 import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlProjectFn;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -59,22 +59,31 @@ public class BeamProjectRel extends Project implements 
BeamRelNode {
   }
 
   @Override
-  public PCollection<Row> buildBeamPipeline(PCollectionTuple inputPCollections
-      , BeamSqlEnv sqlEnv) throws Exception {
-    RelNode input = getInput();
-    String stageName = BeamSqlRelUtils.getStageName(this);
+  public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
+    return new Transform();
+  }
 
-    PCollection<Row> upstream =
-        
BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, 
sqlEnv);
+  private class Transform extends PTransform<PCollectionTuple, 
PCollection<Row>> {
 
-    BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this);
+    @Override
+    public PCollection<Row> expand(PCollectionTuple inputPCollections) {
+      RelNode input = getInput();
+      String stageName = BeamSqlRelUtils.getStageName(BeamProjectRel.this);
 
-    PCollection<Row> projectStream = upstream.apply(stageName, ParDo
-        .of(new BeamSqlProjectFn(getRelTypeName(), executor,
-            CalciteUtils.toBeamRowType(rowType))));
-    
projectStream.setCoder(CalciteUtils.toBeamRowType(getRowType()).getRowCoder());
+      PCollection<Row> upstream =
+          
inputPCollections.apply(BeamSqlRelUtils.getBeamRelInput(input).toPTransform());
 
-    return projectStream;
-  }
+      BeamSqlExpressionExecutor executor = new 
BeamSqlFnExecutor(BeamProjectRel.this);
 
+      PCollection<Row> projectStream =
+          upstream.apply(
+              stageName,
+              ParDo.of(
+                  new BeamSqlProjectFn(
+                      getRelTypeName(), executor, 
CalciteUtils.toBeamRowType(rowType))));
+      
projectStream.setCoder(CalciteUtils.toBeamRowType(getRowType()).getRowCoder());
+
+      return projectStream;
+    }
+  }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
index aa56745..1cdde33 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
@@ -17,23 +17,18 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.Row;
 import org.apache.calcite.rel.RelNode;
 
-/**
- * A new method {@link #buildBeamPipeline(PCollectionTuple, BeamSqlEnv)} is 
added.
- */
+/** A {@link RelNode} that can also give a {@link PTransform} that implements 
the expression. */
 public interface BeamRelNode extends RelNode {
 
   /**
-   * A {@link BeamRelNode} is a recursive structure, the
-   * {@code BeamQueryPlanner} visits it with a DFS(Depth-First-Search)
-   * algorithm.
+   * A {@link BeamRelNode} is a recursive structure, the {@code 
BeamQueryPlanner} visits it with a
+   * DFS(Depth-First-Search) algorithm.
    */
-  PCollection<Row> buildBeamPipeline(
-      PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv)
-      throws Exception;
+  PTransform<PCollectionTuple, PCollection<Row>> toPTransform();
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
index 0a9af42..ea536ac 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.io.Serializable;
 import java.util.List;
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import 
org.apache.beam.sdk.extensions.sql.impl.transform.BeamSetOperatorsTransforms;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -62,12 +61,13 @@ public class BeamSetOperatorRelBase {
     this.all = all;
   }
 
-  public PCollection<Row> buildBeamPipeline(PCollectionTuple inputPCollections
-      , BeamSqlEnv sqlEnv) throws Exception {
-    PCollection<Row> leftRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(0))
-        .buildBeamPipeline(inputPCollections, sqlEnv);
-    PCollection<Row> rightRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(1))
-        .buildBeamPipeline(inputPCollections, sqlEnv);
+  public PCollection<Row> buildBeamPipeline(PCollectionTuple 
inputPCollections) {
+    PCollection<Row> leftRows =
+        inputPCollections.apply(
+            "left", 
BeamSqlRelUtils.getBeamRelInput(inputs.get(0)).toPTransform());
+    PCollection<Row> rightRows =
+        inputPCollections.apply(
+            "right", 
BeamSqlRelUtils.getBeamRelInput(inputs.get(1)).toPTransform());
 
     WindowFn leftWindow = leftRows.getWindowingStrategy().getWindowFn();
     WindowFn rightWindow = rightRows.getWindowingStrategy().getWindowFn();
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
index 16cdc7e..0633668 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
@@ -25,10 +25,10 @@ import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Top;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -119,41 +119,49 @@ public class BeamSortRel extends Sort implements 
BeamRelNode {
     }
   }
 
-  @Override public PCollection<Row> buildBeamPipeline(PCollectionTuple 
inputPCollections
-      , BeamSqlEnv sqlEnv) throws Exception {
-    RelNode input = getInput();
-    PCollection<Row> upstream = BeamSqlRelUtils.getBeamRelInput(input)
-        .buildBeamPipeline(inputPCollections, sqlEnv);
-    Type windowType = upstream.getWindowingStrategy().getWindowFn()
-        .getWindowTypeDescriptor().getType();
-    if (!windowType.equals(GlobalWindow.class)) {
-      throw new UnsupportedOperationException(
-          "`ORDER BY` is only supported for GlobalWindow, actual window: " + 
windowType);
-    }
+  @Override
+  public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
+    return new Transform();
+  }
 
-    BeamSqlRowComparator comparator = new BeamSqlRowComparator(fieldIndices, 
orientation,
-        nullsFirst);
-    // first find the top (offset + count)
-    PCollection<List<Row>> rawStream =
-        upstream
-            .apply(
-                "extractTopOffsetAndFetch",
-                Top.of(startIndex + count, comparator).withoutDefaults())
-            .setCoder(ListCoder.of(upstream.getCoder()));
-
-    // strip the `leading offset`
-    if (startIndex > 0) {
-      rawStream =
-          rawStream
+  private class Transform extends PTransform<PCollectionTuple, 
PCollection<Row>> {
+
+    @Override
+    public PCollection<Row> expand(PCollectionTuple inputPCollections) {
+      RelNode input = getInput();
+      PCollection<Row> upstream =
+          
inputPCollections.apply(BeamSqlRelUtils.getBeamRelInput(input).toPTransform());
+      Type windowType =
+          
upstream.getWindowingStrategy().getWindowFn().getWindowTypeDescriptor().getType();
+      if (!windowType.equals(GlobalWindow.class)) {
+        throw new UnsupportedOperationException(
+            "`ORDER BY` is only supported for GlobalWindow, actual window: " + 
windowType);
+      }
+
+      BeamSqlRowComparator comparator =
+          new BeamSqlRowComparator(fieldIndices, orientation, nullsFirst);
+      // first find the top (offset + count)
+      PCollection<List<Row>> rawStream =
+          upstream
               .apply(
-                  "stripLeadingOffset", ParDo.of(new SubListFn<>(startIndex, 
startIndex + count)))
+                  "extractTopOffsetAndFetch",
+                  Top.of(startIndex + count, comparator).withoutDefaults())
               .setCoder(ListCoder.of(upstream.getCoder()));
-    }
 
-    PCollection<Row> orderedStream = rawStream.apply("flatten", 
Flatten.iterables());
-    
orderedStream.setCoder(CalciteUtils.toBeamRowType(getRowType()).getRowCoder());
+      // strip the `leading offset`
+      if (startIndex > 0) {
+        rawStream =
+            rawStream
+                .apply(
+                    "stripLeadingOffset", ParDo.of(new SubListFn<>(startIndex, 
startIndex + count)))
+                .setCoder(ListCoder.of(upstream.getCoder()));
+      }
+
+      PCollection<Row> orderedStream = rawStream.apply("flatten", 
Flatten.iterables());
+      
orderedStream.setCoder(CalciteUtils.toBeamRowType(getRowType()).getRowCoder());
 
-    return orderedStream;
+      return orderedStream;
+    }
   }
 
   private static class SubListFn<T> extends DoFn<List<T>, List<T>> {
@@ -233,4 +241,5 @@ public class BeamSortRel extends Sort implements 
BeamRelNode {
       return 0;
     }
   }
+
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
index f8d34c2..f828597 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java
@@ -19,7 +19,7 @@
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.util.List;
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -76,8 +76,15 @@ public class BeamUnionRel extends Union implements 
BeamRelNode {
     return new BeamUnionRel(getCluster(), traitSet, inputs, all);
   }
 
-  @Override public PCollection<Row> buildBeamPipeline(PCollectionTuple 
inputPCollections
-      , BeamSqlEnv sqlEnv) throws Exception {
-    return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
+  @Override
+  public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
+    return new Transform();
+  }
+
+  private class Transform extends PTransform<PCollectionTuple, 
PCollection<Row>> {
+    @Override
+    public PCollection<Row> expand(PCollectionTuple inputPCollections) {
+      return delegate.buildBeamPipeline(inputPCollections);
+    }
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
index 1e98968..6d87998 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
@@ -25,9 +25,9 @@ import static org.apache.beam.sdk.values.Row.toRow;
 import com.google.common.collect.ImmutableList;
 import java.util.List;
 import java.util.stream.IntStream;
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.Row;
@@ -58,28 +58,30 @@ public class BeamValuesRel extends Values implements 
BeamRelNode {
 
   }
 
-  @Override public PCollection<Row> buildBeamPipeline(
-      PCollectionTuple inputPCollections,
-      BeamSqlEnv sqlEnv) throws Exception {
+  @Override
+  public PTransform<PCollectionTuple, PCollection<Row>> toPTransform() {
+    return new Transform();
+  }
 
-    String stageName = BeamSqlRelUtils.getStageName(this);
-    if (tuples.isEmpty()) {
-      throw new IllegalStateException("Values with empty tuples!");
-    }
+  private class Transform extends PTransform<PCollectionTuple, 
PCollection<Row>> {
 
-    RowType rowType = CalciteUtils.toBeamRowType(this.getRowType());
+    @Override
+    public PCollection<Row> expand(PCollectionTuple inputPCollections) {
 
-    List<Row> rows =
-        tuples
-            .stream()
-            .map(tuple -> tupleToRow(rowType, tuple))
-            .collect(toList());
+      String stageName = BeamSqlRelUtils.getStageName(BeamValuesRel.this);
+      if (tuples.isEmpty()) {
+        throw new IllegalStateException("Values with empty tuples!");
+      }
 
-    return
-        inputPCollections
-            .getPipeline()
-            .apply(stageName, Create.of(rows))
-            .setCoder(rowType.getRowCoder());
+      RowType rowType = CalciteUtils.toBeamRowType(getRowType());
+
+      List<Row> rows = tuples.stream().map(tuple -> tupleToRow(rowType, 
tuple)).collect(toList());
+
+      return inputPCollections
+          .getPipeline()
+          .apply(stageName, Create.of(rows))
+          .setCoder(rowType.getRowCoder());
+    }
   }
 
   private Row tupleToRow(RowType rowType, ImmutableList<RexLiteral> tuple) {
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOSinkRule.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOSinkRule.java
index 77f4bdd..866298f 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOSinkRule.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOSinkRule.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.extensions.sql.impl.rule;
 
 import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSinkRel;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
 import org.apache.calcite.plan.Convention;
@@ -32,17 +33,22 @@ import org.apache.calcite.rel.logical.LogicalTableModify;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.schema.Table;
 
-/**
- * A {@code ConverterRule} to replace {@link TableModify} with
- * {@link BeamIOSinkRel}.
- *
- */
+/** A {@code ConverterRule} to replace {@link TableModify} with {@link 
BeamIOSinkRel}. */
 public class BeamIOSinkRule extends ConverterRule {
-  public static final BeamIOSinkRule INSTANCE = new BeamIOSinkRule();
 
-  private BeamIOSinkRule() {
-    super(LogicalTableModify.class, Convention.NONE, 
BeamLogicalConvention.INSTANCE,
+  private final BeamSqlEnv sqlEnv;
+
+  public static BeamIOSinkRule forSqlEnv(BeamSqlEnv sqlEnv) {
+    return new BeamIOSinkRule(sqlEnv);
+  }
+
+  private BeamIOSinkRule(BeamSqlEnv sqlEnv) {
+    super(
+        LogicalTableModify.class,
+        Convention.NONE,
+        BeamLogicalConvention.INSTANCE,
         "BeamIOSinkRule");
+    this.sqlEnv = sqlEnv;
   }
 
   @Override
@@ -54,8 +60,8 @@ public class BeamIOSinkRule extends ConverterRule {
     final RelTraitSet traitSet = 
tableModify.getTraitSet().replace(BeamLogicalConvention.INSTANCE);
     final RelOptTable relOptTable = tableModify.getTable();
     final Prepare.CatalogReader catalogReader = tableModify.getCatalogReader();
-    final RelNode convertedInput = convert(input,
-        input.getTraitSet().replace(BeamLogicalConvention.INSTANCE));
+    final RelNode convertedInput =
+        convert(input, 
input.getTraitSet().replace(BeamLogicalConvention.INSTANCE));
     final TableModify.Operation operation = tableModify.getOperation();
     final List<String> updateColumnList = tableModify.getUpdateColumnList();
     final List<RexNode> sourceExpressionList = 
tableModify.getSourceExpressionList();
@@ -64,18 +70,26 @@ public class BeamIOSinkRule extends ConverterRule {
     final Table table = tableModify.getTable().unwrap(Table.class);
 
     switch (table.getJdbcTableType()) {
-    case TABLE:
-    case STREAM:
-      if (operation != TableModify.Operation.INSERT) {
-        throw new UnsupportedOperationException(
-            String.format("Streams doesn't support %s modify operation", 
operation));
-      }
-      return new BeamIOSinkRel(cluster, traitSet,
-          relOptTable, catalogReader, convertedInput, operation, 
updateColumnList,
-          sourceExpressionList, flattened);
-    default:
-      throw new IllegalArgumentException(
-          String.format("Unsupported table type: %s", 
table.getJdbcTableType()));
+      case TABLE:
+      case STREAM:
+        if (operation != TableModify.Operation.INSERT) {
+          throw new UnsupportedOperationException(
+              String.format("Streams doesn't support %s modify operation", 
operation));
+        }
+        return new BeamIOSinkRel(
+            sqlEnv,
+            cluster,
+            traitSet,
+            relOptTable,
+            catalogReader,
+            convertedInput,
+            operation,
+            updateColumnList,
+            sourceExpressionList,
+            flattened);
+      default:
+        throw new IllegalArgumentException(
+            String.format("Unsupported table type: %s", 
table.getJdbcTableType()));
     }
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOSourceRule.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOSourceRule.java
index a257d3d..7dc0b18 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOSourceRule.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOSourceRule.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl.rule;
 
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
 import org.apache.calcite.plan.Convention;
@@ -25,25 +26,32 @@ import org.apache.calcite.rel.convert.ConverterRule;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.logical.LogicalTableScan;
 
-/**
- * A {@code ConverterRule} to replace {@link TableScan} with
- * {@link BeamIOSourceRel}.
- *
- */
+/** A {@code ConverterRule} to replace {@link TableScan} with {@link 
BeamIOSourceRel}. */
 public class BeamIOSourceRule extends ConverterRule {
-  public static final BeamIOSourceRule INSTANCE = new BeamIOSourceRule();
 
-  private BeamIOSourceRule() {
-    super(LogicalTableScan.class, Convention.NONE, 
BeamLogicalConvention.INSTANCE,
+  private final BeamSqlEnv sqlEnv;
+
+  public static BeamIOSourceRule forSqlEnv(BeamSqlEnv sqlEnv) {
+    return new BeamIOSourceRule(sqlEnv);
+  }
+
+  private BeamIOSourceRule(BeamSqlEnv sqlEnv) {
+    super(
+        LogicalTableScan.class,
+        Convention.NONE,
+        BeamLogicalConvention.INSTANCE,
         "BeamIOSourceRule");
+    this.sqlEnv = sqlEnv;
   }
 
   @Override
   public RelNode convert(RelNode rel) {
     final TableScan scan = (TableScan) rel;
 
-    return new BeamIOSourceRel(scan.getCluster(),
-        scan.getTraitSet().replace(BeamLogicalConvention.INSTANCE), 
scan.getTable());
+    return new BeamIOSourceRel(
+        sqlEnv,
+        scan.getCluster(),
+        scan.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+        scan.getTable());
   }
-
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamJoinRule.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamJoinRule.java
index 4d9dd20..ff8029e 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamJoinRule.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamJoinRule.java
@@ -18,6 +18,7 @@
 
 package org.apache.beam.sdk.extensions.sql.impl.rule;
 
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
 import org.apache.calcite.plan.Convention;
@@ -26,28 +27,32 @@ import org.apache.calcite.rel.convert.ConverterRule;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.logical.LogicalJoin;
 
-/**
- * {@code ConverterRule} to replace {@code Join} with {@code BeamJoinRel}.
- */
+/** {@code ConverterRule} to replace {@code Join} with {@code BeamJoinRel}. */
 public class BeamJoinRule extends ConverterRule {
-  public static final BeamJoinRule INSTANCE = new BeamJoinRule();
-  private BeamJoinRule() {
-    super(LogicalJoin.class, Convention.NONE,
-        BeamLogicalConvention.INSTANCE, "BeamJoinRule");
+  private final BeamSqlEnv sqlEnv;
+
+  public static BeamJoinRule forSqlEnv(BeamSqlEnv sqlEnv) {
+    return new BeamJoinRule(sqlEnv);
+  }
+
+  private BeamJoinRule(BeamSqlEnv sqlEnv) {
+    super(LogicalJoin.class, Convention.NONE, BeamLogicalConvention.INSTANCE, 
"BeamJoinRule");
+    this.sqlEnv = sqlEnv;
   }
 
-  @Override public RelNode convert(RelNode rel) {
+  @Override
+  public RelNode convert(RelNode rel) {
     Join join = (Join) rel;
     return new BeamJoinRel(
+        sqlEnv,
         join.getCluster(),
         join.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
-        convert(join.getLeft(),
-            
join.getLeft().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
-        convert(join.getRight(),
-            
join.getRight().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+        convert(
+            join.getLeft(), 
join.getLeft().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+        convert(
+            join.getRight(), 
join.getRight().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
         join.getCondition(),
         join.getVariablesSet(),
-        join.getJoinType()
-    );
+        join.getJoinType());
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamProjectRule.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamProjectRule.java
index d19a01d..ef52f3f 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamProjectRule.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamProjectRule.java
@@ -1,18 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software 
distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing 
permissions and
  * limitations under the License.
  */
 package org.apache.beam.sdk.extensions.sql.impl.rule;
@@ -25,11 +22,7 @@ import org.apache.calcite.rel.convert.ConverterRule;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.logical.LogicalProject;
 
-/**
- * A {@code ConverterRule} to replace {@link Project} with
- * {@link BeamProjectRel}.
- *
- */
+/** A {@code ConverterRule} to replace {@link Project} with {@link 
BeamProjectRel}. */
 public class BeamProjectRule extends ConverterRule {
   public static final BeamProjectRule INSTANCE = new BeamProjectRule();
 
@@ -42,9 +35,11 @@ public class BeamProjectRule extends ConverterRule {
     final Project project = (Project) rel;
     final RelNode input = project.getInput();
 
-    return new BeamProjectRel(project.getCluster(),
+    return new BeamProjectRel(
+        project.getCluster(),
         project.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
         convert(input, 
input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
-        project.getProjects(), project.getRowType());
+        project.getProjects(),
+        project.getRowType());
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
index adbed07..414eabb 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
@@ -19,10 +19,8 @@
 package org.apache.beam.sdk.extensions.sql;
 
 import static org.apache.beam.sdk.extensions.sql.TestUtils.tuple;
-import static 
org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRelBoundedVsBoundedTest
-    .ORDER_DETAILS1;
-import static 
org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRelBoundedVsBoundedTest
-    .ORDER_DETAILS2;
+import static 
org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS1;
+import static 
org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS2;
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.hasProperty;
 import static org.hamcrest.Matchers.isA;
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
index b6ac343..2b53432 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter;
 
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
 import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelDataTypeSystem;
@@ -45,9 +46,7 @@ import org.apache.calcite.tools.Frameworks;
 import org.apache.calcite.tools.RelBuilder;
 import org.junit.BeforeClass;
 
-/**
- * base class to test {@link BeamSqlFnExecutor} and subclasses of {@link 
BeamSqlExpression}.
- */
+/** base class to test {@link BeamSqlFnExecutor} and subclasses of {@link 
BeamSqlExpression}. */
 public class BeamSqlFnExecutorTestBase {
   static final JavaTypeFactory TYPE_FACTORY = new 
JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
   static RexBuilder rexBuilder = new RexBuilder(BeamQueryPlanner.TYPE_FACTORY);
@@ -59,30 +58,35 @@ public class BeamSqlFnExecutorTestBase {
 
   @BeforeClass
   public static void prepare() {
-    relDataType = TYPE_FACTORY.builder()
-        .add("order_id", SqlTypeName.BIGINT)
-        .add("site_id", SqlTypeName.INTEGER)
-        .add("price", SqlTypeName.DOUBLE)
-        .add("order_time", SqlTypeName.BIGINT).build();
+    relDataType =
+        TYPE_FACTORY
+            .builder()
+            .add("order_id", SqlTypeName.BIGINT)
+            .add("site_id", SqlTypeName.INTEGER)
+            .add("price", SqlTypeName.DOUBLE)
+            .add("order_time", SqlTypeName.BIGINT)
+            .build();
 
     row =
-        Row
-            .withRowType(CalciteUtils.toBeamRowType(relDataType))
-            .addValues(
-                1234567L,
-                0,
-                8.9,
-                1234567L)
+        Row.withRowType(CalciteUtils.toBeamRowType(relDataType))
+            .addValues(1234567L, 0, 8.9, 1234567L)
             .build();
 
+    BeamSqlEnv sqlEnv = new BeamSqlEnv();
     SchemaPlus schema = Frameworks.createRootSchema(true);
     final List<RelTraitDef> traitDefs = new ArrayList<>();
     traitDefs.add(ConventionTraitDef.INSTANCE);
     traitDefs.add(RelCollationTraitDef.INSTANCE);
-    FrameworkConfig config = Frameworks.newConfigBuilder()
-        
.parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build()).defaultSchema(schema)
-        
.traitDefs(traitDefs).context(Contexts.EMPTY_CONTEXT).ruleSets(BeamRuleSets.getRuleSets())
-        
.costFactory(null).typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM).build();
+    FrameworkConfig config =
+        Frameworks.newConfigBuilder()
+            .parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build())
+            .defaultSchema(schema)
+            .traitDefs(traitDefs)
+            .context(Contexts.EMPTY_CONTEXT)
+            .ruleSets(BeamRuleSets.getRuleSets(sqlEnv))
+            .costFactory(null)
+            .typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM)
+            .build();
 
     relBuilder = RelBuilder.create(config);
   }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/VarianceAccumulatorTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/VarianceAccumulatorTest.java
index 73cff75..d4d0bc2 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/VarianceAccumulatorTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/VarianceAccumulatorTest.java
@@ -20,10 +20,8 @@ package 
org.apache.beam.sdk.extensions.sql.impl.transform.agg;
 
 import static java.math.BigDecimal.ONE;
 import static java.math.BigDecimal.ZERO;
-import static 
org.apache.beam.sdk.extensions.sql.impl.transform.agg.VarianceAccumulator
-    .newVarianceAccumulator;
-import static 
org.apache.beam.sdk.extensions.sql.impl.transform.agg.VarianceAccumulator
-    .ofSingleElement;
+import static 
org.apache.beam.sdk.extensions.sql.impl.transform.agg.VarianceAccumulator.newVarianceAccumulator;
+import static 
org.apache.beam.sdk.extensions.sql.impl.transform.agg.VarianceAccumulator.ofSingleElement;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/VarianceFnTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/VarianceFnTest.java
index 6bb9aff..a3a73c9 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/VarianceFnTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/VarianceFnTest.java
@@ -19,8 +19,7 @@
 package org.apache.beam.sdk.extensions.sql.impl.transform.agg;
 
 import static java.math.BigDecimal.ZERO;
-import static 
org.apache.beam.sdk.extensions.sql.impl.transform.agg.VarianceAccumulator
-    .newVarianceAccumulator;
+import static 
org.apache.beam.sdk.extensions.sql.impl.transform.agg.VarianceAccumulator.newVarianceAccumulator;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 

-- 
To stop receiving notification emails like this one, please contact
ming...@apache.org.

Reply via email to