This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e41e723b9d [FLINK-38365][table] Make invalid operations with `MATERIALIZED TABLE`s returning more user friendly message
3e41e723b9d is described below

commit 3e41e723b9dcb776040e4861ae35220a72dbd798
Author: Sergey Nuyanzin <snuyan...@gmail.com>
AuthorDate: Tue Sep 16 23:21:58 2025 +0200

    [FLINK-38365][table] Make invalid operations with `MATERIALIZED TABLE`s returning more user friendly message
---
 .../table/operations/TruncateTableOperation.java   |  5 ---
 .../table/operations/utils/ValidationUtils.java    | 46 ++++++++++++++++++++++
 .../operations/SqlNodeToOperationConversion.java   | 13 +++---
 .../operations/converters/SqlNodeConvertUtils.java |  8 ++--
 .../converters/SqlTruncateTableConverter.java      |  9 +++++
 .../operations/SqlDdlToOperationConverterTest.java | 21 ++++++++++
 6 files changed, 86 insertions(+), 16 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/TruncateTableOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/TruncateTableOperation.java
index 186f704b511..cce547652b9 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/TruncateTableOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/TruncateTableOperation.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.internal.TableResultInternal;
-import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.ContextResolvedTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -54,10 +53,6 @@ public class TruncateTableOperation implements 
ExecutableOperation {
 
         CatalogManager catalogManager = ctx.getCatalogManager();
         ContextResolvedTable contextResolvedTable = 
catalogManager.getTableOrError(tableIdentifier);
-        CatalogBaseTable catalogBaseTable = contextResolvedTable.getTable();
-        if (catalogBaseTable.getTableKind() == 
CatalogBaseTable.TableKind.VIEW) {
-            throw new TableException("TRUNCATE TABLE statement is not 
supported for view.");
-        }
 
         ResolvedCatalogTable resolvedTable = 
contextResolvedTable.getResolvedTable();
         ObjectIdentifier objectIdentifier = 
contextResolvedTable.getIdentifier();
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/ValidationUtils.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/ValidationUtils.java
new file mode 100644
index 00000000000..6c7f52f1434
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/ValidationUtils.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+
+import java.util.Locale;
+
+/** Utility class for validation of operations. */
+@Internal
+public final class ValidationUtils {
+
+    private ValidationUtils() {}
+
+    public static void validateTableKind(
+            CatalogBaseTable baseTable, CatalogBaseTable.TableKind expected, 
String operationName) {
+        final CatalogBaseTable.TableKind kind = baseTable.getTableKind();
+        if (kind == expected) {
+            return;
+        }
+
+        throw new ValidationException(
+                String.format(
+                        "%s for a %s is not allowed",
+                        operationName.toUpperCase(Locale.ROOT),
+                        kind.name().toLowerCase(Locale.ROOT).replace('_', ' 
')));
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
index 48a21897ddd..a4563a2671c 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
@@ -79,6 +79,7 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
 import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.CatalogDatabaseImpl;
 import org.apache.flink.table.catalog.CatalogFunction;
@@ -88,7 +89,6 @@ import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogPartitionImpl;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ContextResolvedTable;
 import org.apache.flink.table.catalog.FunctionLanguage;
@@ -161,6 +161,7 @@ import 
org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
 import org.apache.flink.table.operations.ddl.DropViewOperation;
 import org.apache.flink.table.operations.utils.LikeType;
 import org.apache.flink.table.operations.utils.ShowLikeOperator;
+import org.apache.flink.table.operations.utils.ValidationUtils;
 import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.planner.hint.FlinkHints;
 import org.apache.flink.table.planner.operations.converters.SqlNodeConverters;
@@ -419,9 +420,8 @@ public class SqlNodeToOperationConversion {
                             "Table %s doesn't exist or is a temporary table.", 
tableIdentifier));
         }
         CatalogBaseTable baseTable = 
optionalCatalogTable.get().getResolvedTable();
-        if (baseTable instanceof CatalogView) {
-            throw new ValidationException("ALTER TABLE for a view is not 
allowed");
-        }
+        ValidationUtils.validateTableKind(baseTable, TableKind.TABLE, "alter 
table");
+
         ResolvedCatalogTable resolvedCatalogTable = (ResolvedCatalogTable) 
baseTable;
         if (sqlAlterTable instanceof SqlAlterTableRename) {
             UnresolvedIdentifier newUnresolvedIdentifier =
@@ -1059,9 +1059,8 @@ public class SqlNodeToOperationConversion {
                             "Table %s doesn't exist or is a temporary table.", 
tableIdentifier));
         }
         CatalogBaseTable baseTable = 
optionalCatalogTable.get().getResolvedTable();
-        if (baseTable instanceof CatalogView) {
-            throw new ValidationException("ANALYZE TABLE for a view is not 
allowed.");
-        }
+        ValidationUtils.validateTableKind(baseTable, TableKind.TABLE, "analyze 
table");
+
         CatalogTable table = (CatalogTable) baseTable;
         ResolvedSchema schema =
                 
baseTable.getUnresolvedSchema().resolve(catalogManager.getSchemaResolver());
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java
index dbdf700109a..8d00ca5207e 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConvertUtils.java
@@ -23,13 +23,14 @@ import 
org.apache.flink.sql.parser.error.SqlValidateException;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
 import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.ContextResolvedTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogView;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.operations.utils.ValidationUtils;
 import org.apache.flink.table.planner.operations.PlannerQueryOperation;
 import 
org.apache.flink.table.planner.operations.converters.SqlNodeConverter.ConvertContext;
 
@@ -138,9 +139,8 @@ class SqlNodeConvertUtils {
         }
         // check the view is exactly a view
         CatalogBaseTable baseTable = 
optionalCatalogTable.get().getResolvedTable();
-        if (baseTable instanceof CatalogTable) {
-            throw new ValidationException("ALTER VIEW for a table is not 
allowed");
-        }
+        ValidationUtils.validateTableKind(baseTable, TableKind.VIEW, "alter 
view");
+
         return (CatalogView) baseTable;
     }
 
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlTruncateTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlTruncateTableConverter.java
index 2c096957529..b6cf4b3e0fe 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlTruncateTableConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlTruncateTableConverter.java
@@ -19,11 +19,15 @@
 package org.apache.flink.table.planner.operations.converters;
 
 import org.apache.flink.sql.parser.dml.SqlTruncateTable;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
 import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.ContextResolvedTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.TruncateTableOperation;
+import org.apache.flink.table.operations.utils.ValidationUtils;
 
 /** A converter for {@link SqlTruncateTable}. */
 public class SqlTruncateTableConverter implements 
SqlNodeConverter<SqlTruncateTable> {
@@ -34,6 +38,11 @@ public class SqlTruncateTableConverter implements 
SqlNodeConverter<SqlTruncateTa
                 UnresolvedIdentifier.of(sqlTruncateTable.fullTableName());
         CatalogManager catalogManager = context.getCatalogManager();
         ObjectIdentifier tableIdentifier = 
catalogManager.qualifyIdentifier(unresolvedIdentifier);
+
+        ContextResolvedTable contextResolvedTable = 
catalogManager.getTableOrError(tableIdentifier);
+        CatalogBaseTable catalogBaseTable = contextResolvedTable.getTable();
+        ValidationUtils.validateTableKind(catalogBaseTable, TableKind.TABLE, 
"truncate table");
+
         return new TruncateTableOperation(tableIdentifier);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
index 242d1231106..cdf6fbe4174 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
@@ -2625,6 +2625,27 @@ class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversionTestBas
                                 "A column with the same name `id` has been 
defined at line 5, column 8."));
     }
 
+    @Test
+    void testAlterInvalidOperationsForMaterializedTables() throws Exception {
+        prepareMaterializedTable("my_materialized_table", false, 1, null, 
"SELECT 1");
+
+        assertThatThrownBy(() -> parse("alter table my_materialized_table 
RENAME to new_name"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessage("ALTER TABLE for a materialized table is not 
allowed");
+
+        assertThatThrownBy(() -> parse("analyze table my_materialized_table 
compute statistics"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessage("ANALYZE TABLE for a materialized table is not 
allowed");
+
+        assertThatThrownBy(() -> parse("alter view my_materialized_table 
RENAME to new_name"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessage("ALTER VIEW for a materialized table is not 
allowed");
+
+        assertThatThrownBy(() -> parse("truncate table my_materialized_table"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessage("TRUNCATE TABLE for a materialized table is not 
allowed");
+    }
+
     // ~ Tool Methods 
----------------------------------------------------------
 
     private static TestItem createTestItem(Object... args) {

Reply via email to