This is an automated email from the ASF dual-hosted git repository.
mingmxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0d91a79 [BEAM-3176] support drop table (#4184)
0d91a79 is described below
commit 0d91a7920b6ada1424248d4fb7301db23353891a
Author: James Xu <[email protected]>
AuthorDate: Tue Feb 13 06:16:44 2018 +0800
[BEAM-3176] support drop table (#4184)
* [BEAM-3176] support drop table
* add unit test to assert tables are removed from BeamQueryPlanner once dropped
* fix findbugs found bugs..
---
.../sql/src/main/codegen/data/Parser.tdd | 3 +-
.../sql/src/main/codegen/includes/parserImpls.ftl | 17 +++++
.../apache/beam/sdk/extensions/sql/BeamSqlCli.java | 8 +++
.../beam/sdk/extensions/sql/impl/BeamSqlEnv.java | 30 +++++++-
.../extensions/sql/impl/parser/SqlDropTable.java | 79 ++++++++++++++++++++++
.../sql/meta/provider/TableProvider.java | 7 ++
.../meta/provider/kafka/KafkaTableProvider.java | 4 ++
.../sql/meta/provider/text/TextTableProvider.java | 4 ++
.../sql/meta/store/InMemoryMetaStore.java | 10 +++
.../sdk/extensions/sql/meta/store/MetaStore.java | 6 +-
.../beam/sdk/extensions/sql/BeamSqlCliTest.java | 45 ++++++++++++
.../sql/impl/parser/BeamSqlParserTest.java | 12 ++++
.../sql/meta/store/InMemoryMetaStoreTest.java | 4 ++
13 files changed, 226 insertions(+), 3 deletions(-)
diff --git a/sdks/java/extensions/sql/src/main/codegen/data/Parser.tdd
b/sdks/java/extensions/sql/src/main/codegen/data/Parser.tdd
index 09a5379..1afa73d 100644
--- a/sdks/java/extensions/sql/src/main/codegen/data/Parser.tdd
+++ b/sdks/java/extensions/sql/src/main/codegen/data/Parser.tdd
@@ -36,7 +36,8 @@
# List of methods for parsing custom SQL statements.
statementParserMethods: [
- "SqlCreateTable()"
+ "SqlCreateTable()",
+ "SqlDropTable()"
]
# List of methods for parsing custom literals.
diff --git a/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
index 136c728..ce1d2ae 100644
--- a/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
+++ b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
@@ -87,3 +87,20 @@ SqlNode SqlCreateTable() :
location, tbl_properties, select);
}
}
+
+/**
+ * Parses a DROP TABLE statement of the form:
+ *
+ * DROP TABLE table_name
+ *
+ * The parser position is captured right after the DROP keyword is consumed, and the
+ * table name is read as a simple identifier. Returns a SqlDropTable node built from both.
+ */
+SqlNode SqlDropTable() :
+{
+ SqlParserPos pos;
+ SqlIdentifier tblName;
+}
+{
+ <DROP> { pos = getPos(); }
+ <TABLE>
+ tblName = SimpleIdentifier() {
+ return new SqlDropTable(pos, tblName);
+ }
+}
+
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
index ebac783..eadda35 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
@@ -24,6 +24,7 @@ import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.parser.BeamSqlParser;
import org.apache.beam.sdk.extensions.sql.impl.parser.ParserUtils;
import org.apache.beam.sdk.extensions.sql.impl.parser.SqlCreateTable;
+import org.apache.beam.sdk.extensions.sql.impl.parser.SqlDropTable;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore;
@@ -80,6 +81,8 @@ public class BeamSqlCli {
if (sqlNode instanceof SqlCreateTable) {
handleCreateTable((SqlCreateTable) sqlNode, metaStore);
+ } else if (sqlNode instanceof SqlDropTable) {
+ handleDropTable((SqlDropTable) sqlNode);
} else {
PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[]
{}).withValidation()
.as(PipelineOptions.class);
@@ -103,6 +106,11 @@ public class BeamSqlCli {
env.registerTable(table.getName(),
metaStore.buildBeamSqlTable(table.getName()));
}
+ /**
+  * Handles a {@code DROP TABLE} statement: the table is dropped from the meta store first
+  * and then deregistered from the SQL environment.
+  */
+ private void handleDropTable(SqlDropTable stmt) {
+   String droppedTable = stmt.tableName();
+   metaStore.dropTable(droppedTable);
+   env.deregisterTable(droppedTable);
+ }
+
/**
* compile SQL, and return a {@link Pipeline}.
*/
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
index a8bc48e..11f1a46 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
@@ -17,7 +17,11 @@
*/
package org.apache.beam.sdk.extensions.sql.impl;
+import java.io.IOException;
+import java.io.ObjectInputStream;
import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.beam.sdk.extensions.sql.BeamSql;
import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
@@ -48,11 +52,13 @@ import org.apache.calcite.tools.Frameworks;
* <p>It contains a {@link SchemaPlus} which holds the metadata of tables/UDF
functions,
* and a {@link BeamQueryPlanner} which parse/validate/optimize/translate
input SQL queries.
*/
-public class BeamSqlEnv implements Serializable{
+public class BeamSqlEnv implements Serializable {
transient SchemaPlus schema;
transient BeamQueryPlanner planner;
+ transient Map<String, BeamSqlTable> tables;
public BeamSqlEnv() {
+ tables = new HashMap<String, BeamSqlTable>(16);
schema = Frameworks.createRootSchema(true);
planner = new BeamQueryPlanner(schema);
}
@@ -85,10 +91,24 @@ public class BeamSqlEnv implements Serializable{
*
*/
public void registerTable(String tableName, BeamSqlTable table) {
+ tables.put(tableName, table);
schema.add(tableName, new BeamCalciteTable(table.getRowType()));
planner.getSourceTables().put(tableName, table);
}
+ /**
+  * Deregisters a table so that it can no longer be referenced by queries.
+  *
+  * <p>The schema and the planner are rebuilt from scratch out of the tables that remain
+  * registered. Deregistering a name that was never registered leaves the set of tables
+  * unchanged.
+  *
+  * @param targetTableName name of the table to deregister
+  */
+ public void deregisterTable(String targetTableName) {
+   // Forget the table first; otherwise a later deregistration of another table would
+   // rebuild the schema from a map that still holds this one and bring it back.
+   tables.remove(targetTableName);
+
+   // reconstruct the schema and the planner from the remaining tables
+   schema = Frameworks.createRootSchema(true);
+   planner = new BeamQueryPlanner(schema);
+   for (Map.Entry<String, BeamSqlTable> entry : tables.entrySet()) {
+     String tableName = entry.getKey();
+     BeamSqlTable table = entry.getValue();
+     schema.add(tableName, new BeamCalciteTable(table.getRowType()));
+     // Mirror registerTable(): the replacement planner has not been handed these tables yet.
+     planner.getSourceTables().put(tableName, table);
+   }
+ }
+
/**
* Find {@link BaseBeamTable} by table name.
*/
@@ -133,4 +153,12 @@ public class BeamSqlEnv implements Serializable{
public BeamQueryPlanner getPlanner() {
return planner;
}
+
+ /**
+  * Custom deserialization hook. The table map, schema and planner are transient, so after
+  * the default fields are read they are recreated empty here; tables registered before
+  * serialization are not restored by this method.
+  */
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+   in.defaultReadObject();
+
+   schema = Frameworks.createRootSchema(true);
+   planner = new BeamQueryPlanner(schema);
+   tables = new HashMap<>(16);
+ }
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropTable.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropTable.java
new file mode 100644
index 0000000..6f703c9
--- /dev/null
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropTable.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.parser;
+
+import java.util.List;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+/**
+ * A Calcite {@code SqlCall} representing a {@code DROP TABLE table_name} statement.
+ */
+public class SqlDropTable extends SqlCall {
+  /**
+   * Operator backing this call: it rebuilds the node from its operands and writes it back
+   * out as SQL text.
+   */
+  public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator(
+      "DROP_TABLE", SqlKind.OTHER) {
+    @Override
+    public SqlCall createCall(
+        SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... operands) {
+      assert functionQualifier == null;
+      // the single operand is the table identifier, see getOperandList()
+      SqlIdentifier name = (SqlIdentifier) operands[0];
+      return new SqlDropTable(pos, name);
+    }
+
+    @Override
+    public void unparse(
+        SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) {
+      SqlDropTable dropTable = (SqlDropTable) call;
+      new UnparseUtil(writer, leftPrec, rightPrec)
+          .keyword("DROP", "TABLE")
+          .node(dropTable.tableName);
+    }
+  };
+
+  /** Identifier of the table to drop, exactly as produced by the parser. */
+  private final SqlIdentifier tableName;
+
+  /**
+   * Creates the node.
+   *
+   * @param pos parser position of the statement
+   * @param tableName identifier of the table to drop
+   */
+  public SqlDropTable(SqlParserPos pos, SqlIdentifier tableName) {
+    super(pos);
+    this.tableName = tableName;
+  }
+
+  @Override
+  public SqlOperator getOperator() {
+    return OPERATOR;
+  }
+
+  /** Delegates to {@link #OPERATOR} so the statement is rendered in one place. */
+  @Override
+  public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+    getOperator().unparse(writer, this, leftPrec, rightPrec);
+  }
+
+  /** Returns the table identifier as the only operand. */
+  @Override
+  public List<SqlNode> getOperandList() {
+    return ImmutableNullableList.<SqlNode>of(tableName);
+  }
+
+  /** Returns the table name as a string, converted to lower case. */
+  public String tableName() {
+    String rawName = tableName.toString();
+    return rawName.toLowerCase();
+  }
+}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java
index d57f703..5bbadb1 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java
@@ -45,6 +45,13 @@ public interface TableProvider {
void createTable(Table table);
/**
+ * Drops a table.
+ *
+ * @param tableName name of the table to drop
+ */
+ void dropTable(String tableName);
+
+ /**
* List all tables from this provider.
*/
List<Table> listTables();
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
index 7835472..c143828 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
@@ -68,6 +68,10 @@ public class KafkaTableProvider implements TableProvider {
// empty
}
+ @Override public void dropTable(String tableName) {
+ // No-op: this provider takes no action when a table is dropped.
+ }
+
@Override public List<Table> listTables() {
return Collections.emptyList();
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
index 36a7590..b871949 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
@@ -69,6 +69,10 @@ public class TextTableProvider implements TableProvider {
// empty
}
+ @Override public void dropTable(String tableName) {
+ // No-op: this provider takes no action when a table is dropped.
+ }
+
@Override public List<Table> listTables() {
return Collections.emptyList();
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java
index bacfbff..53eeb7e 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java
@@ -55,6 +55,16 @@ public class InMemoryMetaStore implements MetaStore {
tables.put(table.getName(), table);
}
+ /**
+  * Drops the named table: the provider registered for the table's type is asked to drop it,
+  * and only afterwards is the table's metadata removed from this store.
+  *
+  * @throws IllegalArgumentException if this store holds no table with that name
+  */
+ @Override
+ public void dropTable(String tableName) {
+   if (!tables.containsKey(tableName)) {
+     throw new IllegalArgumentException("No such table: " + tableName);
+   }
+
+   Table doomed = tables.get(tableName);
+   // provider first, so the metadata is kept if the provider fails
+   providers.get(doomed.getType()).dropTable(tableName);
+   tables.remove(tableName);
+ }
+
@Override public Table getTable(String tableName) {
if (tableName == null) {
return null;
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/MetaStore.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/MetaStore.java
index 2f395f0..ac5b739 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/MetaStore.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/MetaStore.java
@@ -27,13 +27,17 @@ import
org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
* The interface to handle CRUD of {@code BeamSql} table metadata.
*/
public interface MetaStore {
-
/**
* create a table.
*/
void createTable(Table table);
/**
+ * Drop the table with the specified name.
+ */
+ void dropTable(String tableName);
+
+ /**
* Get table with the specified name.
*/
Table getTable(String tableName);
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
index 62d6933..9bf724d 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
@@ -19,10 +19,12 @@ package org.apache.beam.sdk.extensions.sql;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
+import org.apache.calcite.tools.ValidationException;
import org.junit.Test;
/**
@@ -49,6 +51,49 @@ public class BeamSqlCliTest {
}
@Test
+ public void testExecute_dropTable() throws Exception {
+   InMemoryMetaStore store = new InMemoryMetaStore();
+   store.registerProvider(new TextTableProvider());
+   BeamSqlCli cli = new BeamSqlCli().metaStore(store);
+
+   // the table must exist in the meta store after CREATE TABLE ...
+   cli.execute(
+       "create table person (\n"
+       + "id int COMMENT 'id', \n"
+       + "name varchar(31) COMMENT 'name', \n"
+       + "age int COMMENT 'age') \n"
+       + "TYPE 'text' \n"
+       + "COMMENT '' LOCATION 'text://home/admin/orders'"
+   );
+   assertNotNull(store.getTable("person"));
+
+   // ... and be gone again after DROP TABLE
+   cli.execute("drop table person");
+   assertNull(store.getTable("person"));
+ }
+
+ @Test(expected = ValidationException.class)
+ public void testExecute_dropTable_assertTableRemovedFromPlanner() throws Exception {
+   InMemoryMetaStore store = new InMemoryMetaStore();
+   store.registerProvider(new TextTableProvider());
+   BeamSqlCli cli = new BeamSqlCli().metaStore(store);
+
+   cli.execute(
+       "create table person (\n"
+       + "id int COMMENT 'id', \n"
+       + "name varchar(31) COMMENT 'name', \n"
+       + "age int COMMENT 'age') \n"
+       + "TYPE 'text' \n"
+       + "COMMENT '' LOCATION 'text://home/admin/orders'"
+   );
+   cli.execute("drop table person");
+
+   // querying the dropped table is expected to fail validation
+   cli.explainQuery("select * from person");
+ }
+
+
+ @Test
public void testExplainQuery() throws Exception {
InMemoryMetaStore metaStore = new InMemoryMetaStore();
metaStore.registerProvider(new TextTableProvider());
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
index 63deb89..f6bc5d0 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
@@ -122,6 +122,18 @@ public class BeamSqlParserTest {
);
}
+ @Test
+ public void testParseDropTable() throws Exception {
+   SqlNode parsed = new BeamSqlParser("drop table person").impl().parseSqlStmtEof();
+
+   assertNotNull(parsed);
+   assertTrue(parsed instanceof SqlDropTable);
+   // the parsed identifier is reported in lower case
+   assertEquals("person", ((SqlDropTable) parsed).tableName());
+ }
+
private Table parseTable(String sql) throws Exception {
BeamSqlParser parser = new BeamSqlParser(sql);
SqlNode sqlNode = parser.impl().parseSqlStmtEof();
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java
index bd0ba93..e86ba54 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java
@@ -163,6 +163,10 @@ public class InMemoryMetaStoreTest {
}
+ @Override public void dropTable(String tableName) {
+ // No-op: this mock provider does nothing when a table is dropped.
+ }
+
@Override public List<Table> listTables() {
List<Table> ret = new ArrayList<>(names.length);
for (String name : names) {
--
To stop receiving notification emails like this one, please contact
[email protected].