This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 77b9107221bafb0174f69a2b6add153269ab6274 Author: Sergey Nuyanzin <[email protected]> AuthorDate: Mon Dec 1 19:24:08 2025 +0100 [FLINK-38756][table] Add support for ALTER MATERIALIZED TABLE ADD columns in parser --- .../src/main/codegen/data/Parser.tdd | 1 + .../src/main/codegen/includes/parserImpls.ftl | 40 ++++-- .../ddl/SqlAlterMaterializedTableSchema.java | 141 +++++++++++++++++++++ .../flink/sql/parser/ddl/SqlAlterTableAdd.java | 2 +- .../flink/sql/parser/ddl/SqlAlterTableSchema.java | 2 +- .../flink/sql/parser/FlinkSqlParserImplTest.java | 62 --------- .../MaterializedTableStatementParserTest.java | 120 ++++++++++++++++++ .../apache/flink/sql/parser/ValidationMatcher.java | 88 +++++++++++++ .../AbstractAlterMaterializedTableConverter.java | 4 +- 9 files changed, 383 insertions(+), 77 deletions(-) diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd index 42acdb21f70..49ffde6d757 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd +++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd @@ -53,6 +53,7 @@ "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableRefresh" "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableResume" "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableSuspend" + "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableSchema.SqlAlterMaterializedTableAddSchema" "org.apache.flink.sql.parser.ddl.SqlAlterModel" "org.apache.flink.sql.parser.ddl.SqlAlterModelRename" "org.apache.flink.sql.parser.ddl.SqlAlterModelReset" diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index d1f6d2eba50..14bcdfd6f19 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -2102,24 +2102,44 @@ SqlAlterMaterializedTable SqlAlterMaterializedTable() : asQuery); } | - <MODIFY> <DISTRIBUTION> { - return new SqlAlterMaterializedTableModifyDistribution( + <ADD> + ( + <DISTRIBUTION> { + return new SqlAlterMaterializedTableAddDistribution( startPos.plus(getPos()), - tableIdentifier, - SqlDistribution(getPos())); + tableIdentifier, SqlDistribution(getPos())); } + | + AlterTableAddOrModify(ctx) + | + <LPAREN> + AlterTableAddOrModify(ctx) + ( + <COMMA> AlterTableAddOrModify(ctx) + )* + <RPAREN> + ) + { + return new SqlAlterMaterializedTableAddSchema( + startPos.plus(getPos()), + tableIdentifier, + new SqlNodeList(ctx.columnPositions, startPos.plus(getPos())), + ctx.constraints, + ctx.watermark); + } + | + <MODIFY> <DISTRIBUTION> { + return new SqlAlterMaterializedTableModifyDistribution( + startPos.plus(getPos()), + tableIdentifier, + SqlDistribution(getPos())); + } | <DROP> <DISTRIBUTION> { return new SqlAlterMaterializedTableDropDistribution( startPos.plus(getPos()), tableIdentifier); } - | - <ADD> <DISTRIBUTION> { - return new SqlAlterMaterializedTableAddDistribution( - startPos.plus(getPos()), - tableIdentifier, SqlDistribution(getPos())); - } ) } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableSchema.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableSchema.java new file mode 100644 index 00000000000..a1c51400e7f --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableSchema.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.flink.sql.parser.ExtendedSqlNode; +import org.apache.flink.sql.parser.SqlConstraintValidator; +import org.apache.flink.sql.parser.SqlUnparseUtils; +import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint; +import org.apache.flink.sql.parser.ddl.position.SqlTableColumnPosition; +import org.apache.flink.sql.parser.error.SqlValidateException; + +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.ImmutableNullableList; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * Abstract class to describe statements which are used to alter schema for materialized tables. See + * examples in javadoc for {@link SqlAlterMaterializedTableAddSchema}. + */ +public abstract class SqlAlterMaterializedTableSchema extends SqlAlterMaterializedTable + implements ExtendedSqlNode { + + protected final SqlNodeList columnList; + protected final List<SqlTableConstraint> constraints; + protected final @Nullable SqlWatermark watermark; + + public SqlAlterMaterializedTableSchema( + SqlParserPos pos, + SqlIdentifier materializedTableName, + SqlNodeList columnList, + List<SqlTableConstraint> constraints, + @Nullable SqlWatermark sqlWatermark) { + super(pos, materializedTableName); + this.columnList = columnList; + this.constraints = constraints; + this.watermark = sqlWatermark; + } + + @Nonnull + @Override + public List<SqlNode> getOperandList() { + return ImmutableNullableList.of( + name, columnList, new SqlNodeList(constraints, SqlParserPos.ZERO), watermark); + } + + @Override + public void validate() throws SqlValidateException { + SqlConstraintValidator.validateAndChangeColumnNullability(constraints, getColumns()); + } + + public SqlNodeList getColumnPositions() { + return columnList; + } + + public Optional<SqlWatermark> getWatermark() { + return Optional.ofNullable(watermark); + } + + public List<SqlTableConstraint> getConstraints() { + return constraints; + } + + private SqlNodeList getColumns() { + return new SqlNodeList( + columnList.getList().stream() + .map(columnPos -> ((SqlTableColumnPosition) columnPos).getColumn()) + .collect(Collectors.toList()), + SqlParserPos.ZERO); + } + + protected abstract String getAlterOperation(); + + @Override + public void unparseAlterOperation(SqlWriter writer, int leftPrec, int rightPrec) { + super.unparseAlterOperation(writer, leftPrec, rightPrec); + writer.keyword(getAlterOperation()); + SqlUnparseUtils.unparseTableSchema( + columnList, constraints, watermark, writer, leftPrec, rightPrec); + } + + /** + * Example: DDL like the below for adding column(s)/constraint/watermark. + * + * <p>Note: adding or altering physical columns is not supported, only computed or metadata + * + * <pre>{@code + * -- add single column + * ALTER MATERIALIZED TABLE myMaterializedTable ADD c1 AS current_timestamp COMMENT 'new_column docs'; + * + * -- add multiple columns, constraint, and watermark + * ALTER MATERIALIZED TABLE myMaterializedTable ADD ( + * ts AS current_timestamp FIRST, + * col_meta INT METADATA FROM 'mk1' VIRTUAL AFTER col_b, + * PRIMARY KEY (id) NOT ENFORCED, + * WATERMARK FOR ts AS ts - INTERVAL '3' SECOND + * ); + * + * }</pre> + */ + public static class SqlAlterMaterializedTableAddSchema extends SqlAlterMaterializedTableSchema { + public SqlAlterMaterializedTableAddSchema( + SqlParserPos pos, + SqlIdentifier materializedTableName, + SqlNodeList columnList, + List<SqlTableConstraint> constraints, + @Nullable SqlWatermark sqlWatermark) { + super(pos, materializedTableName, columnList, constraints, sqlWatermark); + } + + @Override + protected String getAlterOperation() { + return "ADD"; + } + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAdd.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAdd.java index 96d7676b41c..ed2a8171887 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAdd.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAdd.java @@ -31,7 +31,7 @@ import java.util.List; /** * SqlNode to describe ALTER TABLE [IF EXISTS] table_name ADD column/constraint/watermark clause. * - * <p>Example: DDL like the below for add column/constraint/watermark. + * <p>Example: DDL like the below for adding column(s)/constraint/watermark. * * <pre>{@code * -- add single column diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableSchema.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableSchema.java index 761b64a3a76..8649f6f74de 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableSchema.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableSchema.java @@ -43,8 +43,8 @@ import java.util.stream.Collectors; public abstract class SqlAlterTableSchema extends SqlAlterTable implements ExtendedSqlNode { protected final SqlNodeList columnList; - @Nullable protected final SqlWatermark watermark; protected final List<SqlTableConstraint> constraints; + protected final @Nullable SqlWatermark watermark; public SqlAlterTableSchema( SqlParserPos pos, diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java index a5dbf34d45b..8a6b3abe8a6 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java @@ -18,7 +18,6 @@ package org.apache.flink.sql.parser; -import org.apache.flink.sql.parser.ddl.SqlCreateTable; import org.apache.flink.sql.parser.error.SqlValidateException; import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl; @@ -38,9 +37,7 @@ import org.junit.jupiter.params.provider.CsvSource; import java.util.Locale; -import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.assertj.core.api.Assertions.fail; import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT; /** FlinkSqlParserImpl tests. * */ @@ -3564,63 +3561,4 @@ class FlinkSqlParserImplTest extends SqlParserTest { sql("CREATE TABLE t (\n" + "v VARIANT NOT NULL" + "\n)") .ok("CREATE TABLE `T` (\n" + " `V` VARIANT NOT NULL\n" + ")"); } - - /** Matcher that invokes the #validate() of the {@link ExtendedSqlNode} instance. * */ - private static class ValidationMatcher extends BaseMatcher<SqlNode> { - private String expectedColumnSql; - private String failMsg; - private boolean ok; - - public ValidationMatcher expectColumnSql(String s) { - this.expectedColumnSql = s; - return this; - } - - public ValidationMatcher fails(String failMsg) { - this.failMsg = failMsg; - this.ok = false; - return this; - } - - public ValidationMatcher ok() { - this.failMsg = null; - this.ok = true; - return this; - } - - @Override - public void describeTo(Description description) { - description.appendText("test"); - } - - @Override - public boolean matches(Object item) { - if (item instanceof ExtendedSqlNode) { - ExtendedSqlNode createTable = (ExtendedSqlNode) item; - - if (ok) { - try { - createTable.validate(); - } catch (SqlValidateException e) { - fail("unexpected exception", e); - } - } else if (failMsg != null) { - try { - createTable.validate(); - fail("expected exception"); - } catch (SqlValidateException e) { - assertThat(e).hasMessage(failMsg); - } - } - - if (expectedColumnSql != null && item instanceof SqlCreateTable) { - assertThat(((SqlCreateTable) createTable).getColumnSqlString()) - .isEqualTo(expectedColumnSql); - } - return true; - } else { - return false; - } - } - } } diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java index f62935ee80c..ce7f15d59f7 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java @@ -350,6 +350,126 @@ class MaterializedTableStatementParserTest { + " "); } + @Test + void testAlterMaterializedTableAddSchema() { + sql("alter materialized table mt1 add constraint ct1 primary key(a, b)") + .ok( + "ALTER MATERIALIZED TABLE `MT1` ADD (\n" + + " CONSTRAINT `CT1` PRIMARY KEY (`A`, `B`)\n" + + ")") + .node( + new ValidationMatcher() + .fails( + "Flink doesn't support ENFORCED mode for PRIMARY KEY constraint. " + + "ENFORCED/NOT ENFORCED controls if the constraint checks are performed on the incoming/outgoing data. " + + "Flink does not own the data therefore the only supported mode is the NOT ENFORCED mode")); + sql("alter materialized table mt1 add constraint ct1 primary key(a, b) not enforced") + .ok( + "ALTER MATERIALIZED TABLE `MT1` ADD (\n" + + " CONSTRAINT `CT1` PRIMARY KEY (`A`, `B`) NOT ENFORCED\n" + + ")"); + sql("alter materialized table mt1 " + "add unique(a, b)") + .ok("ALTER MATERIALIZED TABLE `MT1` ADD (\n" + " UNIQUE (`A`, `B`)\n" + ")") + .node(new ValidationMatcher().fails("UNIQUE constraint is not supported yet")); + } + + @Test + void testAlterMaterializedTableAddNestedColumn() { + // add a row column + sql("alter materialized table mt1 add new_column array<row(f0 int, f1 bigint)> comment 'new_column docs'") + .ok( + "ALTER MATERIALIZED TABLE `MT1` ADD (\n" + + " `NEW_COLUMN` ARRAY< ROW(`F0` INTEGER, `F1` BIGINT) > COMMENT 'new_column docs'\n" + + ")"); + + sql("alter materialized table mt1 add (new_row row(f0 int, f1 bigint) comment 'new_column docs', f2 as new_row.f0 + 1)") + .ok( + "ALTER MATERIALIZED TABLE `MT1` ADD (\n" + + " `NEW_ROW` ROW(`F0` INTEGER, `F1` BIGINT) COMMENT 'new_column docs',\n" + + " `F2` AS (`NEW_ROW`.`F0` + 1)\n" + + ")"); + + // add a field to the row + sql("alter materialized table mt1 add (new_row.f2 array<int>)") + .ok( + "ALTER MATERIALIZED TABLE `MT1` ADD (\n" + + " `NEW_ROW`.`F2` ARRAY< INTEGER >\n" + + ")"); + + // add a field to the row with after + sql("alter materialized table mt1 add (new_row.f2 array<int> after new_row.f0)") + .ok( + "ALTER MATERIALIZED TABLE `MT1` ADD (\n" + + " `NEW_ROW`.`F2` ARRAY< INTEGER > AFTER `NEW_ROW`.`F0`\n" + + ")"); + } + + @Test + void testAlterMaterializedTableAddSingleColumn() { + sql("alter materialized table mt1 add new_column int not null") + .ok( + "ALTER MATERIALIZED TABLE `MT1` ADD (\n" + + " `NEW_COLUMN` INTEGER NOT NULL\n" + + ")"); + sql("alter materialized table mt1 add new_column string comment 'new_column docs'") + .ok( + "ALTER MATERIALIZED TABLE `MT1` ADD (\n" + + " `NEW_COLUMN` STRING COMMENT 'new_column docs'\n" + + ")"); + sql("alter materialized table mt1 add new_column string comment 'new_column docs' first") + .ok( + "ALTER MATERIALIZED TABLE `MT1` ADD (\n" + + " `NEW_COLUMN` STRING COMMENT 'new_column docs' FIRST\n" + + ")"); + sql("alter materialized table mt1 add new_column string comment 'new_column docs' after id") + .ok( + "ALTER MATERIALIZED TABLE `MT1` ADD (\n" + + " `NEW_COLUMN` STRING COMMENT 'new_column docs' AFTER `ID`\n" + + ")"); + // add compute column + sql("alter materialized table mt1 add col_int as col_a - col_b after col_b") + .ok( + "ALTER MATERIALIZED TABLE `MT1` ADD (\n" + + " `COL_INT` AS (`COL_A` - `COL_B`) AFTER `COL_B`\n" + + ")"); + // add metadata column + sql("alter materialized table mt1 add col_int int metadata from 'mk1' virtual comment 'comment_metadata' after col_b") + .ok( + "ALTER MATERIALIZED TABLE `MT1` ADD (\n" + + " `COL_INT` INTEGER METADATA FROM 'mk1' VIRTUAL COMMENT 'comment_metadata' AFTER `COL_B`\n" + + ")"); + } + + @Test + void testAlterMaterializedTableAddWatermark() { + sql("alter materialized table mt1 add watermark for ts as ts") + .ok( + "ALTER MATERIALIZED TABLE `MT1` ADD (\n" + + " WATERMARK FOR `TS` AS `TS`\n" + + ")"); + sql("alter materialized table mt1 add watermark for ts as ts - interval '1' second") + .ok( + "ALTER MATERIALIZED TABLE `MT1` ADD (\n" + + " WATERMARK FOR `TS` AS (`TS` - INTERVAL '1' SECOND)\n" + + ")"); + sql("alter materialized table default_database.mt1 add watermark for ts as ts - interval '1' second") + .ok( + "ALTER MATERIALIZED TABLE `DEFAULT_DATABASE`.`MT1` ADD (\n" + + " WATERMARK FOR `TS` AS (`TS` - INTERVAL '1' SECOND)\n" + + ")"); + sql("alter materialized table default_catalog.default_database.mt1 add watermark for ts as ts - interval '1' second") + .ok( + "ALTER MATERIALIZED TABLE `DEFAULT_CATALOG`.`DEFAULT_DATABASE`.`MT1` ADD (\n" + + " WATERMARK FOR `TS` AS (`TS` - INTERVAL '1' SECOND)\n" + + ")"); + + sql("alter materialized table default_catalog.default_database.mt1 add (\n" + + "watermark for ts as ts - interval '1' second,\n" + + "^watermark^ for f1 as now()\n" + + ")") + .fails("Multiple WATERMARK statements is not supported yet."); + } + @Test void testAlterMaterializedTableAddDistribution() { sql("alter materialized table mt1 add distribution by hash(a) into 6 buckets") diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/ValidationMatcher.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/ValidationMatcher.java new file mode 100644 index 00000000000..fd49840e21d --- /dev/null +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/ValidationMatcher.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser; + +import org.apache.flink.sql.parser.ddl.SqlCreateTable; +import org.apache.flink.sql.parser.error.SqlValidateException; + +import org.apache.calcite.sql.SqlNode; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + +/** Matcher that invokes the #validate() of the {@link ExtendedSqlNode} instance. * */ +class ValidationMatcher extends BaseMatcher<SqlNode> { + private String expectedColumnSql; + private String failMsg; + private boolean ok; + + public ValidationMatcher expectColumnSql(String s) { + this.expectedColumnSql = s; + return this; + } + + public ValidationMatcher fails(String failMsg) { + this.failMsg = failMsg; + this.ok = false; + return this; + } + + public ValidationMatcher ok() { + this.failMsg = null; + this.ok = true; + return this; + } + + @Override + public void describeTo(Description description) { + description.appendText("test"); + } + + @Override + public boolean matches(Object item) { + if (item instanceof ExtendedSqlNode) { + ExtendedSqlNode createTable = (ExtendedSqlNode) item; + + if (ok) { + try { + createTable.validate(); + } catch (SqlValidateException e) { + fail("unexpected exception", e); + } + } else if (failMsg != null) { + try { + createTable.validate(); + fail("expected exception"); + } catch (SqlValidateException e) { + assertThat(e).hasMessage(failMsg); + } + } + + if (expectedColumnSql != null && item instanceof SqlCreateTable) { + assertThat(((SqlCreateTable) createTable).getColumnSqlString()) + .isEqualTo(expectedColumnSql); + } + return true; + } else { + return false; + } + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/AbstractAlterMaterializedTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/AbstractAlterMaterializedTableConverter.java index 43ca0badfa0..a1df41d505b 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/AbstractAlterMaterializedTableConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/AbstractAlterMaterializedTableConverter.java @@ -26,15 +26,13 @@ import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; import org.apache.flink.table.catalog.UnresolvedIdentifier; -import org.apache.calcite.sql.SqlNode; - import java.util.function.Consumer; import java.util.function.Supplier; import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE; /** Abstract converter for {@link org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTable}. */ -public abstract class AbstractAlterMaterializedTableConverter<T extends SqlNode> +public abstract class AbstractAlterMaterializedTableConverter<T extends SqlAlterMaterializedTable> implements SqlNodeConverter<T> { protected ObjectIdentifier resolveIdentifier(
