This is an automated email from the ASF dual-hosted git repository.

mingmxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d91a79  [BEAM-3176] support drop table (#4184)
0d91a79 is described below

commit 0d91a7920b6ada1424248d4fb7301db23353891a
Author: James Xu <xumingmi...@gmail.com>
AuthorDate: Tue Feb 13 06:16:44 2018 +0800

    [BEAM-3176] support drop table (#4184)
    
    * [BEAM-3176] support drop table
    
    * add a unit test asserting that tables are removed from BeamQueryPlanner once dropped
    
    * fix bugs reported by FindBugs
---
 .../sql/src/main/codegen/data/Parser.tdd           |  3 +-
 .../sql/src/main/codegen/includes/parserImpls.ftl  | 17 +++++
 .../apache/beam/sdk/extensions/sql/BeamSqlCli.java |  8 +++
 .../beam/sdk/extensions/sql/impl/BeamSqlEnv.java   | 30 +++++++-
 .../extensions/sql/impl/parser/SqlDropTable.java   | 79 ++++++++++++++++++++++
 .../sql/meta/provider/TableProvider.java           |  7 ++
 .../meta/provider/kafka/KafkaTableProvider.java    |  4 ++
 .../sql/meta/provider/text/TextTableProvider.java  |  4 ++
 .../sql/meta/store/InMemoryMetaStore.java          | 10 +++
 .../sdk/extensions/sql/meta/store/MetaStore.java   |  6 +-
 .../beam/sdk/extensions/sql/BeamSqlCliTest.java    | 45 ++++++++++++
 .../sql/impl/parser/BeamSqlParserTest.java         | 12 ++++
 .../sql/meta/store/InMemoryMetaStoreTest.java      |  4 ++
 13 files changed, 226 insertions(+), 3 deletions(-)

diff --git a/sdks/java/extensions/sql/src/main/codegen/data/Parser.tdd 
b/sdks/java/extensions/sql/src/main/codegen/data/Parser.tdd
index 09a5379..1afa73d 100644
--- a/sdks/java/extensions/sql/src/main/codegen/data/Parser.tdd
+++ b/sdks/java/extensions/sql/src/main/codegen/data/Parser.tdd
@@ -36,7 +36,8 @@
 
   # List of methods for parsing custom SQL statements.
   statementParserMethods: [
-    "SqlCreateTable()"
+    "SqlCreateTable()",
+    "SqlDropTable()"
   ]
 
   # List of methods for parsing custom literals.
diff --git a/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl 
b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
index 136c728..ce1d2ae 100644
--- a/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
+++ b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
@@ -87,3 +87,20 @@ SqlNode SqlCreateTable() :
         location, tbl_properties, select);
     }
 }
+
+/**
+ * Parses a drop-table statement of the form:
+ *
+ *   DROP TABLE table_name
+ *
+ * The table name is read with SimpleIdentifier(). The returned SqlDropTable
+ * node carries the parser position captured right after the DROP keyword
+ * has been matched.
+ */
+SqlNode SqlDropTable() :
+{
+    SqlParserPos pos;
+    SqlIdentifier tblName;
+}
+{
+    <DROP> { pos = getPos(); }
+    <TABLE>
+    tblName = SimpleIdentifier() {
+        return new SqlDropTable(pos, tblName);
+    }
+}
+
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
index ebac783..eadda35 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
@@ -24,6 +24,7 @@ import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.parser.BeamSqlParser;
 import org.apache.beam.sdk.extensions.sql.impl.parser.ParserUtils;
 import org.apache.beam.sdk.extensions.sql.impl.parser.SqlCreateTable;
+import org.apache.beam.sdk.extensions.sql.impl.parser.SqlDropTable;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore;
@@ -80,6 +81,8 @@ public class BeamSqlCli {
 
     if (sqlNode instanceof SqlCreateTable) {
       handleCreateTable((SqlCreateTable) sqlNode, metaStore);
+    } else if (sqlNode instanceof SqlDropTable) {
+      handleDropTable((SqlDropTable) sqlNode);
     } else {
       PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] 
{}).withValidation()
           .as(PipelineOptions.class);
@@ -103,6 +106,11 @@ public class BeamSqlCli {
     env.registerTable(table.getName(), 
metaStore.buildBeamSqlTable(table.getName()));
   }
 
+  /**
+   * Handles a {@code DROP TABLE} statement: the table is first dropped from the meta store
+   * and then deregistered from the SQL environment.
+   */
+  private void handleDropTable(SqlDropTable stmt) {
+    final String droppedTable = stmt.tableName();
+    metaStore.dropTable(droppedTable);
+    env.deregisterTable(droppedTable);
+  }
+
   /**
    * compile SQL, and return a {@link Pipeline}.
    */
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
index a8bc48e..11f1a46 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
@@ -17,7 +17,11 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.beam.sdk.extensions.sql.BeamSql;
 import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
@@ -48,11 +52,13 @@ import org.apache.calcite.tools.Frameworks;
  * <p>It contains a {@link SchemaPlus} which holds the metadata of tables/UDF 
functions,
  * and a {@link BeamQueryPlanner} which parse/validate/optimize/translate 
input SQL queries.
  */
-public class BeamSqlEnv implements Serializable{
+public class BeamSqlEnv implements Serializable {
   transient SchemaPlus schema;
   transient BeamQueryPlanner planner;
+  transient Map<String, BeamSqlTable> tables;
 
   public BeamSqlEnv() {
+    // registered tables keyed by name; deregisterTable rebuilds the schema from this map
+    tables = new HashMap<String, BeamSqlTable>(16);
     schema = Frameworks.createRootSchema(true);
     planner = new BeamQueryPlanner(schema);
   }
@@ -85,10 +91,24 @@ public class BeamSqlEnv implements Serializable{
    *
    */
   public void registerTable(String tableName, BeamSqlTable table) {
+    // remember the registration so the schema can be reconstructed in deregisterTable
+    tables.put(tableName, table);
     schema.add(tableName, new BeamCalciteTable(table.getRowType()));
     planner.getSourceTables().put(tableName, table);
   }
 
+  /**
+   * Removes a table previously added through {@link #registerTable(String, BeamSqlTable)}.
+   *
+   * <p>The schema and the planner are rebuilt from the tables that remain registered.
+   * Passing a name that is not registered leaves the set of registered tables unchanged.
+   *
+   * @param targetTableName name of the table to remove
+   */
+  public void deregisterTable(String targetTableName) {
+    // Forget the table itself; otherwise it would be added back to the schema the next
+    // time any other table is deregistered.
+    tables.remove(targetTableName);
+
+    // reconstruct the schema from the remaining tables
+    schema = Frameworks.createRootSchema(true);
+    for (Map.Entry<String, BeamSqlTable> entry : tables.entrySet()) {
+      schema.add(entry.getKey(), new BeamCalciteTable(entry.getValue().getRowType()));
+    }
+
+    // The replacement planner has not been given any tables by this class yet, so hand the
+    // remaining ones over again, mirroring what registerTable does for each table.
+    planner = new BeamQueryPlanner(schema);
+    for (Map.Entry<String, BeamSqlTable> entry : tables.entrySet()) {
+      planner.getSourceTables().put(entry.getKey(), entry.getValue());
+    }
+  }
+
   /**
    * Find {@link BaseBeamTable} by table name.
    */
@@ -133,4 +153,12 @@ public class BeamSqlEnv implements Serializable{
   public BeamQueryPlanner getPlanner() {
     return planner;
   }
+
+  /**
+   * Deserialization hook: once the default fields have been read, the transient members
+   * are re-created empty, so no previously registered tables are carried over.
+   */
+  private void readObject(ObjectInputStream in)
+      throws IOException, ClassNotFoundException {
+    in.defaultReadObject();
+
+    // schema, planner and tables are transient, so start again from a clean state
+    final SchemaPlus freshSchema = Frameworks.createRootSchema(true);
+    schema = freshSchema;
+    planner = new BeamQueryPlanner(freshSchema);
+    tables = new HashMap<>(16);
+  }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropTable.java
new file mode 100644
index 0000000..6f703c9
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropTable.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.parser;
+
+import java.util.List;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+/**
+ * Parse-tree node ({@code SqlCall}) for a {@code DROP TABLE table_name} statement.
+ *
+ * <p>The node has exactly one operand: the identifier of the table to drop.
+ */
+public class SqlDropTable extends SqlCall {
+  /** Operator behind every drop-table call; it creates and unparses {@link SqlDropTable}s. */
+  public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator(
+      "DROP_TABLE", SqlKind.OTHER) {
+    @Override
+    public SqlCall createCall(
+        SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... operands) {
+      assert functionQualifier == null;
+      // the first operand is the table identifier
+      SqlIdentifier name = (SqlIdentifier) operands[0];
+      return new SqlDropTable(pos, name);
+    }
+
+    @Override
+    public void unparse(
+        SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) {
+      SqlDropTable dropTable = (SqlDropTable) call;
+      new UnparseUtil(writer, leftPrec, rightPrec)
+          .keyword("DROP", "TABLE")
+          .node(dropTable.tableName);
+    }
+  };
+
+  /** Identifier of the table to drop, as produced by the parser. */
+  private final SqlIdentifier tableName;
+
+  /**
+   * Creates a drop-table node.
+   *
+   * @param pos parser position of the statement
+   * @param tableName identifier of the table to drop
+   */
+  public SqlDropTable(SqlParserPos pos, SqlIdentifier tableName) {
+    super(pos);
+    this.tableName = tableName;
+  }
+
+  @Override
+  public SqlOperator getOperator() {
+    return OPERATOR;
+  }
+
+  /** Delegates unparsing to the operator returned by {@link #getOperator()}. */
+  @Override
+  public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+    SqlOperator operator = getOperator();
+    operator.unparse(writer, this, leftPrec, rightPrec);
+  }
+
+  @Override
+  public List<SqlNode> getOperandList() {
+    return ImmutableNullableList.<SqlNode>of(tableName);
+  }
+
+  /** Returns the table name converted to lower case. */
+  public String tableName() {
+    String identifier = tableName.toString();
+    return identifier.toLowerCase();
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java
index d57f703..5bbadb1 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java
@@ -45,6 +45,13 @@ public interface TableProvider {
   void createTable(Table table);
 
   /**
+   * Drops a table.
+   *
+   * @param tableName name of the table to drop
+   */
+  void dropTable(String tableName);
+
+  /**
    * List all tables from this provider.
    */
   List<Table> listTables();
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
index 7835472..c143828 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
@@ -68,6 +68,10 @@ public class KafkaTableProvider implements TableProvider {
     // empty
   }
 
+  @Override public void dropTable(String tableName) {
+    // intentionally empty
+    // NOTE(review): presumably this provider has nothing of its own to clean up -- confirm
+  }
+
   @Override public List<Table> listTables() {
     return Collections.emptyList();
   }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
index 36a7590..b871949 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
@@ -69,6 +69,10 @@ public class TextTableProvider implements TableProvider {
     // empty
   }
 
+  @Override public void dropTable(String tableName) {
+    // intentionally empty
+    // NOTE(review): presumably this provider has nothing of its own to clean up -- confirm
+  }
+
   @Override public List<Table> listTables() {
     return Collections.emptyList();
   }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java
index bacfbff..53eeb7e 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java
@@ -55,6 +55,16 @@ public class InMemoryMetaStore implements MetaStore {
     tables.put(table.getName(), table);
   }
 
+  /**
+   * Drops the named table: the provider looked up by the table's type is asked to drop it
+   * first, then the table is removed from this store.
+   *
+   * @throws IllegalArgumentException if no table with that name is stored
+   */
+  @Override public void dropTable(String tableName) {
+    if (tables.containsKey(tableName)) {
+      Table droppedTable = tables.get(tableName);
+      // provider first, so the entry stays in the store if the provider call fails
+      providers.get(droppedTable.getType()).dropTable(tableName);
+      tables.remove(tableName);
+    } else {
+      throw new IllegalArgumentException("No such table: " + tableName);
+    }
+  }
+
   @Override public Table getTable(String tableName) {
     if (tableName == null) {
       return null;
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/MetaStore.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/MetaStore.java
index 2f395f0..ac5b739 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/MetaStore.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/MetaStore.java
@@ -27,13 +27,17 @@ import 
org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
  * The interface to handle CRUD of {@code BeamSql} table metadata.
  */
 public interface MetaStore {
-
   /**
    * create a table.
    */
   void createTable(Table table);
 
   /**
+   * Drop the table with the specified name.
+   */
+  void dropTable(String tableName);
+
+  /**
    * Get table with the specified name.
    */
   Table getTable(String tableName);
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
index 62d6933..9bf724d 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
@@ -19,10 +19,12 @@ package org.apache.beam.sdk.extensions.sql;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 
 import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
+import org.apache.calcite.tools.ValidationException;
 import org.junit.Test;
 
 /**
@@ -49,6 +51,49 @@ public class BeamSqlCliTest {
   }
 
   @Test
+  public void testExecute_dropTable() throws Exception {
+    InMemoryMetaStore store = new InMemoryMetaStore();
+    store.registerProvider(new TextTableProvider());
+    BeamSqlCli cli = new BeamSqlCli().metaStore(store);
+
+    // create the table and make sure the meta store knows about it
+    String createPerson =
+        "create table person (\n"
+            + "id int COMMENT 'id', \n"
+            + "name varchar(31) COMMENT 'name', \n"
+            + "age int COMMENT 'age') \n"
+            + "TYPE 'text' \n"
+            + "COMMENT '' LOCATION 'text://home/admin/orders'";
+    cli.execute(createPerson);
+    assertNotNull(store.getTable("person"));
+
+    // after the drop the meta store must no longer return it
+    cli.execute("drop table person");
+    assertNull(store.getTable("person"));
+  }
+
+  /** Querying a dropped table is expected to fail with a {@link ValidationException}. */
+  @Test(expected = ValidationException.class)
+  public void testExecute_dropTable_assertTableRemovedFromPlanner() throws Exception {
+    InMemoryMetaStore store = new InMemoryMetaStore();
+    store.registerProvider(new TextTableProvider());
+    BeamSqlCli cli = new BeamSqlCli().metaStore(store);
+
+    String createPerson =
+        "create table person (\n"
+            + "id int COMMENT 'id', \n"
+            + "name varchar(31) COMMENT 'name', \n"
+            + "age int COMMENT 'age') \n"
+            + "TYPE 'text' \n"
+            + "COMMENT '' LOCATION 'text://home/admin/orders'";
+    cli.execute(createPerson);
+    cli.execute("drop table person");
+
+    // the table is gone, so explaining a query against it must not succeed
+    cli.explainQuery("select * from person");
+  }
+
+
+  @Test
   public void testExplainQuery() throws Exception {
     InMemoryMetaStore metaStore = new InMemoryMetaStore();
     metaStore.registerProvider(new TextTableProvider());
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
index 63deb89..f6bc5d0 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
@@ -122,6 +122,18 @@ public class BeamSqlParserTest {
     );
   }
 
+  /** A plain {@code drop table} statement parses into a {@link SqlDropTable} node. */
+  @Test
+  public void testParseDropTable() throws Exception {
+    SqlNode parsed = new BeamSqlParser("drop table person").impl().parseSqlStmtEof();
+
+    assertNotNull(parsed);
+    assertTrue(parsed instanceof SqlDropTable);
+
+    SqlDropTable dropStmt = (SqlDropTable) parsed;
+    assertNotNull(dropStmt);
+    assertEquals("person", dropStmt.tableName());
+  }
+
   private Table parseTable(String sql) throws Exception {
     BeamSqlParser parser = new BeamSqlParser(sql);
     SqlNode sqlNode = parser.impl().parseSqlStmtEof();
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java
index bd0ba93..e86ba54 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java
@@ -163,6 +163,10 @@ public class InMemoryMetaStoreTest {
 
     }
 
+    @Override public void dropTable(String tableName) {
+      // intentionally empty
+    }
+
     @Override public List<Table> listTables() {
       List<Table> ret = new ArrayList<>(names.length);
       for (String name : names) {

-- 
To stop receiving notification emails like this one, please contact
ming...@apache.org.

Reply via email to