twalthr commented on code in PR #27302:
URL: https://github.com/apache/flink/pull/27302#discussion_r2585433541


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableAddDistributionConverter.java:
##########
@@ -30,38 +30,40 @@
 import org.apache.flink.table.planner.utils.OperationConverterUtils;
 
 import java.util.List;
+import java.util.Optional;
 
 /** A converter for {@link SqlAlterMaterializedTableAddDistribution}. */
 public class SqlAlterMaterializedTableAddDistributionConverter
         extends AbstractAlterMaterializedTableConverter<SqlAlterMaterializedTableAddDistribution> {
     @Override
-    public Operation convertSqlNode(
-            SqlAlterMaterializedTableAddDistribution node, ConvertContext context) {
-        ObjectIdentifier identifier = resolveIdentifier(node, context);
-
-        ResolvedCatalogMaterializedTable oldTable =
-                getResolvedMaterializedTable(
-                        context,
-                        identifier,
-                        () -> "Operation is supported only for materialized tables");
+    protected Operation convertToOperation(
+            SqlAlterMaterializedTableAddDistribution sqlAddDistribution,
+            ResolvedCatalogMaterializedTable oldTable,
+            ConvertContext context) {
+        TableDistribution distribution = getTableDistribution(sqlAddDistribution, oldTable);
+        // Build new materialized table and apply changes
+        CatalogMaterializedTable updatedTable =
+                buildUpdatedMaterializedTable(
+                        oldTable, builder -> builder.distribution(distribution));
+        return new AlterMaterializedTableChangeOperation(
+                resolveIdentifier(sqlAddDistribution, context),
+                List.of(TableChange.add(distribution)),
+                updatedTable);
+    }
 
-        if (oldTable.getDistribution().isPresent()) {
+    private TableDistribution getTableDistribution(
+            SqlAlterMaterializedTableDistribution sqlAlterTableDistribution,
+            ResolvedCatalogMaterializedTable oldTable) {
+        Optional<TableDistribution> oldDistribution = oldTable.getDistribution();
+        if (oldDistribution.isPresent()) {
             throw new ValidationException(
                     String.format(
-                            "Materialized table %s has already defined the distribution `%s`. "
+                            "%sThe base materialized table has already defined the distribution `%s`. "

Review Comment:
   ```suggestion
                               "%sThe current materialized table has already 
defined the distribution `%s`. "
   ```



##########
tools/maven/checkstyle.xml:
##########
@@ -99,6 +99,22 @@ This file is based on the checkstyle file of Apache Beam.
                        <property name="message" value="Use System.getProperties() to get system properties."/>
                </module>
 
+               <module name="Regexp">

Review Comment:
   Isn't that a bit too strict? I wouldn't spam the global Flink checkstyle with that. Static imports should still be optional, even though they are very useful.
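
   For context, a checkstyle `Regexp` module of the kind being discussed would typically flag non-static calls via a pattern match. This is only a hypothetical sketch of such a rule (the actual module body added in this PR is not visible in the hunk above), using properties that the Checkstyle `Regexp` check does support:

   ```xml
   <!-- Hypothetical example of a Regexp rule that forces static imports
        by flagging fully qualified assertion calls. Not the PR's actual rule. -->
   <module name="Regexp">
           <property name="format" value="Assertions\.assertThat"/>
           <property name="illegalPattern" value="true"/>
           <property name="message" value="Use a static import for assertThat."/>
   </module>
   ```

   A rule like this rejects every occurrence project-wide, which is why applying it globally rather than per-module feels heavy-handed.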



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/AbstractCreateMaterializedTableConverter.java:
##########
@@ -202,7 +203,7 @@ private void verifyPartitioningColumnsExist(
             if (!partitionKeys.contains(partitionKey)) {
                 throw new ValidationException(
                         String.format(
-                                "Column '%s' referenced by materialized table option '%s' isn't a partition column. Available partition columns: [%s].",
+                                "Column `%s` referenced by materialized table option '%s' isn't a partition column. Available partition columns: [%s].",

Review Comment:
   ```suggestion
                                   "Column '%s' referenced by materialized 
table option '%s' isn't a partition column. Available partition columns: [%s].",
   ```



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/AbstractCreateMaterializedTableConverter.java:
##########
@@ -184,7 +185,7 @@ private void verifyPartitioningColumnsExist(
             if (schema.getColumn(partitionKey).isEmpty()) {
                 throw new ValidationException(
                         String.format(
-                                "Partition column '%s' not defined in the query schema. Available columns: [%s].",
+                                "Partition column `%s` not defined in the query's schema. Available columns: [%s].",

Review Comment:
   nit: use single quotes for consistency
   ```suggestion
                                   "Partition column '%s' not defined in the 
query's schema. Available columns: [%s].",
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
