KurtYoung commented on a change in pull request #8548: [FLINK-6962] [table] Add create(drop) table SQL DDL URL: https://github.com/apache/flink/pull/8548#discussion_r288910348
########## File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java ########## @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.flink.sql.parser.error.SqlParseException; + +import org.apache.calcite.sql.SqlBasicCall; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlCharStringLiteral; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.dialect.AnsiSqlDialect; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.sql.pretty.SqlPrettyWriter; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static java.util.Objects.requireNonNull; + +/** + * CREATE TABLE DDL sql call. + */ +public class SqlCreateTable extends SqlCall { + + public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("CREATE_TABLE", SqlKind.CREATE_TABLE); + + // Marked this table used as "SOURCE" or "SINK". This flag can be modified + // dynamically base on it's occurrence in DMLs. + private String tableType; + + private SqlIdentifier tableName; + + private SqlNodeList columnList; + + private SqlNodeList propertyList; + + private SqlNodeList primaryKeyList; + + private List<SqlNodeList> uniqueKeysList; + + private SqlWatermark watermark; + + private SqlNodeList partitionKeyList; + + private SqlCharStringLiteral comment; + + public SqlCreateTable( + SqlParserPos pos, + String tableType, + SqlIdentifier tableName, + SqlNodeList columnList, + SqlNodeList primaryKeyList, + List<SqlNodeList> uniqueKeysList, + SqlWatermark watermark, + SqlNodeList propertyList, + SqlNodeList partitionKeyList, + SqlCharStringLiteral comment) { + super(pos); + this.tableType = tableType; + this.tableName = requireNonNull(tableName, "Table name is missing"); + this.columnList = requireNonNull(columnList, "Column list should not be null"); + this.primaryKeyList = primaryKeyList; + this.uniqueKeysList = uniqueKeysList; + this.watermark = watermark; + this.propertyList = propertyList; + this.partitionKeyList = partitionKeyList; + this.comment = comment; + } + + @Override + public SqlOperator getOperator() { + return OPERATOR; + } + + @Override + public List<SqlNode> getOperandList() { + return null; + } + + public SqlIdentifier getTableName() { + return tableName; + } + + public void setTableName(SqlIdentifier tableName) { + this.tableName = tableName; + } + + public SqlNodeList getColumnList() { + return columnList; + } + + public void setColumnList(SqlNodeList columnList) { + this.columnList = columnList; + } + + public SqlNodeList getPropertyList() { + return propertyList; + } + + public void setPropertyList(SqlNodeList propertyList) { + this.propertyList = propertyList; + } + + public SqlNodeList getPartitionKeyList() { + return partitionKeyList; + } + + public void setPartitionKeyList(SqlNodeList partitionKeyList) { + this.partitionKeyList = partitionKeyList; + } + + public SqlNodeList getPrimaryKeyList() { + return primaryKeyList; + } + + public void setPrimaryKeyList(SqlNodeList primaryKeyList) { + this.primaryKeyList = primaryKeyList; + } + + public List<SqlNodeList> getUniqueKeysList() { + return uniqueKeysList; + } + + public void setUniqueKeysList(List<SqlNodeList> uniqueKeysList) { + this.uniqueKeysList = uniqueKeysList; + } + + public String getTableType() { + return tableType; + } + + public void setTableType(String tableType) { + this.tableType = tableType; + } + + public SqlCharStringLiteral getComment() { + return comment; + } + + public void setComment(SqlCharStringLiteral comment) { + this.comment = comment; + } + + public void validate() throws SqlParseException { + Set<String> columnNames = new HashSet<>(); + if (columnList != null) { + for (SqlNode column : columnList) { + String columnName = null; + if (column instanceof SqlTableColumn) { + SqlTableColumn tableColumn = (SqlTableColumn) column; + columnName = tableColumn.getName().getSimple(); + String typeName = tableColumn.getType().getTypeName().getSimple(); + if (SqlColumnType.getType(typeName).isUnsupported()) { + throw new SqlParseException( + column.getParserPosition(), + "Not support type [" + typeName + "], at " + column.getParserPosition()); + } + } else if (column instanceof SqlBasicCall) { Review comment: why would `SqlBasicCall` exists in column list ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
