This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 3e41e723b9d [FLINK-38365][table] Make invalid operations with `MATERIALIZED TABLE`s returning more user friendly message 3e41e723b9d is described below commit 3e41e723b9dcb776040e4861ae35220a72dbd798 Author: Sergey Nuyanzin <snuyan...@gmail.com> AuthorDate: Tue Sep 16 23:21:58 2025 +0200 [FLINK-38365][table] Make invalid operations with `MATERIALIZED TABLE`s returning more user friendly message --- .../table/operations/TruncateTableOperation.java | 5 --- .../table/operations/utils/ValidationUtils.java | 46 ++++++++++++++++++++++ .../operations/SqlNodeToOperationConversion.java | 13 +++--- .../operations/converters/SqlNodeConvertUtils.java | 8 ++-- .../converters/SqlTruncateTableConverter.java | 9 +++++ .../operations/SqlDdlToOperationConverterTest.java | 21 ++++++++++ 6 files changed, 86 insertions(+), 16 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/TruncateTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/TruncateTableOperation.java index 186f704b511..cce547652b9 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/TruncateTableOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/TruncateTableOperation.java @@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.internal.TableResultInternal; -import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.ContextResolvedTable; import org.apache.flink.table.catalog.ObjectIdentifier; @@ -54,10 +53,6 @@ public class TruncateTableOperation implements ExecutableOperation { CatalogManager catalogManager = ctx.getCatalogManager(); ContextResolvedTable contextResolvedTable = catalogManager.getTableOrError(tableIdentifier); - CatalogBaseTable catalogBaseTable = contextResolvedTable.getTable(); - if (catalogBaseTable.getTableKind() == CatalogBaseTable.TableKind.VIEW) { - throw new TableException("TRUNCATE TABLE statement is not supported for view."); - } ResolvedCatalogTable resolvedTable = contextResolvedTable.getResolvedTable(); ObjectIdentifier objectIdentifier = contextResolvedTable.getIdentifier(); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/ValidationUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/ValidationUtils.java new file mode 100644 index 00000000000..6c7f52f1434 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/ValidationUtils.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.utils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogBaseTable; + +import java.util.Locale; + +/** Utility class for validation of operations. */ +@Internal +public final class ValidationUtils { + + private ValidationUtils() {} + + public static void validateTableKind( + CatalogBaseTable baseTable, CatalogBaseTable.TableKind expected, String operationName) { + final CatalogBaseTable.TableKind kind = baseTable.getTableKind(); + if (kind == expected) { + return; + } + + throw new ValidationException( + String.format( + "%s for a %s is not allowed", + operationName.toUpperCase(Locale.ROOT), + kind.name().toLowerCase(Locale.ROOT).replace('_', ' '))); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java index 48a21897ddd..a4563a2671c 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java @@ -79,6 +79,7 @@ import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogBaseTable.TableKind; import org.apache.flink.table.catalog.CatalogDatabase; import org.apache.flink.table.catalog.CatalogDatabaseImpl; import org.apache.flink.table.catalog.CatalogFunction; @@ -88,7 +89,6 @@ import org.apache.flink.table.catalog.CatalogPartition; import org.apache.flink.table.catalog.CatalogPartitionImpl; import org.apache.flink.table.catalog.CatalogPartitionSpec; import org.apache.flink.table.catalog.CatalogTable; -import org.apache.flink.table.catalog.CatalogView; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ContextResolvedTable; import org.apache.flink.table.catalog.FunctionLanguage; @@ -161,6 +161,7 @@ import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation; import org.apache.flink.table.operations.ddl.DropViewOperation; import org.apache.flink.table.operations.utils.LikeType; import org.apache.flink.table.operations.utils.ShowLikeOperator; +import org.apache.flink.table.operations.utils.ValidationUtils; import org.apache.flink.table.planner.calcite.FlinkPlannerImpl; import org.apache.flink.table.planner.hint.FlinkHints; import org.apache.flink.table.planner.operations.converters.SqlNodeConverters; @@ -419,9 +420,8 @@ public class SqlNodeToOperationConversion { "Table %s doesn't exist or is a temporary table.", tableIdentifier)); } CatalogBaseTable baseTable = optionalCatalogTable.get().getResolvedTable(); - if (baseTable instanceof CatalogView) { - throw new ValidationException("ALTER TABLE for a view is not allowed"); - } + ValidationUtils.validateTableKind(baseTable, TableKind.TABLE, "alter table"); + ResolvedCatalogTable resolvedCatalogTable = (ResolvedCatalogTable) baseTable; if (sqlAlterTable instanceof SqlAlterTableRename) { UnresolvedIdentifier newUnresolvedIdentifier = @@ -1059,9 +1059,8 @@ public class SqlNodeToOperationConversion { "Table %s doesn't exist or is a temporary table.", tableIdentifier)); } CatalogBaseTable baseTable = optionalCatalogTable.get().getResolvedTable(); - if (baseTable instanceof CatalogView) { - throw new ValidationException("ANALYZE TABLE for a view is not allowed."); - } + ValidationUtils.validateTableKind(baseTable, TableKind.TABLE, "analyze table"); + CatalogTable table = (CatalogTable) baseTable; ResolvedSchema schema = baseTable.getUnresolvedSchema().resolve(catalogManager.getSchemaResolver()); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java index dbdf700109a..8d00ca5207e 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java @@ -23,13 +23,14 @@ import org.apache.flink.sql.parser.error.SqlValidateException; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.CatalogBaseTable; -import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogBaseTable.TableKind; import org.apache.flink.table.catalog.CatalogView; import org.apache.flink.table.catalog.ContextResolvedTable; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogView; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.operations.utils.ValidationUtils; import org.apache.flink.table.planner.operations.PlannerQueryOperation; import org.apache.flink.table.planner.operations.converters.SqlNodeConverter.ConvertContext; @@ -138,9 +139,8 @@ class SqlNodeConvertUtils { } // check the view is exactly a view CatalogBaseTable baseTable = optionalCatalogTable.get().getResolvedTable(); - if (baseTable instanceof CatalogTable) { - throw new ValidationException("ALTER VIEW for a table is not allowed"); - } + ValidationUtils.validateTableKind(baseTable, TableKind.VIEW, "alter view"); + return (CatalogView) baseTable; } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlTruncateTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlTruncateTableConverter.java index 2c096957529..b6cf4b3e0fe 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlTruncateTableConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlTruncateTableConverter.java @@ -19,11 +19,15 @@ package org.apache.flink.table.planner.operations.converters; import org.apache.flink.sql.parser.dml.SqlTruncateTable; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogBaseTable.TableKind; import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.catalog.ContextResolvedTable; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.UnresolvedIdentifier; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.TruncateTableOperation; +import org.apache.flink.table.operations.utils.ValidationUtils; /** A converter for {@link SqlTruncateTable}. */ public class SqlTruncateTableConverter implements SqlNodeConverter<SqlTruncateTable> { @@ -34,6 +38,11 @@ public class SqlTruncateTableConverter implements SqlNodeConverter<SqlTruncateTa UnresolvedIdentifier.of(sqlTruncateTable.fullTableName()); CatalogManager catalogManager = context.getCatalogManager(); ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier); + + ContextResolvedTable contextResolvedTable = catalogManager.getTableOrError(tableIdentifier); + CatalogBaseTable catalogBaseTable = contextResolvedTable.getTable(); + ValidationUtils.validateTableKind(catalogBaseTable, TableKind.TABLE, "truncate table"); + return new TruncateTableOperation(tableIdentifier); } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java index 242d1231106..cdf6fbe4174 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java @@ -2625,6 +2625,27 @@ class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversionTestBas "A column with the same name `id` has been defined at line 5, column 8.")); } + @Test + void testAlterInvalidOperationsForMaterializedTables() throws Exception { + prepareMaterializedTable("my_materialized_table", false, 1, null, "SELECT 1"); + + assertThatThrownBy(() -> parse("alter table my_materialized_table RENAME to new_name")) + .isInstanceOf(ValidationException.class) + .hasMessage("ALTER TABLE for a materialized table is not allowed"); + + assertThatThrownBy(() -> parse("analyze table my_materialized_table compute statistics")) + .isInstanceOf(ValidationException.class) + .hasMessage("ANALYZE TABLE for a materialized table is not allowed"); + + assertThatThrownBy(() -> parse("alter view my_materialized_table RENAME to new_name")) + .isInstanceOf(ValidationException.class) + .hasMessage("ALTER VIEW for a materialized table is not allowed"); + + assertThatThrownBy(() -> parse("truncate table my_materialized_table")) + .isInstanceOf(ValidationException.class) + .hasMessage("TRUNCATE TABLE for a materialized table is not allowed"); + } + // ~ Tool Methods ---------------------------------------------------------- private static TestItem createTestItem(Object... args) {