fsk119 commented on code in PR #19193:
URL: https://github.com/apache/flink/pull/19193#discussion_r902248632
##########
flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java:
##########
@@ -313,6 +313,175 @@ void testAlterTable() {
sql(sql3).ok(expected3);
}
+ @Test
+ void testAlterTableAdd() {
+ // add single column
Review Comment:
If you need use comment to distinguish the difference, you'd better use
another test to test each part. For example, you can add a test
```
void testAlterTableAddSingleColumn() {
...
}
```
##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableModify.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * SqlNode to describe ALTER TABLE table_name MODIFY
column/constraint/watermark clause.
+ *
+ * <p>Example: DDL like the below for modify column/constraint/watermark.
+ *
+ * <pre>{@code
+ * -- add single column
+ * ALTER TABLE mytable MODIFY new_column STRING COMMENT 'new_column docs';
+ *
+ * -- add multiple columns, constraint, and watermark
+ * ALTER TABLE mytable MODIFY (
+ * log_ts STRING COMMENT 'log timestamp string' FIRST,
+ * ts AS TO_TIMESTAMP(log_ts) AFTER log_ts,
+ * PRIMARY KEY (id) NOT ENFORCED,
+ * WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
+ * );
+ * }</pre>
+ */
+public class SqlAlterTableModify extends SqlAlterTable {
+
+ // Whether the modified column is in a paren, currently it is only used
for SQL unparse.
+ private final boolean withParen;
+ private final SqlNodeList modifiedColumns;
+ @Nullable private final SqlWatermark watermark;
+ private final List<SqlTableConstraint> constraint;
+
+ public SqlAlterTableModify(
+ SqlParserPos pos,
+ SqlIdentifier tableName,
+ boolean withParen,
Review Comment:
ditto
##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableModify.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * SqlNode to describe ALTER TABLE table_name MODIFY
column/constraint/watermark clause.
+ *
+ * <p>Example: DDL like the below for modify column/constraint/watermark.
+ *
+ * <pre>{@code
+ * -- add single column
+ * ALTER TABLE mytable MODIFY new_column STRING COMMENT 'new_column docs';
+ *
+ * -- add multiple columns, constraint, and watermark
+ * ALTER TABLE mytable MODIFY (
+ * log_ts STRING COMMENT 'log timestamp string' FIRST,
+ * ts AS TO_TIMESTAMP(log_ts) AFTER log_ts,
+ * PRIMARY KEY (id) NOT ENFORCED,
+ * WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
+ * );
+ * }</pre>
+ */
+public class SqlAlterTableModify extends SqlAlterTable {
+
+ // Whether the modified column is in a paren, currently it is only used
for SQL unparse.
+ private final boolean withParen;
+ private final SqlNodeList modifiedColumns;
+ @Nullable private final SqlWatermark watermark;
+ private final List<SqlTableConstraint> constraint;
+
+ public SqlAlterTableModify(
+ SqlParserPos pos,
+ SqlIdentifier tableName,
+ boolean withParen,
+ SqlNodeList modifiedColumns,
+ @Nullable SqlWatermark watermark,
+ List<SqlTableConstraint> constraint) {
+ super(pos, tableName, null);
+ this.withParen = withParen;
+ this.modifiedColumns = modifiedColumns;
+ this.watermark = watermark;
+ this.constraint = constraint;
+ }
+
+ @Nonnull
+ @Override
+ public List<SqlNode> getOperandList() {
+ return ImmutableNullableList.of(
+ getTableName(),
+ modifiedColumns,
+ watermark,
+ new SqlNodeList(constraint, SqlParserPos.ZERO));
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+ super.unparse(writer, leftPrec, rightPrec);
+ writer.keyword("MODIFY");
+ // Distinguish whether the modified column/watermark/constraint is in
a paren
+ if (withParen) {
+ SqlWriter.Frame frame =
+ writer.startList(SqlWriter.FrameTypeEnum.create("sds"),
"(", ")");
+ for (SqlNode column : modifiedColumns.getList()) {
+ printIndent(writer);
+ column.unparse(writer, leftPrec, rightPrec);
+ }
+ for (SqlTableConstraint constraint : constraint) {
+ printIndent(writer);
+ constraint.unparse(writer, leftPrec, rightPrec);
+ }
+
+ if (watermark != null) {
+ printIndent(writer);
+ watermark.unparse(writer, leftPrec, rightPrec);
+ }
+
+ writer.newlineAndIndent();
+ writer.endList(frame);
+ } else {
+ if (modifiedColumns.getList().size() == 1) {
+ // modify single column case
+ modifiedColumns.getList().get(0).unparse(writer, leftPrec,
rightPrec);
+ } else if (watermark != null) {
+ // modify watermark case
+ watermark.unparse(writer, leftPrec, rightPrec);
+ } else if (constraint.size() == 1) {
+ constraint.get(0).unparse(writer, leftPrec, rightPrec);
Review Comment:
nit: I think we should keep the print order. In the line 91-109, it prints
column, constraints and watermark. But here it print column, watermark and
constraints in order. We can align with the order in CREATE TABLE.
##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAdd.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * SqlNode to describe ALTER TABLE table_name ADD column/constraint/watermark
clause.
+ *
+ * <p>Example: DDL like the below for add column/constraint/watermark.
+ *
+ * <pre>{@code
+ * -- add single column
+ * ALTER TABLE mytable ADD new_column STRING COMMENT 'new_column docs';
+ *
+ * -- add multiple columns, constraint, and watermark
+ * ALTER TABLE mytable ADD (
+ * log_ts STRING COMMENT 'log timestamp string' FIRST,
+ * ts AS TO_TIMESTAMP(log_ts) AFTER log_ts,
+ * PRIMARY KEY (id) NOT ENFORCED,
+ * WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
+ * );
+ * }</pre>
+ */
+public class SqlAlterTableAdd extends SqlAlterTable {
+
+ // Whether the added column is in a paren, currently it is only used for
SQL unparse.
+ private final boolean withParen;
+ private final SqlNodeList addedColumns;
+ @Nullable private final SqlWatermark watermark;
+ private final List<SqlTableConstraint> constraint;
+
+ public SqlAlterTableAdd(
+ SqlParserPos pos,
+ SqlIdentifier tableName,
+ boolean withParen,
+ SqlNodeList addedColumns,
+ @Nullable SqlWatermark sqlWatermark,
+ List<SqlTableConstraint> constraint) {
+ super(pos, tableName, null);
+ this.withParen = withParen;
+ this.addedColumns = addedColumns;
+ this.watermark = sqlWatermark;
+ this.constraint = constraint;
+ }
+
+ public SqlNodeList getColumns() {
+ return addedColumns;
+ }
+
+ public Optional<SqlWatermark> getWatermark() {
+ return Optional.ofNullable(watermark);
+ }
+
+ @Nonnull
+ @Override
+ public List<SqlNode> getOperandList() {
+ return ImmutableNullableList.of(
+ getTableName(),
+ addedColumns,
+ watermark,
+ new SqlNodeList(constraint, SqlParserPos.ZERO));
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+ super.unparse(writer, leftPrec, rightPrec);
+ writer.keyword("ADD");
+ // Distinguish whether the added column/watermark/constraint is in a
paren
+ if (withParen) {
+ SqlWriter.Frame frame =
+ writer.startList(SqlWriter.FrameTypeEnum.create("sds"),
"(", ")");
+ for (SqlNode column : addedColumns.getList()) {
+ printIndent(writer);
+ column.unparse(writer, leftPrec, rightPrec);
+ }
+ for (SqlTableConstraint constraint : constraint) {
+ printIndent(writer);
+ constraint.unparse(writer, leftPrec, rightPrec);
+ }
+ if (watermark != null) {
+ printIndent(writer);
+ watermark.unparse(writer, leftPrec, rightPrec);
+ }
+
+ writer.newlineAndIndent();
+ writer.endList(frame);
+ } else {
+ if (addedColumns.getList().size() == 1) {
+ // add single column case
+ addedColumns.getList().get(0).unparse(writer, leftPrec,
rightPrec);
+ } else if (watermark != null) {
+ // add watermark case
+ watermark.unparse(writer, leftPrec, rightPrec);
+ }
Review Comment:
no constraint?
##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAdd.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * SqlNode to describe ALTER TABLE table_name ADD column/constraint/watermark
clause.
+ *
+ * <p>Example: DDL like the below for add column/constraint/watermark.
+ *
+ * <pre>{@code
+ * -- add single column
+ * ALTER TABLE mytable ADD new_column STRING COMMENT 'new_column docs';
+ *
+ * -- add multiple columns, constraint, and watermark
+ * ALTER TABLE mytable ADD (
Review Comment:
Would be better add metadata column example here.
##########
flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl:
##########
@@ -567,12 +568,60 @@ SqlAlterTable SqlAlterTable() :
propertyList);
}
|
- <ADD> constraint = TableConstraint() {
- return new SqlAlterTableAddConstraint(
+ <ADD>
+ (
+ AlterTableAddOrModify(ctx) {
+ // TODO: remove it after supports convert SqlNode to Operation
+ if (ctx.constraints.size() > 0) {
+ return new SqlAlterTableAddConstraint(
+ tableIdentifier,
+ ctx.constraints.get(0),
+ startPos.plus(getPos()));
+ } else {
+ withParen = false;
+ }
+ }
+ |
+ <LPAREN>
+ AlterTableAddOrModify(ctx)
+ (
+ <COMMA> AlterTableAddOrModify(ctx)
+ )*
+ <RPAREN>
+ )
+ {
+ return new SqlAlterTableAdd(
+ startPos.plus(getPos()),
+ tableIdentifier,
+ withParen,
Review Comment:
I think we don't need `withParen` here. It just influences the `unparse`
behaviour. We can always add brackets here.
`withParen` should be true when add multiple columns? If so, I think we can
just infer from the column list. If the added object is more than one, we
should use `(` and `)` here.
##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableModify.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * SqlNode to describe ALTER TABLE table_name MODIFY
column/constraint/watermark clause.
+ *
+ * <p>Example: DDL like the below for modify column/constraint/watermark.
+ *
+ * <pre>{@code
+ * -- add single column
+ * ALTER TABLE mytable MODIFY new_column STRING COMMENT 'new_column docs';
+ *
+ * -- add multiple columns, constraint, and watermark
Review Comment:
modify ?
##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/position/SqlColumnPosSpec.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl.position;
+
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+/** Enumeration of SQL column position specification. */
+public enum SqlColumnPosSpec {
+ FIRST("FIRST"),
+ AFTER("AFTER"),
+ LAST("LAST");
+
+ private final String digest;
Review Comment:
We don't need digest. The default toString result is equal to the literal
name.
##########
flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java:
##########
@@ -313,6 +313,175 @@ void testAlterTable() {
sql(sql3).ok(expected3);
}
+ @Test
+ void testAlterTableAdd() {
+ // add single column
+ sql("alter table t1 add new_column string comment 'new_column docs'")
+ .ok("ALTER TABLE `T1` ADD `NEW_COLUMN` STRING COMMENT
'new_column docs'");
+ sql("alter table t1 add new_column string comment 'new_column docs'
first")
+ .ok("ALTER TABLE `T1` ADD `NEW_COLUMN` STRING COMMENT
'new_column docs' FIRST");
+ sql("alter table t1 add new_column string comment 'new_column docs'
after id")
+ .ok(
+ "ALTER TABLE `T1` ADD `NEW_COLUMN` STRING COMMENT
'new_column docs' AFTER `ID`");
+ // add compute column
+ sql("alter table t1 add col_int as col_a - col_b after col_b")
+ .ok("ALTER TABLE `T1` ADD `COL_INT` AS (`COL_A` - `COL_B`)
AFTER `COL_B`");
+ // add metadata column
+ sql("alter table t1 add col_int int metadata from 'mk1' virtual
comment 'comment_metadata' after col_b")
+ .ok(
+ "ALTER TABLE `T1` ADD `COL_INT` INTEGER METADATA FROM
'mk1' VIRTUAL "
+ + "COMMENT 'comment_metadata' AFTER `COL_B`");
+
+ // add watermark
+ sql("alter table t1 add watermark for ts as ts - interval '1' second")
+ .ok("ALTER TABLE `T1` ADD WATERMARK FOR `TS` AS (`TS` -
INTERVAL '1' SECOND)");
+ sql("alter table default_database.t1 add watermark for ts as ts -
interval '1' second")
+ .ok(
+ "ALTER TABLE `DEFAULT_DATABASE`.`T1` ADD WATERMARK FOR
`TS` AS (`TS` - INTERVAL '1' SECOND)");
+ sql("alter table default_catalog.default_database.t1 add watermark for
ts as ts - interval '1' second")
+ .ok(
+ "ALTER TABLE `DEFAULT_CATALOG`.`DEFAULT_DATABASE`.`T1`
"
+ + "ADD WATERMARK FOR `TS` AS (`TS` - INTERVAL
'1' SECOND)");
+
Review Comment:
add case for add single constraint here.
##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/position/SqlTableColumnPosition.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl.position;
+
+import org.apache.flink.sql.parser.ddl.SqlTableColumn;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/** SqlNode to describe table column and its position. */
+public class SqlTableColumnPosition extends SqlCall {
+
+ private static final SqlOperator OPERATOR =
+ new SqlSpecialOperator("SqlTableColumnPosition", SqlKind.OTHER);
+
+ private final SqlTableColumn column;
+ private final SqlLiteral positionSpec;
+ @Nullable private final SqlIdentifier referencedColumn;
+
+ public SqlTableColumnPosition(
+ SqlParserPos pos,
+ SqlTableColumn column,
+ SqlLiteral positionSpec,
+ @Nullable SqlIdentifier referencedColumn) {
+ super(pos);
+ this.column = column;
+ this.positionSpec = positionSpec;
+ this.referencedColumn = referencedColumn;
+ }
+
+ public boolean isFirstColumn() {
+ return positionSpec.getValueAs(SqlColumnPosSpec.class) ==
SqlColumnPosSpec.FIRST;
+ }
+
+ public boolean isReferencedColumn() {
Review Comment:
isAfterReferencesColumn?
##########
flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java:
##########
@@ -313,6 +313,175 @@ void testAlterTable() {
sql(sql3).ok(expected3);
}
+ @Test
+ void testAlterTableAdd() {
+ // add single column
+ sql("alter table t1 add new_column string comment 'new_column docs'")
+ .ok("ALTER TABLE `T1` ADD `NEW_COLUMN` STRING COMMENT
'new_column docs'");
+ sql("alter table t1 add new_column string comment 'new_column docs'
first")
+ .ok("ALTER TABLE `T1` ADD `NEW_COLUMN` STRING COMMENT
'new_column docs' FIRST");
+ sql("alter table t1 add new_column string comment 'new_column docs'
after id")
+ .ok(
+ "ALTER TABLE `T1` ADD `NEW_COLUMN` STRING COMMENT
'new_column docs' AFTER `ID`");
+ // add compute column
+ sql("alter table t1 add col_int as col_a - col_b after col_b")
+ .ok("ALTER TABLE `T1` ADD `COL_INT` AS (`COL_A` - `COL_B`)
AFTER `COL_B`");
+ // add metadata column
+ sql("alter table t1 add col_int int metadata from 'mk1' virtual
comment 'comment_metadata' after col_b")
+ .ok(
+ "ALTER TABLE `T1` ADD `COL_INT` INTEGER METADATA FROM
'mk1' VIRTUAL "
+ + "COMMENT 'comment_metadata' AFTER `COL_B`");
+
+ // add watermark
Review Comment:
What's the behavirour user add multiple watermark here?
##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTable.java:
##########
@@ -94,4 +97,15 @@ public SqlNodeList getPartitionSpec() {
public LinkedHashMap<String, String> getPartitionKVs() {
return SqlPartitionUtils.getPartitionKVs(getPartitionSpec());
}
+
Review Comment:
I find all SqlAlterTableXXX extends SqlAlterTable that contains a field
named partitionSpec. But not all subclass requrire this field. I think we'd
better move this field to the subclass that need this?
##########
flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl:
##########
@@ -567,12 +568,60 @@ SqlAlterTable SqlAlterTable() :
propertyList);
}
|
- <ADD> constraint = TableConstraint() {
- return new SqlAlterTableAddConstraint(
+ <ADD>
+ (
+ AlterTableAddOrModify(ctx) {
+ // TODO: remove it after supports convert SqlNode to Operation
Review Comment:
It's better you can link to a jira issue.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]