raminqaf commented on code in PR #27302:
URL: https://github.com/apache/flink/pull/27302#discussion_r2584034527


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/AbstractAlterMaterializedTableConverter.java:
##########
@@ -35,6 +40,39 @@
 public abstract class AbstractAlterMaterializedTableConverter<T extends SqlAlterMaterializedTable>
         implements SqlNodeConverter<T> {
 
+    protected static final String EX_MSG_PREFIX =
+            "Failed to execute ALTER MATERIALIZED TABLE statement.\n";
+
+    protected abstract Operation convertToOperation(
+            T sqlAlterTable,
+            ResolvedCatalogMaterializedTable oldMaterializedTable,
+            ConvertContext context);
+
+    @Override
+    public final Operation convertSqlNode(T sqlAlterMaterializedTable, ConvertContext context) {
+        CatalogManager catalogManager = context.getCatalogManager();
+        final ObjectIdentifier materializedTableIdentifier =
+                resolveIdentifier(sqlAlterMaterializedTable, context);
+        Optional<ContextResolvedTable> optionalCatalogMaterializedTable =
+                catalogManager.getTable(materializedTableIdentifier);
+
+        if (optionalCatalogMaterializedTable.isEmpty()
+                || optionalCatalogMaterializedTable.get().isTemporary()) {
+            throw new ValidationException(
+                    String.format(
+                            "Materialized table %s doesn't exist.", materializedTableIdentifier));
+        }
+        ValidationUtils.validateTableKind(
+                optionalCatalogMaterializedTable.get().getTable(),
+                MATERIALIZED_TABLE,

Review Comment:
   nit
   ```suggestion
                   TableKind.MATERIALIZED_TABLE,
   ```



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -1220,6 +1220,49 @@ void testAlterMaterializedTableWithRepeatedSuspendAndResumeInFullMode() throws E
                 ObjectIdentifier.of(fileSystemCatalogName, TEST_DEFAULT_DATABASE, "users_shops"));
     }
 
+    @Test
+    void testAlterMaterializedTableAddSchemaColumnsFullMode() throws Exception {
+        createAndVerifyCreateMaterializedTableWithData(
+                "users_shops", Collections.emptyList(), Collections.emptyMap(), RefreshMode.FULL);
+
+        ResolvedCatalogMaterializedTable oldTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        // Alter materialized table as query in full mode
+        String alterMaterializedTableAsQueryDDL =
+                "ALTER MATERIALIZED TABLE users_shops ADD (`ct` AS CURRENT_TIMESTAMP, WATERMARK FOR `ct` AS `ct` - INTERVAL '3' SECOND)";
+
+        OperationHandle alterMaterializedTableAsQueryHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableAsQueryDDL, -1, new Configuration());
+
+        awaitOperationTermination(service, sessionHandle, alterMaterializedTableAsQueryHandle);
+
+        // verify the altered materialized table
+        ResolvedCatalogMaterializedTable newTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(getAddedColumns(newTable.getResolvedSchema(), oldTable.getResolvedSchema()))

Review Comment:
   Should we also add an assertion that the watermark was added?
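
   To illustrate the kind of check meant here, a minimal self-contained sketch follows. It mirrors the `getAddedColumns` diff used above, but for watermarks. `Spec` and `getAddedWatermarks` are hypothetical stand-ins, not code from this PR; in the real test the lists would presumably come from `getResolvedSchema().getWatermarkSpecs()` on the old and new tables.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Objects;

   public class WatermarkDiffSketch {

       /** Hypothetical stand-in for a watermark spec: rowtime column plus expression text. */
       static final class Spec {
           final String rowtimeAttribute;
           final String expression;

           Spec(String rowtimeAttribute, String expression) {
               this.rowtimeAttribute = rowtimeAttribute;
               this.expression = expression;
           }

           @Override
           public boolean equals(Object o) {
               if (!(o instanceof Spec)) {
                   return false;
               }
               Spec other = (Spec) o;
               return rowtimeAttribute.equals(other.rowtimeAttribute)
                       && expression.equals(other.expression);
           }

           @Override
           public int hashCode() {
               return Objects.hash(rowtimeAttribute, expression);
           }
       }

       /** Returns the specs present in newSpecs but absent from oldSpecs, keeping their order. */
       static List<Spec> getAddedWatermarks(List<Spec> newSpecs, List<Spec> oldSpecs) {
           List<Spec> added = new ArrayList<>(newSpecs);
           added.removeAll(oldSpecs);
           return added;
       }

       public static void main(String[] args) {
           // Before the ALTER there is no watermark; afterwards there is one on `ct`.
           List<Spec> oldSpecs = List.of();
           List<Spec> newSpecs = List.of(new Spec("ct", "`ct` - INTERVAL '3' SECOND"));

           List<Spec> added = getAddedWatermarks(newSpecs, oldSpecs);
           if (added.size() != 1 || !"ct".equals(added.get(0).rowtimeAttribute)) {
               throw new AssertionError("expected exactly one new watermark on ct");
           }
           System.out.println("added watermark on: " + added.get(0).rowtimeAttribute);
       }
   }
   ```

   In the ITCase itself this could probably shrink to a single AssertJ assertion on the new schema's watermark specs; the diff helper is only there to keep the sketch independent of Flink classes.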



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java:
##########
@@ -132,4 +142,61 @@ public static List<Column> validateAndExtractNewColumns(
 
         return newAddedColumns;
     }
+
+    public static void validatePhysicalColumnsUsedByQuery(
+            ResolvedCatalogMaterializedTable oldTable,
+            SqlAlterMaterializedTableSchema alterTableSchema,
+            ConvertContext context) {
+        final SqlNodeList sqlNodeList = alterTableSchema.getColumnPositions();
+        if (sqlNodeList.isEmpty()) {
+            return;
+        }
+
+        final SqlNode originalQuery =
+                context.getFlinkPlanner().parser().parse(oldTable.getOriginalQuery());
+        final SqlNode validateQuery = context.getSqlValidator().validate(originalQuery);
+        final PlannerQueryOperation queryOperation =
+                new PlannerQueryOperation(
+                        context.toRelRoot(validateQuery).project(),
+                        () -> context.toQuotedSqlString(validateQuery));
+
+        validatePhysicalColumnsUsedByQuery(sqlNodeList, queryOperation.getResolvedSchema());
+    }
+
+    public static void validatePhysicalColumnsUsedByQuery(
+            SqlNodeList columnPositions, ResolvedSchema querySchema) {
+        final Set<String> querySchemaColumnNames = new HashSet<>(querySchema.getColumnNames());
+        for (SqlNode column : columnPositions) {
+            throwIfPhysicalColumnNotUsedByQuery(column, querySchemaColumnNames);
+        }
+    }
+
+    public static List<SqlTableColumn> getSqlTableColumns(SqlNodeList columnPositions) {
+        List<SqlTableColumn> list = new ArrayList<>();

Review Comment:
   nit
   ```suggestion
           List<SqlTableColumn> columns = new ArrayList<>();
   ```



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -1220,6 +1220,49 @@ void testAlterMaterializedTableWithRepeatedSuspendAndResumeInFullMode() throws E
                 ObjectIdentifier.of(fileSystemCatalogName, TEST_DEFAULT_DATABASE, "users_shops"));
     }
 
+    @Test
+    void testAlterMaterializedTableAddSchemaColumnsFullMode() throws Exception {
+        createAndVerifyCreateMaterializedTableWithData(
+                "users_shops", Collections.emptyList(), Collections.emptyMap(), RefreshMode.FULL);

Review Comment:
   nit: 
   ```suggestion
                   "users_shops", List.of(), Map.of(), RefreshMode.FULL);
   ```
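
   For context on why the swap should be safe, a small sketch (not from the PR; the class and helper names are made up for illustration): on Java 9+, `List.of()` and `Map.of()` compare equal to `Collections.emptyList()` and `Collections.emptyMap()`, and all of them reject mutation.

   ```java
   import java.util.Collections;
   import java.util.List;
   import java.util.Map;

   public class EmptyCollectionsSketch {

       /** Returns true if adding an element to the list is rejected. */
       static boolean rejectsMutation(List<Object> list) {
           try {
               list.add("x");
               return false;
           } catch (UnsupportedOperationException e) {
               return true;
           }
       }

       public static void main(String[] args) {
           List<Object> viaFactory = List.of();
           List<Object> viaCollections = Collections.emptyList();

           // List and Map equality is defined by contents, so the empty instances are equal.
           System.out.println(viaFactory.equals(viaCollections));
           System.out.println(Map.of().equals(Collections.emptyMap()));

           // Both variants are unmodifiable.
           System.out.println(rejectsMutation(viaFactory));
           System.out.println(rejectsMutation(viaCollections));
       }
   }
   ```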



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java:
##########
@@ -1022,4 +1039,56 @@ private static List<Arguments> createOrAlter(final String operation) {
                                         "PK_user_id", List.of("user_id")),
                                 List.of())));
     }
+
+    private CatalogMaterializedTable.Builder getDefaultMaterializedTablebuilder() {

Review Comment:
   Maybe add a one-line comment defining what "defaultMaterializedTable" means. It would make this method and the tests easier to understand.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
