[ 
https://issues.apache.org/jira/browse/BEAM-4044?focusedWorklogId=99477&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-99477
 ]

ASF GitHub Bot logged work on BEAM-4044:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/May/18 12:00
            Start Date: 08/May/18 12:00
    Worklog Time Spent: 10m 
      Work Description: kennknowles closed pull request #5224: [BEAM-4044] 
[SQL] Add tables via TableStore in Schema, execute DDL in Calcite model
URL: https://github.com/apache/beam/pull/5224
 
 
   

This is a PR merged from a forked repository. As GitHub hides the
original diff on merge, it is reproduced below for the sake of
provenance.

diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
index 8cf689084d7..ce592cfbabb 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
@@ -17,21 +17,16 @@
  */
 package org.apache.beam.sdk.extensions.sql;
 
-import java.util.List;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.parser.BeamSqlParser;
-import org.apache.beam.sdk.extensions.sql.impl.parser.SqlCreateTable;
-import org.apache.beam.sdk.extensions.sql.impl.parser.SqlDropTable;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
-import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.Row;
 import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.sql.SqlExecutableStatement;
 import org.apache.calcite.sql.SqlNode;
 
 /**
@@ -47,13 +42,7 @@
 
   public BeamSqlCli metaStore(MetaStore metaStore) {
     this.metaStore = metaStore;
-    this.env = new BeamSqlEnv();
-
-    // dump tables in metaStore into schema
-    List<Table> tables = this.metaStore.listTables();
-    for (Table table : tables) {
-      env.registerTable(table.getName(), 
metaStore.buildBeamSqlTable(table.getName()));
-    }
+    this.env = new BeamSqlEnv(metaStore);
 
     return this;
   }
@@ -78,45 +67,16 @@ public void execute(String sqlString) throws Exception {
     BeamSqlParser parser = new BeamSqlParser(sqlString);
     SqlNode sqlNode = parser.impl().parseSqlStmtEof();
 
-    if (sqlNode instanceof SqlCreateTable) {
-      handleCreateTable((SqlCreateTable) sqlNode, metaStore);
-    } else if (sqlNode instanceof SqlDropTable) {
-      handleDropTable((SqlDropTable) sqlNode);
+    // DDL nodes are SqlExecutableStatement
+    if (sqlNode instanceof SqlExecutableStatement) {
+      ((SqlExecutableStatement) sqlNode).execute(env.getContext());
     } else {
       PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] 
{}).withValidation()
           .as(PipelineOptions.class);
       options.setJobName("BeamPlanCreator");
       Pipeline pipeline = Pipeline.create(options);
-      compilePipeline(sqlString, pipeline, env);
+      env.getPlanner().compileBeamPipeline(sqlString, pipeline);
       pipeline.run();
     }
   }
-
-  private void handleCreateTable(SqlCreateTable stmt, MetaStore store) {
-    Table table = stmt.toTable();
-    if (table.getType() == null) {
-      throw new IllegalStateException("Table type is not specified and 
BeamSqlCli#defaultTableType"
-          + "is not configured!");
-    }
-
-    store.createTable(table);
-
-    // register the new table into the schema
-    env.registerTable(table.getName(), 
metaStore.buildBeamSqlTable(table.getName()));
-  }
-
-  private void handleDropTable(SqlDropTable stmt) {
-    metaStore.dropTable(stmt.getNameSimple());
-    env.deregisterTable(stmt.getNameSimple());
-  }
-
-  /**
-   * compile SQL, and return a {@link Pipeline}.
-   */
-  private static PCollection<Row> compilePipeline(String sqlStatement, 
Pipeline basePipeline,
-                                                  BeamSqlEnv sqlEnv) throws 
Exception {
-    PCollection<Row> resultStream =
-        sqlEnv.getPlanner().compileBeamPipeline(sqlStatement, basePipeline, 
sqlEnv);
-    return resultStream;
-  }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/QueryTransform.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/QueryTransform.java
index 274c14a26e7..45b649e466e 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/QueryTransform.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/QueryTransform.java
@@ -24,11 +24,13 @@
 
 import com.google.auto.value.AutoValue;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable;
+import org.apache.beam.sdk.extensions.sql.meta.provider.BeamSqlTableProvider;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -55,20 +57,17 @@
 
   @Override
   public PCollection<Row> expand(PInput input) {
-    PCollectionTuple inputTuple = toPCollectionTuple(input);
-
-    BeamSqlEnv sqlEnv = new BeamSqlEnv();
+    BeamSqlEnv sqlEnv = new BeamSqlEnv(toTableProvider(input));
 
     if (input instanceof PCollection) {
       validateQuery(sqlEnv, queryString());
     }
 
-    sqlEnv.registerPCollectionTuple(inputTuple);
     registerFunctions(sqlEnv);
 
     try {
       return
-          inputTuple.apply(
+          PCollectionTuple.empty(input.getPipeline()).apply(
           sqlEnv
               .getPlanner()
               .convertToBeamRel(queryString())
@@ -78,25 +77,28 @@
     }
   }
 
-  private PCollectionTuple toPCollectionTuple(PInput inputs) {
-    return (inputs instanceof PCollection)
-        ? PCollectionTuple.of(new TupleTag<>(PCOLLECTION_NAME), toRows(inputs))
-        : tupleOfAllInputs(inputs.getPipeline(), inputs.expand());
+  private BeamSqlTableProvider toTableProvider(PInput inputs) {
+    return new BeamSqlTableProvider(PCOLLECTION_NAME, toTableMap(inputs));
   }
 
-  private PCollectionTuple tupleOfAllInputs(
-      Pipeline pipeline,
-      Map<TupleTag<?>, PValue> taggedInputs) {
-
-    PCollectionTuple tuple = PCollectionTuple.empty(pipeline);
-
-    for (Map.Entry<TupleTag<?>, PValue> input : taggedInputs.entrySet()) {
-      tuple = tuple.and(
-          new TupleTag<>(input.getKey().getId()),
-          toRows(input.getValue()));
+  private Map<String, BeamSqlTable> toTableMap(PInput inputs) {
+    /**
+     * A single PCollection is transformed to a table named PCOLLECTION, other
+     * input types are expanded and converted to tables using the tags as 
names.
+     */
+    if (inputs instanceof PCollection) {
+      return
+          ImmutableMap.of(
+              PCOLLECTION_NAME,
+              new BeamPCollectionTable(toRows(inputs)));
     }
 
-    return tuple;
+    ImmutableMap.Builder<String, BeamSqlTable> tables = ImmutableMap.builder();
+    for (Map.Entry<TupleTag<?>, PValue> input : inputs.expand().entrySet()) {
+      tables.put(input.getKey().getId(),
+          new BeamPCollectionTable(toRows(input.getValue())));
+    }
+    return tables.build();
   }
 
   private void registerFunctions(BeamSqlEnv sqlEnv) {
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java
new file mode 100644
index 00000000000..272f0f79166
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.SchemaVersion;
+import org.apache.calcite.schema.Schemas;
+
+/**
+ * Adapter from {@link TableProvider} to {@link Schema}.
+ */
+public class BeamCalciteSchema implements Schema {
+  private TableProvider tableProvider;
+
+  public BeamCalciteSchema(TableProvider tableProvider) {
+    this.tableProvider = tableProvider;
+  }
+
+  public TableProvider getTableProvider() {
+    return tableProvider;
+  }
+
+  @Override
+  public boolean isMutable() {
+    return true;
+  }
+
+  @Override
+  public Schema snapshot(SchemaVersion version) {
+    return this;
+  }
+
+  @Override
+  public Expression getExpression(SchemaPlus parentSchema, String name) {
+    return Schemas.subSchemaExpression(parentSchema, name, getClass());
+  }
+
+  @Override
+  public Set<String> getTableNames() {
+    return tableProvider.getTables().keySet();
+  }
+
+  @Override
+  public org.apache.calcite.schema.Table getTable(String name) {
+    Table table = tableProvider.getTables().get(name);
+    if (table == null) {
+      return null;
+    }
+    return new BeamCalciteTable(tableProvider.buildBeamSqlTable(table));
+  }
+
+  @Override
+  public Set<String> getFunctionNames() {
+    return Collections.emptySet();
+  }
+
+  @Override
+  public Collection<Function> getFunctions(String name) {
+    return Collections.emptySet();
+  }
+
+  @Override
+  public Set<String> getSubSchemaNames() {
+    return Collections.emptySet();
+  }
+
+  @Override
+  public Schema getSubSchema(String name) {
+    return null;
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
new file mode 100644
index 00000000000..e925eeb8e62
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import java.util.Collection;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSinkRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.calcite.adapter.java.AbstractQueryableTable;
+import org.apache.calcite.linq4j.QueryProvider;
+import org.apache.calcite.linq4j.Queryable;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.ModifiableTable;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.TranslatableTable;
+
+/**
+ * Adapter from {@link BeamSqlTable} to a calcite Table.
+ */
+class BeamCalciteTable extends AbstractQueryableTable
+    implements ModifiableTable, TranslatableTable {
+  private final BeamSqlTable beamTable;
+  private final RelDataType rowType;
+
+  public BeamCalciteTable(BeamSqlTable beamTable) {
+    super(Object[].class);
+    this.beamTable = beamTable;
+    this.rowType = CalciteUtils.toCalciteRowType(this.beamTable.getSchema(),
+        BeamQueryPlanner.TYPE_FACTORY);
+  }
+
+  @Override
+  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+    return rowType;
+  }
+
+  @Override
+  public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable 
relOptTable) {
+    return new BeamIOSourceRel(context.getCluster(), relOptTable, beamTable);
+  }
+
+  @Override
+  public <T> Queryable<T> asQueryable(
+      QueryProvider queryProvider, SchemaPlus schema, String tableName) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection getModifiableCollection() {
+    return null;
+  }
+
+  @Override
+  public TableModify toModificationRel(
+      RelOptCluster cluster,
+      RelOptTable table,
+      Prepare.CatalogReader catalogReader,
+      RelNode child,
+      TableModify.Operation operation,
+      List<String> updateColumnList,
+      List<RexNode> sourceExpressionList,
+      boolean flattened) {
+    return new BeamIOSinkRel(
+        cluster,
+        table,
+        catalogReader,
+        child,
+        operation,
+        updateColumnList,
+        sourceExpressionList,
+        flattened,
+        beamTable);
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
index eeaf36e7af3..bbc3165175f 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
@@ -17,46 +17,23 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl;
 
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.Serializable;
-import java.util.Collection;
 import java.util.List;
-import org.apache.beam.sdk.coders.RowCoder;
 import org.apache.beam.sdk.extensions.sql.BeamSql;
 import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
-import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.BeamSqlUdf;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.UdafImpl;
 import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
-import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSinkRel;
-import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel;
-import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
-import org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable;
-import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.calcite.adapter.java.AbstractQueryableTable;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.jdbc.CalcitePrepare;
 import org.apache.calcite.jdbc.CalciteSchema;
-import org.apache.calcite.linq4j.QueryProvider;
-import org.apache.calcite.linq4j.Queryable;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.prepare.Prepare;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.TableModify;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.schema.ModifiableTable;
 import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.TranslatableTable;
 import org.apache.calcite.schema.impl.ScalarFunctionImpl;
+import org.apache.calcite.tools.RelRunner;
 
 /**
  * {@link BeamSqlEnv} prepares the execution context for {@link BeamSql} and 
{@link BeamSqlCli}.
@@ -64,13 +41,15 @@
  * <p>It contains a {@link SchemaPlus} which holds the metadata of tables/UDF 
functions, and a
  * {@link BeamQueryPlanner} which parse/validate/optimize/translate input SQL 
queries.
  */
-public class BeamSqlEnv implements Serializable {
-  transient CalciteSchema schema;
-  transient BeamQueryPlanner planner;
-
-  public BeamSqlEnv() {
-    schema = CalciteSchema.createRootSchema(true);
-    planner = new BeamQueryPlanner(this, schema.plus());
+public class BeamSqlEnv {
+  final CalciteSchema schema;
+  final CalciteSchema defaultSchema;
+  final BeamQueryPlanner planner;
+
+  public BeamSqlEnv(TableProvider tableProvider) {
+    schema = CalciteSchema.createRootSchema(true, false);
+    defaultSchema = schema.add("beam", new BeamCalciteSchema(tableProvider));
+    planner = new BeamQueryPlanner(defaultSchema.plus());
   }
 
   /**
@@ -103,105 +82,58 @@ public void registerUdaf(String functionName, 
Combine.CombineFn combineFn) {
     schema.plus().add(functionName, new UdafImpl(combineFn));
   }
 
-  /**
-   * Registers {@link PCollection}s in {@link PCollectionTuple} as a tables.
-   *
-   * <p>Assumes that {@link PCollection} elements are {@link Row}s.
-   *
-   * <p>{@link TupleTag#getId()}s are used as table names.
-   */
-  public void registerPCollectionTuple(PCollectionTuple pCollectionTuple) {
-    pCollectionTuple
-        .getAll()
-        .forEach((tag, pCollection) ->
-                registerPCollection(tag.getId(), (PCollection<Row>) 
pCollection));
-  }
-
-  /**
-   * Registers {@link PCollection} of {@link Row}s as a table.
-   *
-   * <p>Assumes that {@link PCollection#getCoder()} returns an instance of 
{@link RowCoder}.
-   */
-  public void registerPCollection(String name, PCollection<Row> pCollection) {
-    registerTable(name, pCollection, ((RowCoder) 
pCollection.getCoder()).getSchema());
-  }
-
-  /**
-   * Registers {@link PCollection} as a table.
-   */
-  public void registerTable(String tableName, PCollection<Row> pCollection, 
Schema schema) {
-    registerTable(tableName, new BeamPCollectionTable(pCollection, schema));
-  }
-
-  /**
-   * Registers a {@link BaseBeamTable} which can be used for all subsequent 
queries.
-   */
-  public void registerTable(String tableName, BeamSqlTable table) {
-    schema.add(tableName, new BeamCalciteTable(table));
+  public BeamQueryPlanner getPlanner() {
+    return planner;
   }
 
-  public void deregisterTable(String targetTableName) {
-    schema.removeTable(targetTableName);
+  public CalcitePrepare.Context getContext() {
+    return new ContextImpl();
   }
 
-  private static class BeamCalciteTable extends AbstractQueryableTable
-      implements ModifiableTable, TranslatableTable {
-    private BeamSqlTable beamTable;
-
-    public BeamCalciteTable(BeamSqlTable beamTable) {
-      super(Object[].class);
-      this.beamTable = beamTable;
+  private class ContextImpl implements CalcitePrepare.Context {
+    @Override
+    public JavaTypeFactory getTypeFactory() {
+      return planner.TYPE_FACTORY;
     }
 
     @Override
-    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
-      return CalciteUtils.toCalciteRowType(this.beamTable.getSchema(),
-          BeamQueryPlanner.TYPE_FACTORY);
+    public CalciteSchema getRootSchema() {
+      return schema;
     }
 
     @Override
-    public RelNode toRel(
-      RelOptTable.ToRelContext context,
-      RelOptTable relOptTable) {
-      return new BeamIOSourceRel(
-          context.getCluster(), relOptTable, beamTable);
+    public CalciteSchema getMutableRootSchema() {
+      return getRootSchema();
     }
 
     @Override
-    public <T> Queryable<T> asQueryable(QueryProvider queryProvider,
-        SchemaPlus schema, String tableName) {
-      throw new UnsupportedOperationException();
+    public List<String> getDefaultSchemaPath() {
+      return defaultSchema.path(null);
     }
 
     @Override
-    public Collection getModifiableCollection() {
+    public List<String> getObjectPath() {
       return null;
     }
 
     @Override
-    public TableModify toModificationRel(
-        RelOptCluster cluster,
-        RelOptTable table,
-        Prepare.CatalogReader catalogReader,
-        RelNode child,
-        TableModify.Operation operation,
-        List<String> updateColumnList,
-        List<RexNode> sourceExpressionList,
-        boolean flattened) {
-      return new BeamIOSinkRel(
-          cluster, table, catalogReader, child, operation, updateColumnList,
-          sourceExpressionList, flattened, beamTable);
+    public CalciteConnectionConfig config() {
+      throw new UnsupportedOperationException();
     }
-  }
 
-  public BeamQueryPlanner getPlanner() {
-    return planner;
-  }
+    @Override
+    public DataContext getDataContext() {
+      throw new UnsupportedOperationException();
+    }
 
-  private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
-    in.defaultReadObject();
+    @Override
+    public RelRunner getRelRunner() {
+      throw new UnsupportedOperationException();
+    }
 
-    schema = CalciteSchema.createRootSchema(true);
-    planner = new BeamQueryPlanner(this, schema.plus());
+    @Override
+    public CalcitePrepare.SparkHandler spark() {
+      throw new UnsupportedOperationException();
+    }
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java
index 3a1cfed02f9..c8984e1f684 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java
@@ -18,16 +18,21 @@
 
 import static com.alibaba.fastjson.JSON.parseObject;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.calcite.util.Static.RESOURCE;
 
 import com.alibaba.fastjson.JSONObject;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteSchema;
 import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.extensions.sql.meta.Column;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.calcite.jdbc.CalcitePrepare;
+import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.linq4j.Ord;
 import org.apache.calcite.sql.SqlCreate;
+import org.apache.calcite.sql.SqlExecutableStatement;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlLiteral;
@@ -35,15 +40,18 @@
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlUtil;
 import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.util.ImmutableNullableList;
 import org.apache.calcite.util.NlsString;
+import org.apache.calcite.util.Pair;
 
 /**
  * Parse tree for {@code CREATE TABLE} statement.
  */
-public class SqlCreateTable extends SqlCreate {
+public class SqlCreateTable extends SqlCreate
+    implements SqlExecutableStatement {
   private final SqlIdentifier name;
   private final SqlNodeList columnList;
   private final SqlNode type;
@@ -102,6 +110,28 @@
     }
   }
 
+  @Override
+  public void execute(CalcitePrepare.Context context) {
+    final Pair<CalciteSchema, String> pair =
+        SqlDdlNodes.schema(context, true, name);
+    if (pair.left.plus().getTable(pair.right) != null) {
+      // Table exists.
+      if (!ifNotExists) {
+        // They did not specify IF NOT EXISTS, so give error.
+        throw SqlUtil.newContextException(name.getParserPosition(),
+            RESOURCE.tableExists(pair.right));
+      }
+      return;
+    }
+    // Table does not exist. Create it.
+    if (!(pair.left.schema instanceof BeamCalciteSchema)) {
+      throw SqlUtil.newContextException(name.getParserPosition(),
+          RESOURCE.internal("Schema is not instanceof BeamCalciteSchema"));
+    }
+    BeamCalciteSchema schema = (BeamCalciteSchema) pair.left.schema;
+    schema.getTableProvider().createTable(toTable());
+  }
+
   private String getString(SqlNode n) {
     return n == null ? null : ((NlsString) SqlLiteral.value(n)).getValue();
   }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java
index 4e79abff8d1..9a779b8b4d6 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java
@@ -16,11 +16,16 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl.parser;
 
+import java.util.List;
+import org.apache.calcite.jdbc.CalcitePrepare;
+import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.sql.SqlDataTypeSpec;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
 
 /**
  * Utilities concerning {@link SqlNode} for DDL.
@@ -47,6 +52,26 @@ public static SqlNode column(SqlParserPos pos, SqlIdentifier 
name,
       SqlDataTypeSpec dataType, SqlNode comment) {
     return new SqlColumnDeclaration(pos, name, dataType, comment);
   }
+
+  /** Returns the schema in which to create an object. */
+  static Pair<CalciteSchema, String> schema(CalcitePrepare.Context context,
+      boolean mutable, SqlIdentifier id) {
+    final String name;
+    final List<String> path;
+    if (id.isSimple()) {
+      path = context.getDefaultSchemaPath();
+      name = id.getSimple();
+    } else {
+      path = Util.skipLast(id.names);
+      name = Util.last(id.names);
+    }
+    CalciteSchema schema = mutable ? context.getMutableRootSchema()
+        : context.getRootSchema();
+    for (String p : path) {
+      schema = schema.getSubSchema(p, true);
+    }
+    return Pair.of(schema, name);
+  }
 }
 
 // End SqlDdlNodes.java
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropObject.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropObject.java
index 8cc0aeaa589..857da159a7e 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropObject.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropObject.java
@@ -16,12 +16,19 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl.parser;
 
+import static org.apache.calcite.util.Static.RESOURCE;
+
 import com.google.common.collect.ImmutableList;
 import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteSchema;
+import org.apache.calcite.jdbc.CalcitePrepare;
+import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.sql.SqlDrop;
+import org.apache.calcite.sql.SqlExecutableStatement;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlUtil;
 import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
 
@@ -29,7 +36,8 @@
  * Base class for parse trees of {@code DROP TABLE}, {@code DROP VIEW} and
  * {@code DROP MATERIALIZED VIEW} statements.
  */
-abstract class SqlDropObject extends SqlDrop {
+abstract class SqlDropObject extends SqlDrop
+    implements SqlExecutableStatement {
   protected final SqlIdentifier name;
 
   /** Creates a SqlDropObject. */
@@ -51,8 +59,30 @@
     name.unparse(writer, leftPrec, rightPrec);
   }
 
-  public String getNameSimple() {
-    return name.getSimple().toLowerCase();
+  public void execute(CalcitePrepare.Context context) {
+    final List<String> path = context.getDefaultSchemaPath();
+    CalciteSchema schema = context.getRootSchema();
+    for (String p : path) {
+      schema = schema.getSubSchema(p, true);
+    }
+    final boolean existed;
+    switch (getKind()) {
+    case DROP_TABLE:
+      if (schema.schema instanceof BeamCalciteSchema) {
+        BeamCalciteSchema beamSchema = (BeamCalciteSchema) schema.schema;
+        
beamSchema.getTableProvider().dropTable(name.getSimple().toLowerCase());
+        existed = true;
+      } else {
+        existed = schema.removeTable(name.getSimple());
+      }
+      if (!existed && !ifExists) {
+        throw SqlUtil.newContextException(name.getParserPosition(),
+            RESOURCE.tableNotFound(name.getSimple()));
+      }
+      break;
+    default:
+      throw new AssertionError(getKind());
+    }
   }
 }
 
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
index 36603f1e71b..54d96afdf1c 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
@@ -21,7 +21,6 @@
 import java.util.Collections;
 import java.util.List;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.parser.impl.BeamSqlParserImpl;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
@@ -70,7 +69,7 @@
   public static final JavaTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl(
       RelDataTypeSystem.DEFAULT);
 
-  public BeamQueryPlanner(BeamSqlEnv sqlEnv, SchemaPlus schema) {
+  public BeamQueryPlanner(SchemaPlus schema) {
     String defaultCharsetKey = "saffron.default.charset";
     if (System.getProperty(defaultCharsetKey) == null) {
       System.setProperty(defaultCharsetKey, 
ConversionUtil.NATIVE_UTF16_CHARSET_NAME);
@@ -88,7 +87,7 @@ public BeamQueryPlanner(BeamSqlEnv sqlEnv, SchemaPlus schema) 
{
     sqlOperatorTables.add(SqlStdOperatorTable.instance());
     sqlOperatorTables.add(
         new CalciteCatalogReader(
-            CalciteSchema.from(schema), Collections.emptyList(), TYPE_FACTORY, 
null));
+            CalciteSchema.from(schema).root(), Collections.emptyList(), 
TYPE_FACTORY, null));
 
     FrameworkConfig config =
         Frameworks.newConfigBuilder()
@@ -99,7 +98,7 @@ public BeamQueryPlanner(BeamSqlEnv sqlEnv, SchemaPlus schema) 
{
             .defaultSchema(schema)
             .traitDefs(traitDefs)
             .context(Contexts.EMPTY_CONTEXT)
-            .ruleSets(BeamRuleSets.getRuleSets(sqlEnv))
+            .ruleSets(BeamRuleSets.getRuleSets())
             .costFactory(null)
             .typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM)
             .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
@@ -119,8 +118,8 @@ public SqlNode parseQuery(String sqlQuery) throws 
SqlParseException{
    * which is linked with the given {@code pipeline}. The final output stream 
is returned as
    * {@code PCollection} so more operations can be applied.
    */
-  public PCollection<Row> compileBeamPipeline(String sqlStatement, Pipeline 
basePipeline
-      , BeamSqlEnv sqlEnv) throws Exception {
+  public PCollection<Row> compileBeamPipeline(String sqlStatement, Pipeline 
basePipeline)
+      throws Exception {
     BeamRelNode relNode = convertToBeamRel(sqlStatement);
 
     // the input PCollectionTuple is empty, and be rebuilt in BeamIOSourceRel.
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
index 8423ddaf153..c0d4ccdc0e0 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl.planner;
 
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
 import org.apache.beam.sdk.extensions.sql.impl.rule.BeamAggregationRule;
 import 
org.apache.beam.sdk.extensions.sql.impl.rule.BeamEnumerableConverterRule;
@@ -39,7 +38,7 @@
  */
 public class BeamRuleSets {
 
-  public static RuleSet[] getRuleSets(BeamSqlEnv sqlEnv) {
+  public static RuleSet[] getRuleSets() {
     return new RuleSet[] {
       RuleSets.ofList(
           BeamProjectRule.INSTANCE,
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
index 0a8db3997ba..66756d3efbd 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
@@ -17,14 +17,11 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
-import com.google.common.base.Joiner;
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TupleTag;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.core.TableScan;
@@ -52,18 +49,8 @@ public BeamIOSourceRel(
 
     @Override
     public PCollection<Row> expand(PCollectionTuple inputPCollections) {
-      String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
-
-      TupleTag<Row> sourceTupleTag = new TupleTag<>(sourceName);
-      if (inputPCollections.has(sourceTupleTag)) {
-        // choose PCollection from input PCollectionTuple if exists there.
-        PCollection<Row> sourceStream = inputPCollections.get(new 
TupleTag<Row>(sourceName));
-        return sourceStream;
-      }
-      // If not, the source PColection is provided with 
BaseBeamTable.buildIOReader().
       return sqlTable
-          .buildIOReader(inputPCollections.getPipeline())
-          .setCoder(CalciteUtils.toBeamSchema(getRowType()).getRowCoder());
+          .buildIOReader(inputPCollections.getPipeline());
     }
   }
 
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java
index 0b10b4c6b19..89858ff5ab4 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java
@@ -18,7 +18,7 @@
 package org.apache.beam.sdk.extensions.sql.impl.schema;
 
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.coders.RowCoder;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
@@ -26,20 +26,15 @@
 import org.apache.beam.sdk.values.Row;
 
 /**
- * {@code BeamPCollectionTable} converts a {@code PCollection<BeamSqlRow>} as 
a virtual table,
+ * {@code BeamPCollectionTable} converts a {@code PCollection<Row>} as a 
virtual table,
  * then a downstream query can query directly.
  */
 public class BeamPCollectionTable extends BaseBeamTable {
   private BeamIOType ioType;
   private transient PCollection<Row> upstream;
 
-  protected BeamPCollectionTable(Schema beamSchema) {
-    super(beamSchema);
-  }
-
-  public BeamPCollectionTable(PCollection<Row> upstream,
-      Schema beamSchema) {
-    this(beamSchema);
+  public BeamPCollectionTable(PCollection<Row> upstream) {
+    super(((RowCoder) upstream.getCoder()).getSchema());
     ioType = upstream.isBounded().equals(IsBounded.BOUNDED)
         ? BeamIOType.BOUNDED : BeamIOType.UNBOUNDED;
     this.upstream = upstream;
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/BeamSqlTableProvider.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/BeamSqlTableProvider.java
new file mode 100644
index 00000000000..928069f9db3
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/BeamSqlTableProvider.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.meta.provider;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+
+/**
+ * A {@code BeamSqlTableProvider} provides read only set of {@code 
BeamSqlTable}.
+ */
+public class BeamSqlTableProvider implements TableProvider {
+  private final String typeName;
+  private final Map<String, BeamSqlTable> tables;
+
+  public BeamSqlTableProvider(String typeName, Map<String, BeamSqlTable> 
tables) {
+    this.typeName = typeName;
+    this.tables = tables;
+  }
+
+  @Override public String getTableType() {
+    return typeName;
+  }
+
+  @Override
+  public void createTable(Table table) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void dropTable(String tableName) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Map<String, Table> getTables() {
+    ImmutableMap.Builder<String, Table> map = ImmutableMap.builder();
+    for (Map.Entry<String, BeamSqlTable> table : tables.entrySet()) {
+      map.put(table.getKey(),
+          Table.builder()
+            .type(getTableType())
+            .name(table.getKey())
+            .columns(Collections.emptyList())
+            .build());
+    }
+    return map.build();
+  }
+
+  @Override
+  public BeamSqlTable buildBeamSqlTable(Table table) {
+    return tables.get(table.getName());
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/InMemoryMetaTableProvider.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/InMemoryMetaTableProvider.java
new file mode 100644
index 00000000000..6a808f2ffe3
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/InMemoryMetaTableProvider.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.meta.provider;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+
+/**
+ * A {@code InMemoryMetaTableProvider} is an abstract {@code TableProvider} 
for in-memory types.
+ */
+public abstract class InMemoryMetaTableProvider implements TableProvider {
+
+  @Override
+  public void createTable(Table table) {
+    // No-op
+  }
+
+  @Override
+  public void dropTable(String tableName) {
+    // No-op
+  }
+
+  @Override
+  public Map<String, Table> getTables() {
+    return Collections.emptyMap();
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java
index 5bbadb13eeb..a1b63ddeea3 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java
@@ -18,7 +18,7 @@
 
 package org.apache.beam.sdk.extensions.sql.meta.provider;
 
-import java.util.List;
+import java.util.Map;
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
 
@@ -29,11 +29,6 @@
  * handle MySQL based tables, a provider to handle Casandra based tables etc.
  */
 public interface TableProvider {
-  /**
-   * Init the provider.
-   */
-  void init();
-
   /**
    * Gets the table type this provider handles.
    */
@@ -52,17 +47,12 @@
   void dropTable(String tableName);
 
   /**
-   * List all tables from this provider.
+   * Get all tables from this provider.
    */
-  List<Table> listTables();
+  Map<String, Table> getTables();
 
   /**
    * Build a {@link BeamSqlTable} using the given table meta info.
    */
   BeamSqlTable buildBeamSqlTable(Table table);
-
-  /**
-   * Close the provider.
-   */
-  void close();
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
index a068f73af43..4550d02744a 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
@@ -23,11 +23,10 @@
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
-import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+import 
org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
 import org.apache.beam.sdk.schemas.Schema;
 
 /**
@@ -45,7 +44,7 @@
  * TBLPROPERTIES '{"bootstrap.servers":"localhost:9092", "topics": ["topic1", 
"topic2"]}'
  * }</pre>
  */
-public class KafkaTableProvider implements TableProvider {
+public class KafkaTableProvider extends InMemoryMetaTableProvider {
   @Override public BeamSqlTable buildBeamSqlTable(Table table) {
     Schema schema = getRowTypeFromTable(table);
 
@@ -63,24 +62,4 @@
   @Override public String getTableType() {
     return "kafka";
   }
-
-  @Override public void createTable(Table table) {
-    // empty
-  }
-
-  @Override public void dropTable(String tableName) {
-    // empty
-  }
-
-  @Override public List<Table> listTables() {
-    return Collections.emptyList();
-  }
-
-  @Override public void init() {
-    // empty
-  }
-
-  @Override public void close() {
-    // empty
-  }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
index 69820fbcd10..55805a2b9a6 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
@@ -21,11 +21,9 @@
 import static 
org.apache.beam.sdk.extensions.sql.meta.provider.MetaUtils.getRowTypeFromTable;
 
 import com.alibaba.fastjson.JSONObject;
-import java.util.Collections;
-import java.util.List;
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
-import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+import 
org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.commons.csv.CSVFormat;
 
@@ -44,7 +42,7 @@
  * TBLPROPERTIES '{"format": "Excel"}' -- format of each text line(csv format)
  * }</pre>
  */
-public class TextTableProvider implements TableProvider {
+public class TextTableProvider extends InMemoryMetaTableProvider {
 
   @Override public String getTableType() {
     return "text";
@@ -64,24 +62,4 @@
     BeamTextCSVTable txtTable = new BeamTextCSVTable(schema, filePattern, 
format);
     return txtTable;
   }
-
-  @Override public void createTable(Table table) {
-    // empty
-  }
-
-  @Override public void dropTable(String tableName) {
-    // empty
-  }
-
-  @Override public List<Table> listTables() {
-    return Collections.emptyList();
-  }
-
-  @Override public void init() {
-    // empty
-  }
-
-  @Override public void close() {
-    // empty
-  }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java
index 53eeb7e4e59..7cb84bf1f9f 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java
@@ -18,9 +18,8 @@
 
 package org.apache.beam.sdk.extensions.sql.meta.store;
 
-import java.util.ArrayList;
+import com.google.common.collect.ImmutableMap;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
@@ -37,7 +36,8 @@
   private Map<String, Table> tables = new HashMap<>();
   private Map<String, TableProvider> providers = new HashMap<>();
 
-  public InMemoryMetaStore() {
+  @Override public String getTableType() {
+    return "store";
   }
 
   @Override public void createTable(Table table) {
@@ -65,24 +65,11 @@ public InMemoryMetaStore() {
     tables.remove(tableName);
   }
 
-  @Override public Table getTable(String tableName) {
-    if (tableName == null) {
-      return null;
-    }
-    return tables.get(tableName.toLowerCase());
-  }
-
-  @Override public List<Table> listTables() {
-    return new ArrayList<>(tables.values());
+  @Override public Map<String, Table> getTables() {
+    return ImmutableMap.copyOf(tables);
   }
 
-  @Override public BeamSqlTable buildBeamSqlTable(String tableName) {
-    Table table = getTable(tableName);
-
-    if (table == null) {
-      throw new IllegalArgumentException("The specified table: " + tableName + 
" does not exists!");
-    }
-
+  @Override public BeamSqlTable buildBeamSqlTable(Table table) {
     TableProvider provider = providers.get(table.getType());
 
     return provider.buildBeamSqlTable(table);
@@ -95,26 +82,25 @@ private void validateTableType(Table table) {
     }
   }
 
-  public void registerProvider(TableProvider provider) {
+  @Override public void registerProvider(TableProvider provider) {
     if (providers.containsKey(provider.getTableType())) {
       throw new IllegalArgumentException("Provider is already registered for 
table type: "
           + provider.getTableType());
     }
 
-    this.providers.put(provider.getTableType(), provider);
     initTablesFromProvider(provider);
+    this.providers.put(provider.getTableType(), provider);
   }
 
   private void initTablesFromProvider(TableProvider provider) {
-    List<Table> tables = provider.listTables();
-    for (Table table : tables) {
-      if (this.tables.containsKey(table.getName())) {
+    Map<String, Table> tables = provider.getTables();
+    for (String tableName : tables.keySet()) {
+      if (this.tables.containsKey(tableName)) {
         throw new IllegalStateException(
-            "Duplicate table: " + table.getName() + " from provider: " + 
provider);
+            "Duplicate table: " + tableName + " from provider: " + provider);
       }
-
-      this.tables.put(table.getName(), table);
     }
+    this.tables.putAll(tables);
   }
 
   Map<String, TableProvider> getProviders() {
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/MetaStore.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/MetaStore.java
index ac5b739aec6..44e27d5a0b8 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/MetaStore.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/MetaStore.java
@@ -18,40 +18,12 @@
 
 package org.apache.beam.sdk.extensions.sql.meta.store;
 
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
 
 /**
  * The interface to handle CRUD of {@code BeamSql} table metadata.
  */
-public interface MetaStore {
-  /**
-   * create a table.
-   */
-  void createTable(Table table);
-
-  /**
-   * drop a table.
-   */
-  void dropTable(String tableName);
-
-  /**
-   * Get table with the specified name.
-   */
-  Table getTable(String tableName);
-
-  /**
-   * List all the tables.
-   */
-  List<Table> listTables();
-
-  /**
-   * Build the {@code BeamSqlTable} for the specified table.
-   */
-  BeamSqlTable buildBeamSqlTable(String tableName);
-
+public interface MetaStore extends TableProvider {
   /**
    * Register a table provider.
    * @param provider
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
index 93b29d9beef..ebbffddf3cb 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
@@ -46,7 +46,7 @@ public void testExecute_createTextTable() throws Exception {
         + "TYPE 'text' \n"
         + "COMMENT '' LOCATION '/home/admin/orders'"
     );
-    Table table = metaStore.getTable("person");
+    Table table = metaStore.getTables().get("person");
     assertNotNull(table);
   }
 
@@ -65,11 +65,11 @@ public void testExecute_dropTable() throws Exception {
             + "TYPE 'text' \n"
             + "COMMENT '' LOCATION '/home/admin/orders'"
     );
-    Table table = metaStore.getTable("person");
+    Table table = metaStore.getTables().get("person");
     assertNotNull(table);
 
     cli.execute("drop table person");
-    table = metaStore.getTable("person");
+    table = metaStore.getTables().get("person");
     assertNull(table);
   }
 
@@ -113,7 +113,7 @@ public void testExplainQuery() throws Exception {
     String plan = cli.explainQuery("select * from person");
     assertEquals(
         "BeamProjectRel(id=[$0], name=[$1], age=[$2])\n"
-        + "  BeamIOSourceRel(table=[[person]])\n",
+        + "  BeamIOSourceRel(table=[[beam, person]])\n",
         plan
     );
   }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
index a9bd979fa30..50dc8e302a5 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
@@ -19,7 +19,6 @@
 
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
 import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelDataTypeSystem;
@@ -72,7 +71,6 @@ public static void prepare() {
             .addValues(1234567L, 0, 8.9, 1234567L)
             .build();
 
-    BeamSqlEnv sqlEnv = new BeamSqlEnv();
     SchemaPlus schema = Frameworks.createRootSchema(true);
     final List<RelTraitDef> traitDefs = new ArrayList<>();
     traitDefs.add(ConventionTraitDef.INSTANCE);
@@ -83,7 +81,7 @@ public static void prepare() {
             .defaultSchema(schema)
             .traitDefs(traitDefs)
             .context(Contexts.EMPTY_CONTEXT)
-            .ruleSets(BeamRuleSets.getRuleSets(sqlEnv))
+            .ruleSets(BeamRuleSets.getRuleSets())
             .costFactory(null)
             .typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM)
             .build();
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
index 74181f7148f..5605da717bc 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
@@ -130,7 +130,7 @@ public void testParseDropTable() throws Exception {
     assertTrue(sqlNode instanceof SqlDropTable);
     SqlDropTable stmt = (SqlDropTable) sqlNode;
     assertNotNull(stmt);
-    assertEquals("person", stmt.getNameSimple());
+    assertEquals("PERSON", stmt.name.getSimple());
   }
 
   private Table parseTable(String sql) throws Exception {
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BaseRelTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BaseRelTest.java
index 6a09d9c0958..34be097c125 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BaseRelTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BaseRelTest.java
@@ -18,17 +18,28 @@
 
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.meta.provider.BeamSqlTableProvider;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
 
 /**
  * Base class for rel test.
  */
-public class BaseRelTest {
-  public PCollection<Row> compilePipeline (
-      String sql, Pipeline pipeline, BeamSqlEnv sqlEnv) throws Exception {
-    return sqlEnv.getPlanner().compileBeamPipeline(sql, pipeline, sqlEnv);
+abstract class BaseRelTest {
+  private static Map<String, BeamSqlTable> tables = new HashMap();
+  private static BeamSqlEnv env = new BeamSqlEnv(new 
BeamSqlTableProvider("test", tables));
+
+  protected static PCollection<Row> compilePipeline (
+      String sql, Pipeline pipeline) throws Exception {
+    return env.getPlanner().compileBeamPipeline(sql, pipeline);
+  }
+
+  protected static void registerTable(String tableName, BeamSqlTable table) {
+    tables.put(tableName, table);
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java
index a22dd817c17..9a0cec7bd2f 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java
@@ -19,7 +19,6 @@
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import org.apache.beam.sdk.extensions.sql.TestUtils;
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
 import org.apache.beam.sdk.schemas.Schema.TypeName;
 import org.apache.beam.sdk.testing.PAssert;
@@ -34,14 +33,13 @@
  * Test for {@code BeamIntersectRel}.
  */
 public class BeamIntersectRelTest extends BaseRelTest {
-  static BeamSqlEnv sqlEnv = new BeamSqlEnv();
 
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
 
   @BeforeClass
   public static void prepare() {
-    sqlEnv.registerTable("ORDER_DETAILS1",
+    registerTable("ORDER_DETAILS1",
         MockedBoundedTable.of(
             TypeName.INT64, "order_id",
             TypeName.INT32, "site_id",
@@ -54,7 +52,7 @@ public static void prepare() {
         )
     );
 
-    sqlEnv.registerTable("ORDER_DETAILS2",
+    registerTable("ORDER_DETAILS2",
         MockedBoundedTable.of(
             TypeName.INT64, "order_id",
             TypeName.INT32, "site_id",
@@ -76,7 +74,7 @@ public void testIntersect() throws Exception {
         + "SELECT order_id, site_id, price "
         + "FROM ORDER_DETAILS2 ";
 
-    PCollection<Row> rows = compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             TypeName.INT64, "order_id",
@@ -99,7 +97,7 @@ public void testIntersectAll() throws Exception {
         + "SELECT order_id, site_id, price "
         + "FROM ORDER_DETAILS2 ";
 
-    PCollection<Row> rows = compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
     PAssert.that(rows).satisfies(new CheckSize(3));
 
     PAssert.that(rows).containsInAnyOrder(
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
index ade0cfc1728..1af7a6e386a 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
@@ -19,7 +19,6 @@
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import org.apache.beam.sdk.extensions.sql.TestUtils;
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
 import org.apache.beam.sdk.schemas.Schema.TypeName;
 import org.apache.beam.sdk.testing.PAssert;
@@ -36,7 +35,6 @@
 public class BeamJoinRelBoundedVsBoundedTest extends BaseRelTest {
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
-  private static final BeamSqlEnv BEAM_SQL_ENV = new BeamSqlEnv();
 
   public static final MockedBoundedTable ORDER_DETAILS1 =
       MockedBoundedTable.of(
@@ -62,8 +60,8 @@
 
   @BeforeClass
   public static void prepare() {
-    BEAM_SQL_ENV.registerTable("ORDER_DETAILS1", ORDER_DETAILS1);
-    BEAM_SQL_ENV.registerTable("ORDER_DETAILS2", ORDER_DETAILS2);
+    registerTable("ORDER_DETAILS1", ORDER_DETAILS1);
+    registerTable("ORDER_DETAILS2", ORDER_DETAILS2);
   }
 
   @Test
@@ -76,7 +74,7 @@ public void testInnerJoin() throws Exception {
         + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
         ;
 
-    PCollection<Row> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             TypeName.INT32, "order_id",
@@ -101,7 +99,7 @@ public void testLeftOuterJoin() throws Exception {
             + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
         ;
 
-    PCollection<Row> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
     pipeline.enableAbandonedNodeEnforcement(false);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
@@ -129,7 +127,7 @@ public void testRightOuterJoin() throws Exception {
             + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
         ;
 
-    PCollection<Row> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             TypeName.INT32, "order_id",
@@ -156,7 +154,7 @@ public void testFullOuterJoin() throws Exception {
             + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
         ;
 
-    PCollection<Row> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             TypeName.INT32, "order_id",
@@ -186,7 +184,7 @@ public void testException_nonEqualJoin() throws Exception {
         ;
 
     pipeline.enableAbandonedNodeEnforcement(false);
-    compilePipeline(sql, pipeline, BEAM_SQL_ENV);
+    compilePipeline(sql, pipeline);
     pipeline.run();
   }
 
@@ -197,7 +195,7 @@ public void testException_crossJoin() throws Exception {
             + "FROM ORDER_DETAILS1 o1, ORDER_DETAILS2 o2";
 
     pipeline.enableAbandonedNodeEnforcement(false);
-    compilePipeline(sql, pipeline, BEAM_SQL_ENV);
+    compilePipeline(sql, pipeline);
     pipeline.run();
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
index 47d3b06fb63..ca7a0e4fbe2 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
@@ -23,7 +23,6 @@
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.extensions.sql.BeamSqlSeekableTable;
 import org.apache.beam.sdk.extensions.sql.TestUtils;
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
 import org.apache.beam.sdk.extensions.sql.impl.schema.BeamIOType;
 import 
org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlOutputToConsoleFn;
@@ -50,7 +49,6 @@
 public class BeamJoinRelUnboundedVsBoundedTest extends BaseRelTest {
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
-  private static final BeamSqlEnv BEAM_SQL_ENV = new BeamSqlEnv();
   public static final DateTime FIRST_DATE = new DateTime(1);
   public static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000);
   public static final DateTime THIRD_DATE = new DateTime(1 + 3600 * 1000 + 
3600 * 1000 + 1);
@@ -58,7 +56,7 @@
 
   @BeforeClass
   public static void prepare() {
-    BEAM_SQL_ENV.registerTable("ORDER_DETAILS", MockedUnboundedTable
+    registerTable("ORDER_DETAILS", MockedUnboundedTable
         .of(
             TypeName.INT32, "order_id",
             TypeName.INT32, "site_id",
@@ -86,7 +84,7 @@ public static void prepare() {
         )
     );
 
-    BEAM_SQL_ENV.registerTable("ORDER_DETAILS1", MockedBoundedTable
+    registerTable("ORDER_DETAILS1", MockedBoundedTable
         .of(TypeName.INT32, "order_id",
             TypeName.STRING, "buyer"
         ).addRows(
@@ -94,7 +92,7 @@ public static void prepare() {
             2, "bond"
         ));
 
-    BEAM_SQL_ENV.registerTable(
+    registerTable(
         "SITE_LKP",
         new SiteLookupTable(
             TestUtils.buildBeamSqlRowType(
@@ -144,7 +142,7 @@ public void testInnerJoin_unboundedTableOnTheLeftSide() 
throws Exception {
         + " o1.order_id=o2.order_id"
         ;
 
-    PCollection<Row> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
@@ -170,7 +168,7 @@ public void testInnerJoin_boundedTableOnTheLeftSide() 
throws Exception {
         + " o1.order_id=o2.order_id"
         ;
 
-    PCollection<Row> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
@@ -196,7 +194,7 @@ public void testLeftOuterJoin() throws Exception {
         + " o1.order_id=o2.order_id"
         ;
 
-    PCollection<Row> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
     rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("helloworld")));
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
@@ -224,7 +222,7 @@ public void testLeftOuterJoinError() throws Exception {
         + " o1.order_id=o2.order_id"
         ;
     pipeline.enableAbandonedNodeEnforcement(false);
-    compilePipeline(sql, pipeline, BEAM_SQL_ENV);
+    compilePipeline(sql, pipeline);
     pipeline.run();
   }
 
@@ -238,7 +236,7 @@ public void testRightOuterJoin() throws Exception {
         + " on "
         + " o1.order_id=o2.order_id"
         ;
-    PCollection<Row> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
@@ -266,7 +264,7 @@ public void testRightOuterJoinError() throws Exception {
         ;
 
     pipeline.enableAbandonedNodeEnforcement(false);
-    compilePipeline(sql, pipeline, BEAM_SQL_ENV);
+    compilePipeline(sql, pipeline);
     pipeline.run();
   }
 
@@ -281,7 +279,7 @@ public void testFullOuterJoinError() throws Exception {
         + " o1.order_id=o2.order_id"
         ;
     pipeline.enableAbandonedNodeEnforcement(false);
-    compilePipeline(sql, pipeline, BEAM_SQL_ENV);
+    compilePipeline(sql, pipeline);
     pipeline.run();
   }
 
@@ -294,7 +292,7 @@ public void testJoinAsLookup() throws Exception {
         + " o1.site_id=o2.site_id "
         + " WHERE o1.site_id=1"
         ;
-    PCollection<Row> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
index 1d911ea6a37..5eaefeaffc7 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
@@ -19,7 +19,6 @@
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import org.apache.beam.sdk.extensions.sql.TestUtils;
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import 
org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlOutputToConsoleFn;
 import org.apache.beam.sdk.extensions.sql.mock.MockedUnboundedTable;
 import org.apache.beam.sdk.schemas.Schema.TypeName;
@@ -40,7 +39,6 @@
 public class BeamJoinRelUnboundedVsUnboundedTest extends BaseRelTest {
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
-  private static final BeamSqlEnv BEAM_SQL_ENV = new BeamSqlEnv();
   public static final DateTime FIRST_DATE = new DateTime(1);
   public static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000);
 
@@ -48,7 +46,7 @@
 
   @BeforeClass
   public static void prepare() {
-    BEAM_SQL_ENV.registerTable("ORDER_DETAILS", MockedUnboundedTable
+    registerTable("ORDER_DETAILS", MockedUnboundedTable
         .of(TypeName.INT32, "order_id",
             TypeName.INT32, "site_id",
             TypeName.INT32, "price",
@@ -87,7 +85,7 @@ public void testInnerJoin() throws Exception {
         + " o1.order_id=o2.order_id"
         ;
 
-    PCollection<Row> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
@@ -120,7 +118,7 @@ public void testLeftOuterJoin() throws Exception {
     // 2, 2 | 2, 5
     // 3, 3 | NULL, NULL
 
-    PCollection<Row> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
@@ -150,7 +148,7 @@ public void testRightOuterJoin() throws Exception {
         + " o1.order_id=o2.order_id"
         ;
 
-    PCollection<Row> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
@@ -180,7 +178,7 @@ public void testFullOuterJoin() throws Exception {
         + " o1.order_id1=o2.order_id"
         ;
 
-    PCollection<Row> rows = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
     rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("hello")));
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
@@ -212,7 +210,7 @@ public void testWindowsMismatch() throws Exception {
         + " o1.order_id=o2.order_id"
         ;
     pipeline.enableAbandonedNodeEnforcement(false);
-    compilePipeline(sql, pipeline, BEAM_SQL_ENV);
+    compilePipeline(sql, pipeline);
     pipeline.run();
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
index 5b8b3f67171..c080a5be493 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
@@ -19,7 +19,6 @@
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import org.apache.beam.sdk.extensions.sql.TestUtils;
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
 import org.apache.beam.sdk.schemas.Schema.TypeName;
 import org.apache.beam.sdk.testing.PAssert;
@@ -34,14 +33,12 @@
  * Test for {@code BeamMinusRel}.
  */
 public class BeamMinusRelTest extends BaseRelTest {
-  static BeamSqlEnv sqlEnv = new BeamSqlEnv();
-
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
 
   @BeforeClass
   public static void prepare() {
-    sqlEnv.registerTable("ORDER_DETAILS1",
+    registerTable("ORDER_DETAILS1",
         MockedBoundedTable.of(
             TypeName.INT64, "order_id",
             TypeName.INT32, "site_id",
@@ -55,7 +52,7 @@ public static void prepare() {
         )
     );
 
-    sqlEnv.registerTable("ORDER_DETAILS2",
+    registerTable("ORDER_DETAILS2",
         MockedBoundedTable.of(
             TypeName.INT64, "order_id",
             TypeName.INT32, "site_id",
@@ -77,7 +74,7 @@ public void testExcept() throws Exception {
         + "SELECT order_id, site_id, price "
         + "FROM ORDER_DETAILS2 ";
 
-    PCollection<Row> rows = compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             TypeName.INT64, "order_id",
@@ -99,7 +96,7 @@ public void testExceptAll() throws Exception {
         + "SELECT order_id, site_id, price "
         + "FROM ORDER_DETAILS2 ";
 
-    PCollection<Row> rows = compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
     PAssert.that(rows).satisfies(new CheckSize(2));
 
     PAssert.that(rows).containsInAnyOrder(
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java
index b18353de540..b6f1b5d4e54 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java
@@ -20,7 +20,6 @@
 
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.extensions.sql.TestUtils;
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.schemas.Schema.TypeName;
@@ -38,15 +37,13 @@
  * Test for {@code BeamSetOperatorRelBase}.
  */
 public class BeamSetOperatorRelBaseTest extends BaseRelTest {
-  static BeamSqlEnv sqlEnv = new BeamSqlEnv();
-
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
   public static final DateTime THE_DATE = new DateTime(100000);
 
   @BeforeClass
   public static void prepare() {
-    sqlEnv.registerTable("ORDER_DETAILS",
+    registerTable("ORDER_DETAILS",
         MockedBoundedTable.of(
             TypeName.INT64, "order_id",
             TypeName.INT32, "site_id",
@@ -70,7 +67,7 @@ public void testSameWindow() throws Exception {
         + "FROM ORDER_DETAILS GROUP BY order_id, site_id"
         + ", TUMBLE(order_time, INTERVAL '1' HOUR) ";
 
-    PCollection<Row> rows = compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
     // compare valueInString to ignore the windowStart & windowEnd
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
@@ -99,7 +96,7 @@ public void testDifferentWindows() throws Exception {
     // use a real pipeline rather than the TestPipeline because we are
     // testing exceptions, the pipeline will not actually run.
     Pipeline pipeline1 = Pipeline.create(PipelineOptionsFactory.create());
-    compilePipeline(sql, pipeline1, sqlEnv);
+    compilePipeline(sql, pipeline1);
     pipeline.run();
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
index 4f96acdb691..a38891eea1e 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
@@ -19,7 +19,6 @@
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import org.apache.beam.sdk.extensions.sql.TestUtils;
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
 import org.apache.beam.sdk.schemas.Schema.TypeName;
 import org.apache.beam.sdk.testing.PAssert;
@@ -36,14 +35,12 @@
  * Test for {@code BeamSortRel}.
  */
 public class BeamSortRelTest extends BaseRelTest {
-  static BeamSqlEnv sqlEnv = new BeamSqlEnv();
-
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
 
   @Before
   public void prepare() {
-    sqlEnv.registerTable("ORDER_DETAILS",
+    registerTable("ORDER_DETAILS",
         MockedBoundedTable.of(
             TypeName.INT64, "order_id",
             TypeName.INT32, "site_id",
@@ -62,7 +59,7 @@ public void prepare() {
             10L, 100, 10.0, new DateTime(9)
         )
     );
-    sqlEnv.registerTable("SUB_ORDER_RAM",
+    registerTable("SUB_ORDER_RAM",
         MockedBoundedTable.of(
             TypeName.INT64, "order_id",
             TypeName.INT32, "site_id",
@@ -78,7 +75,7 @@ public void testOrderBy_basic() throws Exception {
         + "FROM ORDER_DETAILS "
         + "ORDER BY order_id asc, site_id desc limit 4";
 
-    PCollection<Row> rows = compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
     PAssert.that(rows).containsInAnyOrder(TestUtils.RowsBuilder.of(
         TypeName.INT64, "order_id",
         TypeName.INT32, "site_id",
@@ -98,7 +95,7 @@ public void testOrderBy_timestamp() throws Exception {
         + "FROM ORDER_DETAILS "
         + "ORDER BY order_time desc limit 4";
 
-    PCollection<Row> rows = compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
     PAssert.that(rows).containsInAnyOrder(TestUtils.RowsBuilder.of(
         TypeName.INT64, "order_id",
         TypeName.INT32, "site_id",
@@ -115,7 +112,7 @@ public void testOrderBy_timestamp() throws Exception {
 
   @Test
   public void testOrderBy_nullsFirst() throws Exception {
-    sqlEnv.registerTable("ORDER_DETAILS",
+    registerTable("ORDER_DETAILS",
         MockedBoundedTable.of(
             TypeName.INT64, "order_id",
             TypeName.INT32, "site_id",
@@ -128,7 +125,7 @@ public void testOrderBy_nullsFirst() throws Exception {
             5L, 5, 5.0
         )
     );
-    sqlEnv.registerTable("SUB_ORDER_RAM", MockedBoundedTable
+    registerTable("SUB_ORDER_RAM", MockedBoundedTable
         .of(TypeName.INT64, "order_id",
             TypeName.INT32, "site_id",
             TypeName.DOUBLE, "price"));
@@ -138,7 +135,7 @@ public void testOrderBy_nullsFirst() throws Exception {
         + "FROM ORDER_DETAILS "
         + "ORDER BY order_id asc, site_id desc NULLS FIRST limit 4";
 
-    PCollection<Row> rows = compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             TypeName.INT64, "order_id",
@@ -156,7 +153,7 @@ public void testOrderBy_nullsFirst() throws Exception {
 
   @Test
   public void testOrderBy_nullsLast() throws Exception {
-    sqlEnv.registerTable("ORDER_DETAILS", MockedBoundedTable
+    registerTable("ORDER_DETAILS", MockedBoundedTable
         .of(TypeName.INT64, "order_id",
             TypeName.INT32, "site_id",
             TypeName.DOUBLE, "price"
@@ -166,7 +163,7 @@ public void testOrderBy_nullsLast() throws Exception {
             2L, 1, 3.0,
             2L, null, 4.0,
             5L, 5, 5.0));
-    sqlEnv.registerTable("SUB_ORDER_RAM", MockedBoundedTable
+    registerTable("SUB_ORDER_RAM", MockedBoundedTable
         .of(TypeName.INT64, "order_id",
             TypeName.INT32, "site_id",
             TypeName.DOUBLE, "price"));
@@ -176,7 +173,7 @@ public void testOrderBy_nullsLast() throws Exception {
         + "FROM ORDER_DETAILS "
         + "ORDER BY order_id asc, site_id desc NULLS LAST limit 4";
 
-    PCollection<Row> rows = compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             TypeName.INT64, "order_id",
@@ -199,7 +196,7 @@ public void testOrderBy_with_offset() throws Exception {
         + "FROM ORDER_DETAILS "
         + "ORDER BY order_id asc, site_id desc limit 4 offset 4";
 
-    PCollection<Row> rows = compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             TypeName.INT64, "order_id",
@@ -222,7 +219,7 @@ public void testOrderBy_bigFetch() throws Exception {
         + "FROM ORDER_DETAILS "
         + "ORDER BY order_id asc, site_id desc limit 11";
 
-    PCollection<Row> rows = compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             TypeName.INT64, "order_id",
@@ -253,6 +250,6 @@ public void testOrderBy_exception() throws Exception {
         + "ORDER BY order_id asc limit 11";
 
     TestPipeline pipeline = TestPipeline.create();
-    compilePipeline(sql, pipeline, sqlEnv);
+    compilePipeline(sql, pipeline);
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java
index 3e191449362..e9e85a1a9cb 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java
@@ -19,7 +19,6 @@
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import org.apache.beam.sdk.extensions.sql.TestUtils;
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
 import org.apache.beam.sdk.schemas.Schema.TypeName;
 import org.apache.beam.sdk.testing.PAssert;
@@ -34,14 +33,12 @@
  * Test for {@code BeamUnionRel}.
  */
 public class BeamUnionRelTest extends BaseRelTest {
-  static BeamSqlEnv sqlEnv = new BeamSqlEnv();
-
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
 
   @BeforeClass
   public static void prepare() {
-    sqlEnv.registerTable("ORDER_DETAILS",
+    registerTable("ORDER_DETAILS",
         MockedBoundedTable.of(
             TypeName.INT64, "order_id",
             TypeName.INT32, "site_id",
@@ -62,7 +59,7 @@ public void testUnion() throws Exception {
         + " order_id, site_id, price "
         + "FROM ORDER_DETAILS ";
 
-    PCollection<Row> rows = compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             TypeName.INT64, "order_id",
@@ -85,7 +82,7 @@ public void testUnionAll() throws Exception {
         + " SELECT order_id, site_id, price "
         + "FROM ORDER_DETAILS";
 
-    PCollection<Row> rows = compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             TypeName.INT64, "order_id",
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java
index ae48fc8c07b..8328315665d 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java
@@ -19,7 +19,6 @@
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import org.apache.beam.sdk.extensions.sql.TestUtils;
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
 import org.apache.beam.sdk.schemas.Schema.TypeName;
 import org.apache.beam.sdk.testing.PAssert;
@@ -34,20 +33,18 @@
  * Test for {@code BeamValuesRel}.
  */
 public class BeamValuesRelTest extends BaseRelTest {
-  static BeamSqlEnv sqlEnv = new BeamSqlEnv();
-
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
 
   @BeforeClass
   public static void prepare() {
-    sqlEnv.registerTable("string_table",
+    registerTable("string_table",
         MockedBoundedTable.of(
             TypeName.STRING, "name",
             TypeName.STRING, "description"
         )
     );
-    sqlEnv.registerTable("int_table",
+    registerTable("int_table",
         MockedBoundedTable.of(
             TypeName.INT32, "c0",
             TypeName.INT32, "c1"
@@ -59,7 +56,7 @@ public static void prepare() {
   public void testValues() throws Exception {
     String sql = "insert into string_table(name, description) values "
         + "('hello', 'world'), ('james', 'bond')";
-    PCollection<Row> rows = compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             TypeName.STRING, "name",
@@ -75,7 +72,7 @@ public void testValues() throws Exception {
   @Test
   public void testValues_castInt() throws Exception {
     String sql = "insert into int_table (c0, c1) values(cast(1 as int), cast(2 
as int))";
-    PCollection<Row> rows = compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             TypeName.INT32, "c0",
@@ -90,7 +87,7 @@ public void testValues_castInt() throws Exception {
   @Test
   public void testValues_onlySelect() throws Exception {
     String sql = "select 1, '1'";
-    PCollection<Row> rows = compilePipeline(sql, pipeline, sqlEnv);
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
     PAssert.that(rows).containsInAnyOrder(
         TestUtils.RowsBuilder.of(
             TypeName.INT32, "EXPR$0",
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java
index 4ae03c6acb3..5a42b31b8d1 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java
@@ -20,13 +20,12 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 
 import com.alibaba.fastjson.JSONObject;
 import com.google.common.collect.ImmutableList;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.RowSqlTypes;
 import org.apache.beam.sdk.extensions.sql.meta.Column;
@@ -54,7 +53,7 @@ public void setUp() {
   public void testCreateTable() throws Exception {
     Table table = mockTable("person");
     store.createTable(table);
-    Table actualTable = store.getTable("person");
+    Table actualTable = store.getTables().get("person");
     assertEquals(table, actualTable);
   }
 
@@ -72,23 +71,21 @@ public void testCreateTable_duplicatedName() throws 
Exception {
     store.createTable(table);
   }
 
-  @Test
-  public void testGetTable_nullName() throws Exception {
-    Table table = store.getTable(null);
-    assertNull(table);
-  }
-
-  @Test public void testListTables() throws Exception {
+  @Test public void testGetTables() throws Exception {
     store.createTable(mockTable("hello"));
     store.createTable(mockTable("world"));
 
-    assertThat(store.listTables(),
-        Matchers.containsInAnyOrder(mockTable("hello"), mockTable("world")));
+    assertEquals(2, store.getTables().size());
+    assertThat(store.getTables(),
+        Matchers.hasValue(mockTable("hello")));
+    assertThat(store.getTables(),
+        Matchers.hasValue(mockTable("world")));
   }
 
   @Test public void testBuildBeamSqlTable() throws Exception {
-    store.createTable(mockTable("hello"));
-    BeamSqlTable actualSqlTable = store.buildBeamSqlTable("hello");
+    Table table = mockTable("hello");
+    store.createTable(table);
+    BeamSqlTable actualSqlTable = store.buildBeamSqlTable(table);
     assertNotNull(actualSqlTable);
     assertEquals(
         
RowSqlTypes.builder().withIntegerField("id").withVarcharField("name").build(),
@@ -96,11 +93,6 @@ public void testGetTable_nullName() throws Exception {
     );
   }
 
-  @Test(expected = IllegalArgumentException.class)
-  public void testBuildBeamSqlTable_tableNotExist() throws Exception {
-    store.buildBeamSqlTable("world");
-  }
-
   @Test
   public void testRegisterProvider() throws Exception {
     store.registerProvider(new MockTableProvider("mock", "hello", "world"));
@@ -109,7 +101,7 @@ public void testRegisterProvider() throws Exception {
     assertEquals("text", store.getProviders().get("text").getTableType());
     assertEquals("mock", store.getProviders().get("mock").getTableType());
 
-    assertEquals(2, store.listTables().size());
+    assertEquals(2, store.getTables().size());
   }
 
   @Test(expected = IllegalArgumentException.class)
@@ -157,10 +149,6 @@ public MockTableProvider(String type, String... names) {
       this.names = names;
     }
 
-    @Override public void init() {
-
-    }
-
     @Override public String getTableType() {
       return type;
     }
@@ -173,10 +161,10 @@ public MockTableProvider(String type, String... names) {
 
     }
 
-    @Override public List<Table> listTables() {
-      List<Table> ret = new ArrayList<>(names.length);
+    @Override public Map<String, Table> getTables() {
+      Map<String, Table> ret = new HashMap(names.length);
       for (String name : names) {
-        ret.add(mockTable(name, "mock"));
+        ret.put(name, mockTable(name, "mock"));
       }
 
       return ret;
@@ -185,9 +173,5 @@ public MockTableProvider(String type, String... names) {
     @Override public BeamSqlTable buildBeamSqlTable(Table table) {
       return null;
     }
-
-    @Override public void close() {
-
-    }
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 99477)
    Time Spent: 15h 10m  (was: 15h)

> Take advantage of Calcite DDL
> -----------------------------
>
>                 Key: BEAM-4044
>                 URL: https://issues.apache.org/jira/browse/BEAM-4044
>             Project: Beam
>          Issue Type: New Feature
>          Components: dsl-sql
>            Reporter: Andrew Pilloud
>            Assignee: Andrew Pilloud
>            Priority: Major
>          Time Spent: 15h 10m
>  Remaining Estimate: 0h
>
> In Calcite 1.15 support for abstract DDL moved into calcite core. We should 
> take advantage of that.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to