This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 77b9107221bafb0174f69a2b6add153269ab6274
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Mon Dec 1 19:24:08 2025 +0100

    [FLINK-38756][table] Add support for ALTER MATERIALIZED TABLE ADD columns in parser
---
 .../src/main/codegen/data/Parser.tdd               |   1 +
 .../src/main/codegen/includes/parserImpls.ftl      |  40 ++++--
 .../ddl/SqlAlterMaterializedTableSchema.java       | 141 +++++++++++++++++++++
 .../flink/sql/parser/ddl/SqlAlterTableAdd.java     |   2 +-
 .../flink/sql/parser/ddl/SqlAlterTableSchema.java  |   2 +-
 .../flink/sql/parser/FlinkSqlParserImplTest.java   |  62 ---------
 .../MaterializedTableStatementParserTest.java      | 120 ++++++++++++++++++
 .../apache/flink/sql/parser/ValidationMatcher.java |  88 +++++++++++++
 .../AbstractAlterMaterializedTableConverter.java   |   4 +-
 9 files changed, 383 insertions(+), 77 deletions(-)

diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd 
b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 42acdb21f70..49ffde6d757 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -53,6 +53,7 @@
     "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableRefresh"
     "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableResume"
     "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableSuspend"
+    
"org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableSchema.SqlAlterMaterializedTableAddSchema"
     "org.apache.flink.sql.parser.ddl.SqlAlterModel"
     "org.apache.flink.sql.parser.ddl.SqlAlterModelRename"
     "org.apache.flink.sql.parser.ddl.SqlAlterModelReset"
diff --git 
a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl 
b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index d1f6d2eba50..14bcdfd6f19 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -2102,24 +2102,44 @@ SqlAlterMaterializedTable SqlAlterMaterializedTable() :
                     asQuery);
             }
         |
-        <MODIFY> <DISTRIBUTION> {
-                return new SqlAlterMaterializedTableModifyDistribution(
+        <ADD>
+            (
+            <DISTRIBUTION> {
+                return new SqlAlterMaterializedTableAddDistribution(
                 startPos.plus(getPos()),
-                tableIdentifier,
-                SqlDistribution(getPos()));
+                tableIdentifier, SqlDistribution(getPos()));
             }
+        |
+            AlterTableAddOrModify(ctx)
+        |
+            <LPAREN>
+            AlterTableAddOrModify(ctx)
+            (
+                <COMMA> AlterTableAddOrModify(ctx)
+            )*
+            <RPAREN>
+        )
+        {
+            return new SqlAlterMaterializedTableAddSchema(
+              startPos.plus(getPos()),
+              tableIdentifier,
+              new SqlNodeList(ctx.columnPositions, startPos.plus(getPos())),
+              ctx.constraints,
+              ctx.watermark);
+        }
+        |
+        <MODIFY> <DISTRIBUTION> {
+            return new SqlAlterMaterializedTableModifyDistribution(
+              startPos.plus(getPos()),
+              tableIdentifier,
+              SqlDistribution(getPos()));
+        }
         |
         <DROP> <DISTRIBUTION> {
                 return new SqlAlterMaterializedTableDropDistribution(
                 startPos.plus(getPos()),
                 tableIdentifier);
             }
-        |
-        <ADD> <DISTRIBUTION> {
-                return new SqlAlterMaterializedTableAddDistribution(
-                startPos.plus(getPos()),
-                tableIdentifier, SqlDistribution(getPos()));
-            }
     )
 }
 
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableSchema.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableSchema.java
new file mode 100644
index 00000000000..a1c51400e7f
--- /dev/null
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableSchema.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.ExtendedSqlNode;
+import org.apache.flink.sql.parser.SqlConstraintValidator;
+import org.apache.flink.sql.parser.SqlUnparseUtils;
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.sql.parser.ddl.position.SqlTableColumnPosition;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Abstract class to describe statements which are used to alter the schema of materialized
+ * tables. See examples in javadoc for {@link SqlAlterMaterializedTableAddSchema}.
+ *
+ * <p>An instance carries the column positions, the table constraints and an optional watermark
+ * of the statement. {@link #validate()} passes the constraints and the columns extracted from
+ * the column positions to {@link SqlConstraintValidator#validateAndChangeColumnNullability}.
+ * When unparsing, the keyword returned by {@link #getAlterOperation()} is written after the
+ * superclass output and is followed by the schema rendered via {@link
+ * SqlUnparseUtils#unparseTableSchema}.
+ *
+ * <p>Every element of the column list is expected to be a {@link SqlTableColumnPosition}; the
+ * elements are cast to that type without a check.
+ */
+public abstract class SqlAlterMaterializedTableSchema extends 
SqlAlterMaterializedTable
+        implements ExtendedSqlNode {
+
+    protected final SqlNodeList columnList;
+    protected final List<SqlTableConstraint> constraints;
+    protected final @Nullable SqlWatermark watermark;
+
+    public SqlAlterMaterializedTableSchema(
+            SqlParserPos pos,
+            SqlIdentifier materializedTableName,
+            SqlNodeList columnList,
+            List<SqlTableConstraint> constraints,
+            @Nullable SqlWatermark sqlWatermark) {
+        super(pos, materializedTableName);
+        this.columnList = columnList;
+        this.constraints = constraints;
+        this.watermark = sqlWatermark;
+    }
+
+    @Nonnull
+    @Override
+    public List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(
+                name, columnList, new SqlNodeList(constraints, 
SqlParserPos.ZERO), watermark);
+    }
+
+    @Override
+    public void validate() throws SqlValidateException {
+        SqlConstraintValidator.validateAndChangeColumnNullability(constraints, 
getColumns());
+    }
+
+    public SqlNodeList getColumnPositions() {
+        return columnList;
+    }
+
+    public Optional<SqlWatermark> getWatermark() {
+        return Optional.ofNullable(watermark);
+    }
+
+    public List<SqlTableConstraint> getConstraints() {
+        return constraints;
+    }
+
+    private SqlNodeList getColumns() {
+        return new SqlNodeList(
+                columnList.getList().stream()
+                        .map(columnPos -> ((SqlTableColumnPosition) 
columnPos).getColumn())
+                        .collect(Collectors.toList()),
+                SqlParserPos.ZERO);
+    }
+
+    protected abstract String getAlterOperation();
+
+    @Override
+    public void unparseAlterOperation(SqlWriter writer, int leftPrec, int 
rightPrec) {
+        super.unparseAlterOperation(writer, leftPrec, rightPrec);
+        writer.keyword(getAlterOperation());
+        SqlUnparseUtils.unparseTableSchema(
+                columnList, constraints, watermark, writer, leftPrec, 
rightPrec);
+    }
+
+    /**
+     * Example: DDL like the below for adding column(s)/constraint/watermark.
+     *
+     * <p>Note: adding or altering physical columns is not supported; only computed or metadata
+     * columns are supported.
+     *
+     * <pre>{@code
+     * -- add single column
+     * ALTER MATERIALIZED TABLE myMaterializedTable ADD c1 AS current_timestamp COMMENT 'new_column docs';
+     *
+     * -- add multiple columns, constraint, and watermark
+     * ALTER MATERIALIZED TABLE myMaterializedTable ADD (
+     *     ts AS current_timestamp FIRST,
+     *     col_meta INT METADATA FROM 'mk1' VIRTUAL AFTER col_b,
+     *     PRIMARY KEY (id) NOT ENFORCED,
+     *     WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
+     * );
+     *
+     * }</pre>
+     */
+    public static class SqlAlterMaterializedTableAddSchema extends 
SqlAlterMaterializedTableSchema {
+        public SqlAlterMaterializedTableAddSchema(
+                SqlParserPos pos,
+                SqlIdentifier materializedTableName,
+                SqlNodeList columnList,
+                List<SqlTableConstraint> constraints,
+                @Nullable SqlWatermark sqlWatermark) {
+            super(pos, materializedTableName, columnList, constraints, 
sqlWatermark);
+        }
+
+        @Override
+        protected String getAlterOperation() {
+            return "ADD";
+        }
+    }
+}
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAdd.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAdd.java
index 96d7676b41c..ed2a8171887 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAdd.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAdd.java
@@ -31,7 +31,7 @@ import java.util.List;
 /**
  * SqlNode to describe ALTER TABLE [IF EXISTS] table_name ADD 
column/constraint/watermark clause.
  *
- * <p>Example: DDL like the below for add column/constraint/watermark.
+ * <p>Example: DDL like the below for adding column(s)/constraint/watermark.
  *
  * <pre>{@code
  * -- add single column
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableSchema.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableSchema.java
index 761b64a3a76..8649f6f74de 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableSchema.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableSchema.java
@@ -43,8 +43,8 @@ import java.util.stream.Collectors;
 public abstract class SqlAlterTableSchema extends SqlAlterTable implements 
ExtendedSqlNode {
 
     protected final SqlNodeList columnList;
-    @Nullable protected final SqlWatermark watermark;
     protected final List<SqlTableConstraint> constraints;
+    protected final @Nullable SqlWatermark watermark;
 
     public SqlAlterTableSchema(
             SqlParserPos pos,
diff --git 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index a5dbf34d45b..8a6b3abe8a6 100644
--- 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.sql.parser;
 
-import org.apache.flink.sql.parser.ddl.SqlCreateTable;
 import org.apache.flink.sql.parser.error.SqlValidateException;
 import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
 
@@ -38,9 +37,7 @@ import org.junit.jupiter.params.provider.CsvSource;
 
 import java.util.Locale;
 
-import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.assertj.core.api.Assertions.fail;
 import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT;
 
 /** FlinkSqlParserImpl tests. * */
@@ -3564,63 +3561,4 @@ class FlinkSqlParserImplTest extends SqlParserTest {
         sql("CREATE TABLE t (\n" + "v VARIANT NOT NULL" + "\n)")
                 .ok("CREATE TABLE `T` (\n" + "  `V` VARIANT NOT NULL\n" + ")");
     }
-
-    /** Matcher that invokes the #validate() of the {@link ExtendedSqlNode} 
instance. * */
-    private static class ValidationMatcher extends BaseMatcher<SqlNode> {
-        private String expectedColumnSql;
-        private String failMsg;
-        private boolean ok;
-
-        public ValidationMatcher expectColumnSql(String s) {
-            this.expectedColumnSql = s;
-            return this;
-        }
-
-        public ValidationMatcher fails(String failMsg) {
-            this.failMsg = failMsg;
-            this.ok = false;
-            return this;
-        }
-
-        public ValidationMatcher ok() {
-            this.failMsg = null;
-            this.ok = true;
-            return this;
-        }
-
-        @Override
-        public void describeTo(Description description) {
-            description.appendText("test");
-        }
-
-        @Override
-        public boolean matches(Object item) {
-            if (item instanceof ExtendedSqlNode) {
-                ExtendedSqlNode createTable = (ExtendedSqlNode) item;
-
-                if (ok) {
-                    try {
-                        createTable.validate();
-                    } catch (SqlValidateException e) {
-                        fail("unexpected exception", e);
-                    }
-                } else if (failMsg != null) {
-                    try {
-                        createTable.validate();
-                        fail("expected exception");
-                    } catch (SqlValidateException e) {
-                        assertThat(e).hasMessage(failMsg);
-                    }
-                }
-
-                if (expectedColumnSql != null && item instanceof 
SqlCreateTable) {
-                    assertThat(((SqlCreateTable) 
createTable).getColumnSqlString())
-                            .isEqualTo(expectedColumnSql);
-                }
-                return true;
-            } else {
-                return false;
-            }
-        }
-    }
 }
diff --git 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java
 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java
index f62935ee80c..ce7f15d59f7 100644
--- 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java
+++ 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java
@@ -350,6 +350,126 @@ class MaterializedTableStatementParserTest {
                                 + "    ");
     }
 
+    @Test
+    void testAlterMaterializedTableAddSchema() {
+        // PRIMARY KEY without NOT ENFORCED: parses and unparses, but validate() must fail
+        sql("alter materialized table mt1 add constraint ct1 primary key(a, 
b)")
+                .ok(
+                        "ALTER MATERIALIZED TABLE `MT1` ADD (\n"
+                                + "  CONSTRAINT `CT1` PRIMARY KEY (`A`, `B`)\n"
+                                + ")")
+                .node(
+                        new ValidationMatcher()
+                                .fails(
+                                        "Flink doesn't support ENFORCED mode 
for PRIMARY KEY constraint. "
+                                                + "ENFORCED/NOT ENFORCED 
controls if the constraint checks are performed on the incoming/outgoing data. "
+                                                + "Flink does not own the data 
therefore the only supported mode is the NOT ENFORCED mode"));
+        // with NOT ENFORCED only the unparsed form is checked here
+        sql("alter materialized table mt1 add constraint ct1 primary key(a, b) 
not enforced")
+                .ok(
+                        "ALTER MATERIALIZED TABLE `MT1` ADD (\n"
+                                + "  CONSTRAINT `CT1` PRIMARY KEY (`A`, `B`) 
NOT ENFORCED\n"
+                                + ")");
+        // UNIQUE: parses and unparses, but validate() must fail
+        sql("alter materialized table mt1 " + "add unique(a, b)")
+                .ok("ALTER MATERIALIZED TABLE `MT1` ADD (\n" + "  UNIQUE (`A`, 
`B`)\n" + ")")
+                .node(new ValidationMatcher().fails("UNIQUE constraint is not 
supported yet"));
+    }
+
+    @Test
+    void testAlterMaterializedTableAddNestedColumn() {
+        // add a column whose type is an array of rows
+        sql("alter materialized table mt1 add new_column array<row(f0 int, f1 
bigint)> comment 'new_column docs'")
+                .ok(
+                        "ALTER MATERIALIZED TABLE `MT1` ADD (\n"
+                                + "  `NEW_COLUMN` ARRAY< ROW(`F0` INTEGER, 
`F1` BIGINT) > COMMENT 'new_column docs'\n"
+                                + ")");
+
+        // add a row column together with a computed column that reads one of its fields
+        sql("alter materialized table mt1 add (new_row row(f0 int, f1 bigint) 
comment 'new_column docs', f2 as new_row.f0 + 1)")
+                .ok(
+                        "ALTER MATERIALIZED TABLE `MT1` ADD (\n"
+                                + "  `NEW_ROW` ROW(`F0` INTEGER, `F1` BIGINT) 
COMMENT 'new_column docs',\n"
+                                + "  `F2` AS (`NEW_ROW`.`F0` + 1)\n"
+                                + ")");
+
+        // add a field to the row (dotted column name)
+        sql("alter materialized table mt1 add (new_row.f2 array<int>)")
+                .ok(
+                        "ALTER MATERIALIZED TABLE `MT1` ADD (\n"
+                                + "  `NEW_ROW`.`F2` ARRAY< INTEGER >\n"
+                                + ")");
+
+        // add a field to the row, positioned AFTER another field of the same row
+        sql("alter materialized table mt1 add (new_row.f2 array<int> after 
new_row.f0)")
+                .ok(
+                        "ALTER MATERIALIZED TABLE `MT1` ADD (\n"
+                                + "  `NEW_ROW`.`F2` ARRAY< INTEGER > AFTER 
`NEW_ROW`.`F0`\n"
+                                + ")");
+    }
+
+    @Test
+    void testAlterMaterializedTableAddSingleColumn() {
+        // single column without parentheses; the unparsed form always wraps it in ADD ( ... )
+        sql("alter materialized table mt1 add new_column int not null")
+                .ok(
+                        "ALTER MATERIALIZED TABLE `MT1` ADD (\n"
+                                + "  `NEW_COLUMN` INTEGER NOT NULL\n"
+                                + ")");
+        sql("alter materialized table mt1 add new_column string comment 
'new_column docs'")
+                .ok(
+                        "ALTER MATERIALIZED TABLE `MT1` ADD (\n"
+                                + "  `NEW_COLUMN` STRING COMMENT 'new_column 
docs'\n"
+                                + ")");
+        sql("alter materialized table mt1 add new_column string comment 
'new_column docs' first")
+                .ok(
+                        "ALTER MATERIALIZED TABLE `MT1` ADD (\n"
+                                + "  `NEW_COLUMN` STRING COMMENT 'new_column 
docs' FIRST\n"
+                                + ")");
+        sql("alter materialized table mt1 add new_column string comment 
'new_column docs' after id")
+                .ok(
+                        "ALTER MATERIALIZED TABLE `MT1` ADD (\n"
+                                + "  `NEW_COLUMN` STRING COMMENT 'new_column 
docs' AFTER `ID`\n"
+                                + ")");
+        // add computed column
+        sql("alter materialized table mt1 add col_int as col_a - col_b after 
col_b")
+                .ok(
+                        "ALTER MATERIALIZED TABLE `MT1` ADD (\n"
+                                + "  `COL_INT` AS (`COL_A` - `COL_B`) AFTER 
`COL_B`\n"
+                                + ")");
+        // add metadata column
+        sql("alter materialized table mt1 add col_int int metadata from 'mk1' 
virtual comment 'comment_metadata' after col_b")
+                .ok(
+                        "ALTER MATERIALIZED TABLE `MT1` ADD (\n"
+                                + "  `COL_INT` INTEGER METADATA FROM 'mk1' 
VIRTUAL COMMENT 'comment_metadata' AFTER `COL_B`\n"
+                                + ")");
+    }
+
+    @Test
+    void testAlterMaterializedTableAddWatermark() {
+        // single WATERMARK clause, with unqualified and (partially/fully) qualified table names
+        sql("alter materialized table mt1 add watermark for ts as ts")
+                .ok(
+                        "ALTER MATERIALIZED TABLE `MT1` ADD (\n"
+                                + "  WATERMARK FOR `TS` AS `TS`\n"
+                                + ")");
+        sql("alter materialized table mt1 add watermark for ts as ts - 
interval '1' second")
+                .ok(
+                        "ALTER MATERIALIZED TABLE `MT1` ADD (\n"
+                                + "  WATERMARK FOR `TS` AS (`TS` - INTERVAL 
'1' SECOND)\n"
+                                + ")");
+        sql("alter materialized table default_database.mt1 add watermark for 
ts as ts - interval '1' second")
+                .ok(
+                        "ALTER MATERIALIZED TABLE `DEFAULT_DATABASE`.`MT1` ADD 
(\n"
+                                + "  WATERMARK FOR `TS` AS (`TS` - INTERVAL 
'1' SECOND)\n"
+                                + ")");
+        sql("alter materialized table default_catalog.default_database.mt1 add 
watermark for ts as ts - interval '1' second")
+                .ok(
+                        "ALTER MATERIALIZED TABLE 
`DEFAULT_CATALOG`.`DEFAULT_DATABASE`.`MT1` ADD (\n"
+                                + "  WATERMARK FOR `TS` AS (`TS` - INTERVAL 
'1' SECOND)\n"
+                                + ")");
+
+        // two WATERMARK clauses in one ADD ( ... ) must be rejected with the message below;
+        // the carets presumably mark the expected error span (parser test convention - confirm)
+        sql("alter materialized table default_catalog.default_database.mt1 add 
(\n"
+                        + "watermark for ts as ts - interval '1' second,\n"
+                        + "^watermark^ for f1 as now()\n"
+                        + ")")
+                .fails("Multiple WATERMARK statements is not supported yet.");
+    }
+
     @Test
     void testAlterMaterializedTableAddDistribution() {
         sql("alter materialized table mt1 add distribution by hash(a) into 6 
buckets")
diff --git 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/ValidationMatcher.java
 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/ValidationMatcher.java
new file mode 100644
index 00000000000..fd49840e21d
--- /dev/null
+++ 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/ValidationMatcher.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser;
+
+import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlNode;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+
+/**
+ * Matcher that invokes {@link ExtendedSqlNode#validate()} on the matched node.
+ *
+ * <p>After {@link #ok()}, validation must not throw a {@link SqlValidateException}; after {@link
+ * #fails(String)}, validation must throw a {@link SqlValidateException} with the given message.
+ * If neither was called, {@code validate()} is not invoked. An expectation set through {@link
+ * #expectColumnSql(String)} is only compared when the node is a {@link SqlCreateTable}.
+ *
+ * <p>{@link #matches(Object)} returns {@code false} only for objects that are not an {@link
+ * ExtendedSqlNode}; unmet expectations are reported through the AssertJ {@code fail} and {@code
+ * assertThat} calls instead.
+ */
+class ValidationMatcher extends BaseMatcher<SqlNode> {
+    private String expectedColumnSql;
+    private String failMsg;
+    private boolean ok;
+
+    public ValidationMatcher expectColumnSql(String s) {
+        this.expectedColumnSql = s;
+        return this;
+    }
+
+    public ValidationMatcher fails(String failMsg) {
+        this.failMsg = failMsg;
+        this.ok = false;
+        return this;
+    }
+
+    public ValidationMatcher ok() {
+        this.failMsg = null;
+        this.ok = true;
+        return this;
+    }
+
+    @Override
+    public void describeTo(Description description) {
+        description.appendText("test");
+    }
+
+    @Override
+    public boolean matches(Object item) {
+        if (item instanceof ExtendedSqlNode) {
+            ExtendedSqlNode createTable = (ExtendedSqlNode) item;
+
+            if (ok) {
+                try {
+                    createTable.validate();
+                } catch (SqlValidateException e) {
+                    fail("unexpected exception", e);
+                }
+            } else if (failMsg != null) {
+                try {
+                    createTable.validate();
+                    fail("expected exception");
+                } catch (SqlValidateException e) {
+                    assertThat(e).hasMessage(failMsg);
+                }
+            }
+
+            if (expectedColumnSql != null && item instanceof SqlCreateTable) {
+                assertThat(((SqlCreateTable) createTable).getColumnSqlString())
+                        .isEqualTo(expectedColumnSql);
+            }
+            return true;
+        } else {
+            return false;
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/AbstractAlterMaterializedTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/AbstractAlterMaterializedTableConverter.java
index 43ca0badfa0..a1df41d505b 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/AbstractAlterMaterializedTableConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/AbstractAlterMaterializedTableConverter.java
@@ -26,15 +26,13 @@ import 
org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
 import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 
-import org.apache.calcite.sql.SqlNode;
-
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 import static 
org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE;
 
 /** Abstract converter for {@link 
org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTable}. */
-public abstract class AbstractAlterMaterializedTableConverter<T extends 
SqlNode>
+public abstract class AbstractAlterMaterializedTableConverter<T extends 
SqlAlterMaterializedTable>
         implements SqlNodeConverter<T> {
 
     protected ObjectIdentifier resolveIdentifier(

Reply via email to