fsk119 commented on code in PR #19193:
URL: https://github.com/apache/flink/pull/19193#discussion_r907567671


##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAdd.java:
##########
@@ -0,0 +1,113 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * SqlNode to describe ALTER TABLE table_name ADD column/constraint/watermark 
clause.
+ *
+ * <p>Example: DDL like the below for add column/constraint/watermark.
+ *
+ * <pre>{@code
+ * -- add single column
+ * ALTER TABLE mytable ADD new_column STRING COMMENT 'new_column docs';
+ *
+ * -- add multiple columns, constraint, and watermark
+ * ALTER TABLE mytable ADD (
+ *     log_ts STRING COMMENT 'log timestamp string' FIRST,
+ *     ts AS TO_TIMESTAMP(log_ts) AFTER log_ts,
+ *     col_meta int metadata from 'mk1' virtual AFTER col_b,
+ *     PRIMARY KEY (id) NOT ENFORCED,
+ *     WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
+ * );
+ * }</pre>
+ */
+public class SqlAlterTableAdd extends SqlAlterTable {

Review Comment:
   After reading `SqlAlterTableAdd` and `SqlAlterTableModify`, I think we 
can create an abstract class `SqlAlterTableSchema`.
   
   In Flink, a schema only contains columns, watermarks, and constraints.



##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAdd.java:
##########
@@ -0,0 +1,113 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * SqlNode to describe ALTER TABLE table_name ADD column/constraint/watermark 
clause.
+ *
+ * <p>Example: DDL like the below for add column/constraint/watermark.
+ *
+ * <pre>{@code
+ * -- add single column
+ * ALTER TABLE mytable ADD new_column STRING COMMENT 'new_column docs';
+ *
+ * -- add multiple columns, constraint, and watermark
+ * ALTER TABLE mytable ADD (
+ *     log_ts STRING COMMENT 'log timestamp string' FIRST,
+ *     ts AS TO_TIMESTAMP(log_ts) AFTER log_ts,
+ *     col_meta int metadata from 'mk1' virtual AFTER col_b,
+ *     PRIMARY KEY (id) NOT ENFORCED,
+ *     WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
+ * );
+ * }</pre>
+ */
+public class SqlAlterTableAdd extends SqlAlterTable {
+
+    private final SqlNodeList addedColumns;
+    @Nullable private final SqlWatermark watermark;
+    private final List<SqlTableConstraint> constraint;
+
+    public SqlAlterTableAdd(
+            SqlParserPos pos,
+            SqlIdentifier tableName,
+            SqlNodeList addedColumns,
+            @Nullable SqlWatermark sqlWatermark,
+            List<SqlTableConstraint> constraint) {
+        super(pos, tableName, null);
+        this.addedColumns = addedColumns;
+        this.watermark = sqlWatermark;
+        this.constraint = constraint;
+    }
+
+    public SqlNodeList getColumns() {
+        return addedColumns;
+    }
+
+    public Optional<SqlWatermark> getWatermark() {
+        return Optional.ofNullable(watermark);
+    }
+
+    @Nonnull
+    @Override
+    public List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(
+                getTableName(),
+                addedColumns,
+                watermark,
+                new SqlNodeList(constraint, SqlParserPos.ZERO));
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        super.unparse(writer, leftPrec, rightPrec);
+        writer.keyword("ADD");
+
+        SqlWriter.Frame frame = 
writer.startList(SqlWriter.FrameTypeEnum.create("sds"), "(", ")");

Review Comment:
   What is the meaning of `"sds"` here? It seems you copied some code from 
`SqlCreateTable`. Why not introduce a utility class to reuse this code?



##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTable.java:
##########
@@ -94,4 +97,15 @@ public SqlNodeList getPartitionSpec() {
     public LinkedHashMap<String, String> getPartitionKVs() {
         return SqlPartitionUtils.getPartitionKVs(getPartitionSpec());
     }
+

Review Comment:
   After reading all its subclasses, I think it does need a lot of work.



##########
flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java:
##########
@@ -313,6 +313,175 @@ void testAlterTable() {
         sql(sql3).ok(expected3);
     }
 
+    @Test
+    void testAlterTableAdd() {
+        // add single column

Review Comment:
   I think it will make the test much cleaner. You can take a look at the 
`testCreateTableXXX` tests. It's better to use the method name, rather than 
comments, to illustrate the definition under test.



##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/position/SqlTableColumnPosition.java:
##########
@@ -0,0 +1,104 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl.position;
+
+import org.apache.flink.sql.parser.ddl.SqlTableColumn;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/** SqlNode to describe table column and its position. */
+public class SqlTableColumnPosition extends SqlCall {
+
+    private static final SqlOperator OPERATOR =
+            new SqlSpecialOperator("SqlTableColumnPosition", SqlKind.OTHER);
+
+    private final SqlTableColumn column;
+    private final SqlLiteral positionSpec;
+    @Nullable private final SqlIdentifier referencedColumn;
+
+    public SqlTableColumnPosition(
+            SqlParserPos pos,
+            SqlTableColumn column,
+            SqlLiteral positionSpec,
+            @Nullable SqlIdentifier referencedColumn) {
+        super(pos);
+        this.column = column;
+        this.positionSpec = positionSpec;
+        this.referencedColumn = referencedColumn;
+    }
+
+    public boolean isFirstColumn() {
+        return positionSpec.getValueAs(SqlColumnPosSpec.class) == 
SqlColumnPosSpec.FIRST;
+    }
+
+    public boolean isAfterReferencedColumn() {
+        return positionSpec.getValueAs(SqlColumnPosSpec.class) == 
SqlColumnPosSpec.AFTER
+                && referencedColumn != null;
+    }
+
+    public SqlTableColumn getColumn() {
+        return column;
+    }
+
+    public SqlLiteral getPositionSpec() {
+        return positionSpec;
+    }
+
+    @Nullable
+    public SqlIdentifier getAfterReferencedColumn() {
+        return referencedColumn;
+    }
+
+    @Nonnull
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Nonnull
+    @Override
+    public List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(column, positionSpec, 
referencedColumn);
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        column.unparse(writer, leftPrec, rightPrec);
+        if (isFirstColumn()) {
+            positionSpec.unparse(writer, leftPrec, rightPrec);
+        } else if (isAfterReferencedColumn()) {
+            positionSpec.unparse(writer, leftPrec, rightPrec);
+            referencedColumn.unparse(writer, leftPrec, rightPrec);
+        }

Review Comment:
   Add 
   ```
   
   else {
       throw new IllegalArgumentException(...);
   }
   ```



##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/position/SqlColumnPosSpec.java:
##########
@@ -0,0 +1,48 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl.position;
+
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+/** Enumeration of SQL column position specification. */
+public enum SqlColumnPosSpec {
+    FIRST("FIRST"),
+    AFTER("AFTER"),
+    LAST("LAST");
+
+    private final String digest;

Review Comment:
   OK. Let's align with the others.



##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAdd.java:
##########
@@ -0,0 +1,113 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * SqlNode to describe ALTER TABLE table_name ADD column/constraint/watermark 
clause.
+ *
+ * <p>Example: DDL like the below for add column/constraint/watermark.
+ *
+ * <pre>{@code
+ * -- add single column
+ * ALTER TABLE mytable ADD new_column STRING COMMENT 'new_column docs';
+ *
+ * -- add multiple columns, constraint, and watermark
+ * ALTER TABLE mytable ADD (
+ *     log_ts STRING COMMENT 'log timestamp string' FIRST,
+ *     ts AS TO_TIMESTAMP(log_ts) AFTER log_ts,
+ *     col_meta int metadata from 'mk1' virtual AFTER col_b,
+ *     PRIMARY KEY (id) NOT ENFORCED,
+ *     WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
+ * );
+ * }</pre>
+ */
+public class SqlAlterTableAdd extends SqlAlterTable {
+
+    private final SqlNodeList addedColumns;
+    @Nullable private final SqlWatermark watermark;
+    private final List<SqlTableConstraint> constraint;
+
+    public SqlAlterTableAdd(
+            SqlParserPos pos,
+            SqlIdentifier tableName,
+            SqlNodeList addedColumns,
+            @Nullable SqlWatermark sqlWatermark,
+            List<SqlTableConstraint> constraint) {
+        super(pos, tableName, null);

Review Comment:
   Use `super(pos, tableName);`.



##########
flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java:
##########
@@ -313,6 +313,175 @@ void testAlterTable() {
         sql(sql3).ok(expected3);
     }
 
+    @Test
+    void testAlterTableAdd() {
+        // add single column
+        sql("alter table t1 add new_column string comment 'new_column docs'")
+                .ok("ALTER TABLE `T1` ADD `NEW_COLUMN` STRING COMMENT 
'new_column docs'");
+        sql("alter table t1 add new_column string comment 'new_column docs' 
first")
+                .ok("ALTER TABLE `T1` ADD `NEW_COLUMN` STRING COMMENT 
'new_column docs' FIRST");
+        sql("alter table t1 add new_column string comment 'new_column docs' 
after id")
+                .ok(
+                        "ALTER TABLE `T1` ADD `NEW_COLUMN` STRING COMMENT 
'new_column docs' AFTER `ID`");
+        // add compute column
+        sql("alter table t1 add col_int as col_a - col_b after col_b")
+                .ok("ALTER TABLE `T1` ADD `COL_INT` AS (`COL_A` - `COL_B`) 
AFTER `COL_B`");
+        // add metadata column
+        sql("alter table t1 add col_int int metadata from 'mk1' virtual 
comment 'comment_metadata' after col_b")
+                .ok(
+                        "ALTER TABLE `T1` ADD `COL_INT` INTEGER METADATA FROM 
'mk1' VIRTUAL "
+                                + "COMMENT 'comment_metadata' AFTER `COL_B`");
+
+        // add watermark

Review Comment:
   OK. I just wanted to check whether it throws a clear exception to notify 
the user of the wrong usage. I think adding a case like 
`testCreateTableWithMultipleWatermark` would be fine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to