[ 
https://issues.apache.org/jira/browse/BEAM-4044?focusedWorklogId=91592&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91592
 ]

ASF GitHub Bot logged work on BEAM-4044:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 17/Apr/18 04:13
            Start Date: 17/Apr/18 04:13
    Worklog Time Spent: 10m 
      Work Description: XuMingmin closed pull request #5040: [BEAM-4044] [SQL] Refresh DDL from 1.16
URL: https://github.com/apache/beam/pull/5040
 
 
   

This PR was merged from a forked repository. Because GitHub hides the
original diff of a pull request from a fork once it is merged, the full
diff is reproduced below for the sake of provenance:

diff --git a/sdks/java/extensions/sql/src/main/codegen/config.fmpp b/sdks/java/extensions/sql/src/main/codegen/config.fmpp
index 61645e29f55..5ecb3d53fe4 100644
--- a/sdks/java/extensions/sql/src/main/codegen/config.fmpp
+++ b/sdks/java/extensions/sql/src/main/codegen/config.fmpp
@@ -1,10 +1,9 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to you under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
 #
 # http://www.apache.org/licenses/LICENSE-2.0
 #
@@ -15,9 +14,79 @@
 # limitations under the License.
 
 data: {
-  parser:                   tdd(../data/Parser.tdd)
-}
+    parser: {
+      # Generated parser implementation class package and name
+      package: "org.apache.beam.sdk.extensions.sql.impl.parser.impl",
+      class: "BeamSqlParserImpl",
+
+      # List of import statements.
+      imports: [
+        "org.apache.calcite.schema.ColumnStrategy"
+        "org.apache.calcite.sql.SqlCreate"
+        "org.apache.calcite.sql.SqlDrop"
+        "org.apache.beam.sdk.extensions.sql.impl.parser.SqlDdlNodes"
+      ]
+
+      # List of keywords.
+      keywords: [
+        "COMMENT"
+        "IF"
+       "LOCATION"
+       "TBLPROPERTIES"
+      ]
+
+      # List of keywords from "keywords" section that are not reserved.
+      nonReservedKeywords: [
+        "COMMENT"
+        "IF"
+       "LOCATION"
+       "TBLPROPERTIES"
+      ]
+
+      # List of methods for parsing custom SQL statements.
+      statementParserMethods: [
+      ]
+
+      # List of methods for parsing custom literals.
+      # Example: ParseJsonLiteral().
+      literalParserMethods: [
+      ]
+
+      # List of methods for parsing custom data types.
+      dataTypeParserMethods: [
+      ]
 
+      # List of methods for parsing extensions to "ALTER <scope>" calls.
+      # Each must accept arguments "(SqlParserPos pos, String scope)".
+      alterStatementParserMethods: [
+      ]
+
+      # List of methods for parsing extensions to "CREATE [OR REPLACE]" calls.
+      # Each must accept arguments "(SqlParserPos pos, boolean replace)".
+      createStatementParserMethods: [
+        "SqlCreateTable"
+      ]
+
+      # List of methods for parsing extensions to "DROP" calls.
+      # Each must accept arguments "(SqlParserPos pos)".
+      dropStatementParserMethods: [
+        "SqlDropTable"
+      ]
+
+      # List of files in @includes directory that have parser method
+      # implementations for parsing custom SQL statements, literals or types
+      # given as part of "statementParserMethods", "literalParserMethods" or
+      # "dataTypeParserMethods".
+      implementationFiles: [
+        "parserImpls.ftl"
+      ]
+
+      includeCompoundIdentifier: true
+      includeBraces: true
+      includeAdditionalDeclarations: false
+
+    }
+}
 freemarkerLinks: {
-  includes: includes/
+    includes: includes/
 }
diff --git a/sdks/java/extensions/sql/src/main/codegen/data/Parser.tdd b/sdks/java/extensions/sql/src/main/codegen/data/Parser.tdd
deleted file mode 100644
index 1afa73d255b..00000000000
--- a/sdks/java/extensions/sql/src/main/codegen/data/Parser.tdd
+++ /dev/null
@@ -1,76 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
-  # Generated parser implementation class package and name
-  package: "org.apache.beam.sdk.extensions.sql.impl.parser.impl",
-  class: "BeamSqlParserImpl",
-
-  # List of import statements.
-  imports: [
-    "org.apache.calcite.sql.validate.*",
-    "org.apache.calcite.util.*",
-    "org.apache.beam.sdk.extensions.sql.impl.parser.*",
-    "java.util.*"
-  ]
-
-  # List of keywords.
-  keywords: [
-    "LOCATION",
-    "TBLPROPERTIES",
-    "COMMENT"
-  ]
-
-  # List of methods for parsing custom SQL statements.
-  statementParserMethods: [
-    "SqlCreateTable()",
-    "SqlDropTable()"
-  ]
-
-  # List of methods for parsing custom literals.
-  # Example: ParseJsonLiteral().
-  literalParserMethods: [
-  ]
-
-  # List of methods for parsing custom data types.
-  dataTypeParserMethods: [
-  ]
-
-  nonReservedKeywords: [
-  ]
-
-  createStatementParserMethods: [
-  ]
-
-  alterStatementParserMethods: [
-  ]
-
-  dropStatementParserMethods: [
-  ]
-
-  # List of files in @includes directory that have parser method
-  # implementations for custom SQL statements, literals or types
-  # given as part of "statementParserMethods", "literalParserMethods" or
-  # "dataTypeParserMethods".
-  implementationFiles: [
-    "parserImpls.ftl"
-  ]
-
-  includeCompoundIdentifier: true,
-  includeBraces: true,
-  includeAdditionalDeclarations: false,
-  allowBangEqual: false
-}
diff --git a/sdks/java/extensions/sql/src/main/codegen/includes/license.ftl b/sdks/java/extensions/sql/src/main/codegen/includes/license.ftl
deleted file mode 100644
index 7e66353b7db..00000000000
--- a/sdks/java/extensions/sql/src/main/codegen/includes/license.ftl
+++ /dev/null
@@ -1,17 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
diff --git a/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
index ce1d2ae7ce2..30705443f85 100644
--- a/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
+++ b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
@@ -9,98 +9,148 @@
   OF ANY KIND, either express or implied. See the License for the specific
   language governing permissions and limitations under the License. -->
 
+boolean IfNotExistsOpt() :
+{
+}
+{
+    <IF> <NOT> <EXISTS> { return true; }
+|
+    { return false; }
+}
 
-private void ColumnDef(List<ColumnDefinition> list) :
+boolean IfExistsOpt() :
 {
-    SqlParserPos pos;
-    SqlIdentifier name;
-    SqlDataTypeSpec type;
-    ColumnConstraint constraint = null;
-    SqlNode comment = null;
 }
 {
-    name = SimpleIdentifier() { pos = getPos(); }
-    type = DataType()
-    [
-      <PRIMARY> <KEY>
-      { constraint = new ColumnConstraint.PrimaryKey(getPos()); }
-    ]
+    <IF> <EXISTS> { return true; }
+|
+    { return false; }
+}
+
+SqlNodeList Options() :
+{
+    final Span s;
+    final List<SqlNode> list = Lists.newArrayList();
+}
+{
+    <OPTIONS> { s = span(); } <LPAREN>
     [
-      <COMMENT> comment = StringLiteral()
+        Option(list)
+        (
+            <COMMA>
+            Option(list)
+        )*
     ]
-    {
-        list.add(new ColumnDefinition(name, type, constraint, comment, pos));
+    <RPAREN> {
+        return new SqlNodeList(list, s.end(this));
+    }
+}
+
+void Option(List<SqlNode> list) :
+{
+    final SqlIdentifier id;
+    final SqlNode value;
+}
+{
+    id = SimpleIdentifier()
+    value = Literal() {
+        list.add(id);
+        list.add(value);
     }
 }
 
-SqlNodeList ColumnDefinitionList() :
+SqlNodeList TableElementList() :
 {
-    SqlParserPos pos;
-    List<ColumnDefinition> list = Lists.newArrayList();
+    final Span s;
+    final List<SqlNode> list = Lists.newArrayList();
 }
 {
-    <LPAREN> { pos = getPos(); }
-    ColumnDef(list)
-    ( <COMMA> ColumnDef(list) )*
+    <LPAREN> { s = span(); }
+    TableElement(list)
+    (
+        <COMMA> TableElement(list)
+    )*
     <RPAREN> {
-        return new SqlNodeList(list, pos.plus(getPos()));
+        return new SqlNodeList(list, s.end(this));
+    }
+}
+
+void TableElement(List<SqlNode> list) :
+{
+    final SqlIdentifier id;
+    final SqlDataTypeSpec type;
+    final boolean nullable;
+    SqlNode comment = null;
+    final Span s = Span.of();
+}
+{
+    id = SimpleIdentifier()
+    (
+        type = DataType()
+        (
+            <NULL> { nullable = true; }
+        |
+            <NOT> <NULL> { nullable = false; }
+        |
+            { nullable = true; }
+        )
+        [ <COMMENT> comment = StringLiteral() ]
+        {
+            list.add(
+                SqlDdlNodes.column(s.add(id).end(this), id,
+                    type.withNullable(nullable), comment));
+        }
+    |
+        { list.add(id); }
+    )
+|
+    id = SimpleIdentifier() {
+        list.add(id);
     }
 }
 
 /**
+ * Note: This example is probably out of sync with the code.
+ *
  * CREATE TABLE ( IF NOT EXISTS )?
- *   ( database_name '.' )? table_name ( '(' column_def ( ',' column_def )* ')'
- *   ( STORED AS INPUTFORMAT input_format_classname OUTPUTFORMAT output_format_classname )?
- *   LOCATION location_uri
+ *   ( database_name '.' )? table_name '(' column_def ( ',' column_def )* ')'
+ *   TYPE type_name
+ *   ( COMMENT comment_string )?
+ *   ( LOCATION location_string )?
  *   ( TBLPROPERTIES tbl_properties )?
- *   ( AS select_stmt )
  */
-SqlNode SqlCreateTable() :
+SqlCreate SqlCreateTable(Span s, boolean replace) :
 {
-    SqlParserPos pos;
-    SqlIdentifier tblName;
-    SqlNodeList fieldList;
+    final boolean ifNotExists;
+    final SqlIdentifier id;
+    SqlNodeList tableElementList = null;
     SqlNode type = null;
     SqlNode comment = null;
     SqlNode location = null;
-    SqlNode tbl_properties = null;
-    SqlNode select = null;
+    SqlNode tblProperties = null;
 }
 {
-    <CREATE> { pos = getPos(); }
-    <TABLE>
-    tblName = CompoundIdentifier()
-    fieldList = ColumnDefinitionList()
-    <TYPE>
-    type = StringLiteral()
-    [
-    <COMMENT>
-    comment = StringLiteral()
-    ]
-    [
-    <LOCATION>
-    location = StringLiteral()
-    ]
-    [ <TBLPROPERTIES> tbl_properties = StringLiteral() ]
-    [ <AS> select = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY) ] {
-        return new SqlCreateTable(pos, tblName, fieldList, type, comment,
-        location, tbl_properties, select);
+    <TABLE> ifNotExists = IfNotExistsOpt() id = CompoundIdentifier()
+    tableElementList = TableElementList()
+    <TYPE> type = StringLiteral()
+    [ <COMMENT> comment = StringLiteral() ]
+    [ <LOCATION> location = StringLiteral() ]
+    [ <TBLPROPERTIES> tblProperties = StringLiteral() ]
+    {
+        return SqlDdlNodes.createTable(s.end(this), replace, ifNotExists, id,
+            tableElementList, type, comment, location, tblProperties);
     }
 }
 
-/**
- * DROP TABLE table_name
- */
-SqlNode SqlDropTable() :
+SqlDrop SqlDropTable(Span s, boolean replace) :
 {
-    SqlParserPos pos;
-    SqlIdentifier tblName;
+    final boolean ifExists;
+    final SqlIdentifier id;
 }
 {
-    <DROP> { pos = getPos(); }
-    <TABLE>
-    tblName = SimpleIdentifier() {
-        return new SqlDropTable(pos, tblName);
+    <TABLE> ifExists = IfExistsOpt() id = CompoundIdentifier() {
+        return SqlDdlNodes.dropTable(s.end(this), ifExists, id);
     }
 }
 
+// End parserImpls.ftl
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
index eadda35fd5f..8cf689084d7 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
@@ -22,7 +22,6 @@
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.parser.BeamSqlParser;
-import org.apache.beam.sdk.extensions.sql.impl.parser.ParserUtils;
 import org.apache.beam.sdk.extensions.sql.impl.parser.SqlCreateTable;
 import org.apache.beam.sdk.extensions.sql.impl.parser.SqlDropTable;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
@@ -94,7 +93,7 @@ public void execute(String sqlString) throws Exception {
   }
 
   private void handleCreateTable(SqlCreateTable stmt, MetaStore store) {
-    Table table = ParserUtils.convertCreateTableStmtToTable(stmt);
+    Table table = stmt.toTable();
     if (table.getType() == null) {
       throw new IllegalStateException("Table type is not specified and BeamSqlCli#defaultTableType"
           + "is not configured!");
@@ -107,8 +106,8 @@ private void handleCreateTable(SqlCreateTable stmt, MetaStore store) {
   }
 
   private void handleDropTable(SqlDropTable stmt) {
-    metaStore.dropTable(stmt.tableName());
-    env.deregisterTable(stmt.tableName());
+    metaStore.dropTable(stmt.getNameSimple());
+    env.deregisterTable(stmt.getNameSimple());
   }
 
   /**
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/ColumnConstraint.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/ColumnConstraint.java
deleted file mode 100644
index 965daa2b1e7..00000000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/ColumnConstraint.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.parser;
-
-import org.apache.calcite.sql.SqlLiteral;
-import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * Column constraint such as primary key.
- */
-public class ColumnConstraint extends SqlLiteral {
-  private ColumnConstraint(
-      Object value, SqlTypeName typeName, SqlParserPos pos) {
-    super(value, typeName, pos);
-  }
-
-  /**
-   * A primary key constraint.
-   */
-  public static class PrimaryKey extends ColumnConstraint {
-    public PrimaryKey(SqlParserPos pos) {
-      super(SqlDDLKeywords.PRIMARY, SqlTypeName.SYMBOL, pos);
-    }
-  }
-}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/ColumnDefinition.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/ColumnDefinition.java
deleted file mode 100644
index fce8d2ce66e..00000000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/ColumnDefinition.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.impl.parser;
-
-import java.util.Arrays;
-import org.apache.calcite.sql.SqlDataTypeSpec;
-import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.SqlLiteral;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlNodeList;
-import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.util.NlsString;
-
-/**
- * Column definition used during sql parsing(mainly DDL which not supported by default).
- */
-public class ColumnDefinition extends SqlNodeList {
-  public ColumnDefinition(
-      SqlIdentifier name, SqlDataTypeSpec type, ColumnConstraint constraint,
-      SqlNode comment, SqlParserPos pos) {
-    super(Arrays.asList(name, type, constraint, comment), pos);
-  }
-
-  public String name() {
-    return get(0).toString();
-  }
-
-  public SqlDataTypeSpec type() {
-    return (SqlDataTypeSpec) get(1);
-  }
-
-  public ColumnConstraint constraint() {
-    SqlNode constraintNode = get(2);
-    return constraintNode == null ? null : (ColumnConstraint) constraintNode;
-  }
-
-  public String comment() {
-    SqlNode commentNode = get(3);
-    return commentNode == null ? null : ((NlsString) SqlLiteral.value(commentNode)).getValue();
-  }
-}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/ParserUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/ParserUtils.java
deleted file mode 100644
index 9b60e10c44f..00000000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/ParserUtils.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.impl.parser;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
-import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.meta.Column;
-import org.apache.beam.sdk.extensions.sql.meta.Table;
-
-/**
- * Util method for parser.
- */
-public class ParserUtils {
-
-  /**
-   * Convert a create table statement to a {@code Table} object.
-   * @param stmt
-   * @return the table
-   */
-  public static Table convertCreateTableStmtToTable(SqlCreateTable stmt) {
-    List<Column> columns = new ArrayList<>(stmt.fieldList().size());
-    for (ColumnDefinition columnDef : stmt.fieldList()) {
-      Column column = Column.builder()
-          .name(columnDef.name().toLowerCase())
-          .fieldType(CalciteUtils.toFieldType(
-              columnDef.type().deriveType(BeamQueryPlanner.TYPE_FACTORY)))
-          .comment(columnDef.comment())
-          .primaryKey(columnDef.constraint() instanceof ColumnConstraint.PrimaryKey)
-          .build();
-      columns.add(column);
-    }
-
-    Table table = Table.builder()
-        .type(stmt.type().toLowerCase())
-        .name(stmt.tableName().toLowerCase())
-        .columns(columns)
-        .comment(stmt.comment())
-        .location(stmt.location())
-        .properties(stmt.properties())
-        .build();
-
-    return table;
-  }
-}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCheckConstraint.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCheckConstraint.java
new file mode 100644
index 00000000000..94018b5c944
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCheckConstraint.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.parser;
+
+import java.util.List;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+/**
+ * Parse tree for a {@code CHECK} constraint.
+ *
+ * <p>Unparses as {@code [CONSTRAINT name] CHECK (expression)}.
+ */
+public class SqlCheckConstraint extends SqlCall {
+  private static final SqlSpecialOperator OPERATOR =
+      new SqlSpecialOperator("CHECK", SqlKind.CHECK);
+
+  private final SqlIdentifier name;
+  private final SqlNode expression;
+
+  /** Creates a SqlCheckConstraint; use {@link SqlDdlNodes#check}. */
+  SqlCheckConstraint(SqlParserPos pos, SqlIdentifier name,
+      SqlNode expression) {
+    super(pos);
+    this.name = name; // may be null
+    this.expression = expression;
+  }
+
+  @Override public SqlOperator getOperator() {
+    return OPERATOR;
+  }
+
+  @Override public List<SqlNode> getOperandList() {
+    return ImmutableNullableList.of(name, expression);
+  }
+
+  @Override public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+    if (name != null) {
+      writer.keyword("CONSTRAINT");
+      name.unparse(writer, 0, 0);
+    }
+    writer.keyword("CHECK");
+    if (writer.isAlwaysUseParentheses()) {
+      expression.unparse(writer, 0, 0);
+    } else {
+      writer.sep("(");
+      expression.unparse(writer, 0, 0);
+      writer.sep(")");
+    }
+  }
+}
+
+// End SqlCheckConstraint.java
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlColumnDeclaration.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlColumnDeclaration.java
new file mode 100644
index 00000000000..8188528fcbb
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlColumnDeclaration.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.parser;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+/**
+ * Parse tree for column.
+ */
+public class SqlColumnDeclaration extends SqlCall {
+  private static final SqlSpecialOperator OPERATOR =
+      new SqlSpecialOperator("COLUMN_DECL", SqlKind.COLUMN_DECL);
+
+  final SqlIdentifier name;
+  final SqlDataTypeSpec dataType;
+  final SqlNode comment;
+
+  /** Creates a SqlColumnDeclaration; use {@link SqlDdlNodes#column}. */
+  SqlColumnDeclaration(SqlParserPos pos, SqlIdentifier name,
+      SqlDataTypeSpec dataType, SqlNode comment) {
+    super(pos);
+    this.name = name;
+    this.dataType = dataType;
+    this.comment = comment;
+  }
+
+  @Override public SqlOperator getOperator() {
+    return OPERATOR;
+  }
+
+  @Override public List<SqlNode> getOperandList() {
+    return ImmutableList.of(name, dataType);
+  }
+
+  @Override public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+    name.unparse(writer, 0, 0);
+    dataType.unparse(writer, 0, 0);
+    if (dataType.getNullable() != null && !dataType.getNullable()) {
+      writer.keyword("NOT NULL");
+    }
+    if (comment != null) {
+      writer.keyword("COMMENT");
+      comment.unparse(writer, 0, 0);
+    }
+  }
+}
+
+// End SqlColumnDeclaration.java
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java
index 15e8b960658..3a1cfed02f9 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java
@@ -1,13 +1,12 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,11 +16,18 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl.parser;
 
-import com.alibaba.fastjson.JSON;
+import static com.alibaba.fastjson.JSON.parseObject;
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import com.alibaba.fastjson.JSONObject;
-import com.google.common.base.Strings;
+import java.util.ArrayList;
 import java.util.List;
-import org.apache.calcite.sql.SqlCall;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.extensions.sql.meta.Column;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.sql.SqlCreate;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlLiteral;
@@ -35,97 +41,64 @@
 import org.apache.calcite.util.NlsString;
 
 /**
- * A Calcite {@code SqlCall} which represents a create table statement.
+ * Parse tree for {@code CREATE TABLE} statement.
  */
-public class SqlCreateTable extends SqlCall {
-  public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator(
-      "CREATE_TABLE", SqlKind.OTHER) {
-    @Override
-    public SqlCall createCall(
-        SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... o) {
-      assert functionQualifier == null;
-      return new SqlCreateTable(pos, (SqlIdentifier) o[0], (SqlNodeList) o[1],
-                                o[2], o[3], o[4], o[5], o[6]);
-    }
-
-    @Override
-    public void unparse(
-        SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) {
-      SqlCreateTable t = (SqlCreateTable) call;
-      UnparseUtil u = new UnparseUtil(writer, leftPrec, rightPrec);
-      u.keyword("CREATE", "TABLE").node(t.tblName).nodeList(
-          t.fieldList);
-      u.keyword("TYPE").node(t.type);
-      u.keyword("COMMENT").node(t.comment);
-      u.keyword("LOCATION").node(t.location);
-      if (t.properties != null) {
-        u.keyword("TBLPROPERTIES").node(t.properties);
-      }
-      if (t.query != null) {
-        u.keyword("AS").node(t.query);
-      }
-    }
-  };
-
-  private final SqlIdentifier tblName;
-  private final SqlNodeList fieldList;
+public class SqlCreateTable extends SqlCreate {
+  private final SqlIdentifier name;
+  private final SqlNodeList columnList;
   private final SqlNode type;
   private final SqlNode comment;
   private final SqlNode location;
-  private final SqlNode properties;
-  private final SqlNode query;
-
-  public SqlCreateTable(
-          SqlParserPos pos, SqlIdentifier tblName, SqlNodeList fieldList, SqlNode type,
-          SqlNode comment, SqlNode location, SqlNode properties, SqlNode query) {
-    super(pos);
-    this.tblName = tblName;
-    this.fieldList = fieldList;
-    this.type = type;
-    this.comment = comment;
-    this.location = location;
-    this.properties = properties;
-    this.query = query;
-  }
+  private final SqlNode tblProperties;
 
-  @Override
-  public SqlOperator getOperator() {
-    return OPERATOR;
-  }
+  private static final SqlOperator OPERATOR =
+      new SqlSpecialOperator("CREATE TABLE", SqlKind.CREATE_TABLE);
 
-  @Override
-  public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
-    getOperator().unparse(writer, this, leftPrec, rightPrec);
+  /** Creates a SqlCreateTable. */
+  SqlCreateTable(SqlParserPos pos, boolean replace, boolean ifNotExists,
+      SqlIdentifier name, SqlNodeList columnList, SqlNode type,
+      SqlNode comment, SqlNode location, SqlNode tblProperties) {
+    super(OPERATOR, pos, replace, ifNotExists);
+    this.name = checkNotNull(name);
+    this.columnList = columnList; // may be null
+    this.type = checkNotNull(type);
+    this.comment = comment; // may be null
+    this.location = location; // may be null
+    this.tblProperties = tblProperties; // may be null
   }
 
-  @Override
   public List<SqlNode> getOperandList() {
-    return ImmutableNullableList.of(tblName, fieldList, location, properties,
-                                    query);
-  }
-
-  public String tableName() {
-    return tblName.toString();
-  }
-
-  public String location() {
-    return location == null ? null : getString(location);
-  }
-
-  public String type() {
-    return type == null ? null : getString(type);
+    return ImmutableNullableList.of(name, columnList, type, comment, location, tblProperties);
   }
 
-  public String comment() {
-    return comment == null ? null : getString(comment);
-  }
-
-  public JSONObject properties() {
-    String propertiesStr = getString(properties);
-    if (Strings.isNullOrEmpty(propertiesStr)) {
-      return new JSONObject();
-    } else {
-      return JSON.parseObject(propertiesStr);
+  @Override public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+    writer.keyword("CREATE");
+    writer.keyword("TABLE");
+    if (ifNotExists) {
+      writer.keyword("IF NOT EXISTS");
+    }
+    name.unparse(writer, leftPrec, rightPrec);
+    if (columnList != null) {
+      SqlWriter.Frame frame = writer.startList("(", ")");
+      for (SqlNode c : columnList) {
+        writer.sep(",");
+        c.unparse(writer, 0, 0);
+      }
+      writer.endList(frame);
+    }
+    writer.keyword("TYPE");
+    type.unparse(writer, 0, 0);
+    if (comment != null) {
+      writer.keyword("COMMENT");
+      comment.unparse(writer, 0, 0);
+    }
+    if (location != null) {
+      writer.keyword("LOCATION");
+      location.unparse(writer, 0, 0);
+    }
+    if (tblProperties != null) {
+      writer.keyword("TBLPROPERTIES");
+      tblProperties.unparse(writer, 0, 0);
     }
   }
 
@@ -133,8 +106,41 @@ private String getString(SqlNode n) {
     return n == null ? null : ((NlsString) SqlLiteral.value(n)).getValue();
   }
 
-  @SuppressWarnings("unchecked")
-  public List<ColumnDefinition> fieldList() {
-    return (List<ColumnDefinition>) ((List<? extends SqlNode>) fieldList.getList());
+  public Table toTable() {
+    List<Column> columns = new ArrayList<>(columnList.size());
+    for (Ord<SqlNode> c : Ord.zip(columnList)) {
+      if (c.e instanceof SqlColumnDeclaration) {
+        final SqlColumnDeclaration d = (SqlColumnDeclaration) c.e;
+        Column column = Column.builder()
+            .name(d.name.getSimple().toLowerCase())
+            .fieldType(CalciteUtils.toFieldType(
+                d.dataType.deriveType(BeamQueryPlanner.TYPE_FACTORY).getSqlTypeName()))
+            .nullable(d.dataType.getNullable())
+            .comment(getString(d.comment))
+            .build();
+        columns.add(column);
+      } else {
+        throw new AssertionError(c.e.getClass());
+      }
+    }
+
+    Table.Builder tb = Table.builder()
+        .type(getString(type).toLowerCase())
+        .name(name.getSimple().toLowerCase())
+        .columns(columns);
+    if (comment != null) {
+      tb.comment(getString(comment));
+    }
+    if (location != null) {
+      tb.location(getString(location));
+    }
+    if (tblProperties != null) {
+      tb.properties(parseObject(getString(tblProperties)));
+    } else {
+      tb.properties(new JSONObject());
+    }
+    return tb.build();
   }
 }
+
+// End SqlCreateTable.java
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDDLKeywords.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDDLKeywords.java
deleted file mode 100644
index 14a1b122ce4..00000000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDDLKeywords.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.impl.parser;
-
-import org.apache.calcite.sql.SqlLiteral;
-
-/**
- * Define the keywords that can occur in a CREATE TABLE statement.
- */
-public enum SqlDDLKeywords implements SqlLiteral.SqlSymbol {
-  PRIMARY
-}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java
new file mode 100644
index 00000000000..4e79abff8d1
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.parser;
+
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+/**
+ * Utilities concerning {@link SqlNode} for DDL.
+ */
+public class SqlDdlNodes {
+  private SqlDdlNodes() {}
+
+  /** Creates a CREATE TABLE. */
+  public static SqlCreateTable createTable(SqlParserPos pos, boolean replace,
+      boolean ifNotExists, SqlIdentifier name, SqlNodeList columnList,
+      SqlNode type, SqlNode comment, SqlNode location, SqlNode tblProperties) {
+    return new SqlCreateTable(pos, replace, ifNotExists, name, columnList,
+        type, comment, location, tblProperties);
+  }
+
+  /** Creates a DROP TABLE. */
+  public static SqlDropTable dropTable(SqlParserPos pos, boolean ifExists,
+      SqlIdentifier name) {
+    return new SqlDropTable(pos, ifExists, name);
+  }
+
+  /** Creates a column declaration. */
+  public static SqlNode column(SqlParserPos pos, SqlIdentifier name,
+      SqlDataTypeSpec dataType, SqlNode comment) {
+    return new SqlColumnDeclaration(pos, name, dataType, comment);
+  }
+}
+
+// End SqlDdlNodes.java
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropObject.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropObject.java
new file mode 100644
index 00000000000..8cc0aeaa589
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropObject.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.parser;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.calcite.sql.SqlDrop;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+/**
+ * Base class for parse trees of {@code DROP TABLE}, {@code DROP VIEW} and
+ * {@code DROP MATERIALIZED VIEW} statements.
+ */
+abstract class SqlDropObject extends SqlDrop {
+  protected final SqlIdentifier name;
+
+  /** Creates a SqlDropObject. */
+  SqlDropObject(SqlOperator operator, SqlParserPos pos, boolean ifExists,
+      SqlIdentifier name) {
+    super(operator, pos, ifExists);
+    this.name = name;
+  }
+
+  public List<SqlNode> getOperandList() {
+    return ImmutableList.<SqlNode>of(name);
+  }
+
+  @Override public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+    writer.keyword(getOperator().getName()); // "DROP TABLE" etc.
+    if (ifExists) {
+      writer.keyword("IF EXISTS");
+    }
+    name.unparse(writer, leftPrec, rightPrec);
+  }
+
+  public String getNameSimple() {
+    return name.getSimple().toLowerCase();
+  }
+}
+
+// End SqlDropObject.java
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropTable.java
index 6f703c98fd6..7af6de4c5b3 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropTable.java
@@ -1,13 +1,12 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,63 +16,23 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl.parser;
 
-import java.util.List;
-import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlLiteral;
-import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlSpecialOperator;
-import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.util.ImmutableNullableList;
 
 /**
- * A Calcite {@code SqlCall} which represents a drop table statement.
+ * Parse tree for {@code DROP TABLE} statement.
  */
-public class SqlDropTable extends SqlCall {
-  private final SqlIdentifier tableName;
+public class SqlDropTable extends SqlDropObject {
+  private static final SqlOperator OPERATOR =
+      new SqlSpecialOperator("DROP TABLE", SqlKind.DROP_TABLE);
 
-  public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator(
-      "DROP_TABLE", SqlKind.OTHER) {
-    @Override
-    public SqlCall createCall(
-        SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... o) {
-      assert functionQualifier == null;
-      return new SqlDropTable(pos, (SqlIdentifier) o[0]);
-    }
-
-    @Override
-    public void unparse(
-        SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) {
-      SqlDropTable t = (SqlDropTable) call;
-      UnparseUtil u = new UnparseUtil(writer, leftPrec, rightPrec);
-      u.keyword("DROP", "TABLE").node(t.tableName);
-    }
-  };
-
-  public SqlDropTable(SqlParserPos pos, SqlIdentifier tableName) {
-    super(pos);
-    this.tableName = tableName;
-  }
-
-  @Override
-  public SqlOperator getOperator() {
-    return OPERATOR;
-  }
-
-  @Override
-  public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
-    getOperator().unparse(writer, this, leftPrec, rightPrec);
-  }
-
-  @Override
-  public List<SqlNode> getOperandList() {
-    return ImmutableNullableList.<SqlNode>of(tableName);
-  }
-
-  public String tableName() {
-    return tableName.toString().toLowerCase();
+  /** Creates a SqlDropTable. */
+  SqlDropTable(SqlParserPos pos, boolean ifExists, SqlIdentifier name) {
+    super(OPERATOR, pos, ifExists, name);
   }
 }
+
+// End SqlDropTable.java
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/UnparseUtil.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/UnparseUtil.java
deleted file mode 100644
index 30e06b530b8..00000000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/UnparseUtil.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.impl.parser;
-
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlNodeList;
-import org.apache.calcite.sql.SqlWriter;
-
-class UnparseUtil {
-  private final SqlWriter writer;
-  private final int leftPrec;
-  private final int rightPrec;
-
-  UnparseUtil(SqlWriter writer, int leftPrec, int rightPrec) {
-    this.writer = writer;
-    this.leftPrec = leftPrec;
-    this.rightPrec = rightPrec;
-  }
-
-  UnparseUtil keyword(String... keywords) {
-    for (String k : keywords) {
-      writer.keyword(k);
-    }
-    return this;
-  }
-
-  UnparseUtil node(SqlNode n) {
-    n.unparse(writer, leftPrec, rightPrec);
-    return this;
-  }
-
-  UnparseUtil nodeList(SqlNodeList l) {
-    writer.keyword("(");
-    if (l.size() > 0) {
-      l.get(0).unparse(writer, leftPrec, rightPrec);
-      for (int i = 1; i < l.size(); ++i) {
-        writer.keyword(",");
-        l.get(i).unparse(writer, leftPrec, rightPrec);
-      }
-    }
-    writer.keyword(")");
-    return this;
-  }
-}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Column.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Column.java
index 0d98de5bd1c..04f61a47699 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Column.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Column.java
@@ -28,13 +28,12 @@
  */
 @AutoValue
 public abstract class Column implements Serializable {
-  // TODO: Add Nullable types.
   public abstract String getName();
   public abstract FieldType getFieldType();
+  public abstract Boolean getNullable();
 
   @Nullable
   public abstract String getComment();
-  public abstract boolean isPrimaryKey();
 
   public static Builder builder() {
     return new org.apache.beam.sdk.extensions.sql.meta.AutoValue_Column.Builder();
@@ -47,8 +46,8 @@ public static Builder builder() {
   public abstract static class Builder {
     public abstract Builder name(String name);
     public abstract Builder fieldType(FieldType fieldType);
+    public abstract Builder nullable(Boolean nullable);
     public abstract Builder comment(String comment);
-    public abstract Builder primaryKey(boolean isPrimaryKey);
     public abstract Column build();
   }
 }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/MetaUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/MetaUtils.java
index f28c40e3c6c..6d61726bd55 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/MetaUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/MetaUtils.java
@@ -41,6 +41,6 @@ public static Schema getRowTypeFromTable(Table table) {
     String description = column.getComment() != null ? column.getComment() : "";
     return Schema.Field.of(column.getName(), column.getFieldType())
         .withDescription(description)
-        .withNullable(true);
+        .withNullable(column.getNullable());
   }
 }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
index 89653e54cb7..a068f73af43 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
@@ -37,7 +37,7 @@
  *
  * <pre>{@code
  * CREATE TABLE ORDERS(
- *   ID INT PRIMARY KEY COMMENT 'this is the primary key',
+ *   ID INT COMMENT 'this is the primary key',
  *   NAME VARCHAR(127) COMMENT 'this is the name'
  * )
  * COMMENT 'this is the table orders'
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
index 102069abd88..69820fbcd10 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
@@ -35,7 +35,7 @@
  * <p>A sample of text table is:
  * <pre>{@code
  * CREATE TABLE ORDERS(
- *   ID INT PRIMARY KEY COMMENT 'this is the primary key',
+ *   ID INT COMMENT 'this is the primary key',
  *   NAME VARCHAR(127) COMMENT 'this is the name'
  * )
  * TYPE 'text'
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
index 6d5ac1ab778..74181f7148f 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
@@ -130,7 +130,7 @@ public void testParseDropTable() throws Exception {
     assertTrue(sqlNode instanceof SqlDropTable);
     SqlDropTable stmt = (SqlDropTable) sqlNode;
     assertNotNull(stmt);
-    assertEquals("person", stmt.tableName());
+    assertEquals("person", stmt.getNameSimple());
   }
 
   private Table parseTable(String sql) throws Exception {
@@ -140,7 +140,7 @@ private Table parseTable(String sql) throws Exception {
     assertNotNull(sqlNode);
     assertTrue(sqlNode instanceof SqlCreateTable);
     SqlCreateTable stmt = (SqlCreateTable) sqlNode;
-    return ParserUtils.convertCreateTableStmtToTable(stmt);
+    return stmt.toTable();
   }
 
   private static Table mockTable(String name, String type, String comment, JSONObject properties) {
@@ -159,13 +159,13 @@ private static Table mockTable(String name, String type, String comment, JSONObj
             Column.builder()
                 .name("id")
                 .fieldType(TypeName.INT32.type())
-                .primaryKey(false)
+                .nullable(true)
                 .comment("id")
                 .build(),
             Column.builder()
                 .name("name")
                 .fieldType(RowSqlTypes.VARCHAR)
-                .primaryKey(false)
+                .nullable(true)
                 .comment("name")
                 .build()
         ))
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java
index 58b048e39f0..ea50bc70c9f 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java
@@ -69,12 +69,12 @@ private static Table mockTable(String name) {
             Column.builder()
                 .name("id")
                 .fieldType(TypeName.INT32.type())
-                .primaryKey(true)
+                .nullable(true)
             .build(),
             Column.builder()
                 .name("name")
                 .fieldType(RowSqlTypes.VARCHAR)
-                .primaryKey(false)
+                .nullable(true)
                 .build()))
         .type("kafka")
         .properties(properties)
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java
index 7b47e1ab3f1..2e6c59fb865 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java
@@ -80,12 +80,12 @@ private static Table mockTable(String name, String format) {
             Column.builder()
                 .name("id")
                 .fieldType(TypeName.INT32.type())
-                .primaryKey(true)
+                .nullable(true)
                 .build(),
             Column.builder()
                 .name("name")
                 .fieldType(RowSqlTypes.VARCHAR)
-            .primaryKey(false)
+                .nullable(true)
                 .build()))
         .type("text")
         .properties(properties)
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java
index 2ee027a0294..4ae03c6acb3 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java
@@ -133,12 +133,12 @@ private static Table mockTable(String name, String type) {
             Column.builder()
                 .name("id")
                 .fieldType(TypeName.INT32.type())
-                .primaryKey(true)
+                .nullable(true)
                 .build(),
             Column.builder()
                 .name("name")
                 .fieldType(RowSqlTypes.VARCHAR)
-                .primaryKey(false)
+                .nullable(true)
                 .build()))
         .type(type)
         .properties(new JSONObject())


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 91592)
    Time Spent: 3h 50m  (was: 3h 40m)

> Take advantage of Calcite DDL
> -----------------------------
>
>                 Key: BEAM-4044
>                 URL: https://issues.apache.org/jira/browse/BEAM-4044
>             Project: Beam
>          Issue Type: New Feature
>          Components: dsl-sql
>            Reporter: Andrew Pilloud
>            Assignee: Andrew Pilloud
>            Priority: Major
>          Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> In Calcite 1.15 support for abstract DDL moved into calcite core. We should 
> take advantage of that.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to