twalthr commented on code in PR #27302:
URL: https://github.com/apache/flink/pull/27302#discussion_r2585433541
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableAddDistributionConverter.java:
##########
@@ -30,38 +30,40 @@
import org.apache.flink.table.planner.utils.OperationConverterUtils;
import java.util.List;
+import java.util.Optional;
/** A converter for {@link SqlAlterMaterializedTableAddDistribution}. */
public class SqlAlterMaterializedTableAddDistributionConverter
extends
AbstractAlterMaterializedTableConverter<SqlAlterMaterializedTableAddDistribution>
{
@Override
- public Operation convertSqlNode(
- SqlAlterMaterializedTableAddDistribution node, ConvertContext
context) {
- ObjectIdentifier identifier = resolveIdentifier(node, context);
-
- ResolvedCatalogMaterializedTable oldTable =
- getResolvedMaterializedTable(
- context,
- identifier,
- () -> "Operation is supported only for materialized
tables");
+ protected Operation convertToOperation(
+ SqlAlterMaterializedTableAddDistribution sqlAddDistribution,
+ ResolvedCatalogMaterializedTable oldTable,
+ ConvertContext context) {
+ TableDistribution distribution =
getTableDistribution(sqlAddDistribution, oldTable);
+ // Build new materialized table and apply changes
+ CatalogMaterializedTable updatedTable =
+ buildUpdatedMaterializedTable(
+ oldTable, builder ->
builder.distribution(distribution));
+ return new AlterMaterializedTableChangeOperation(
+ resolveIdentifier(sqlAddDistribution, context),
+ List.of(TableChange.add(distribution)),
+ updatedTable);
+ }
- if (oldTable.getDistribution().isPresent()) {
+ private TableDistribution getTableDistribution(
+ SqlAlterMaterializedTableDistribution sqlAlterTableDistribution,
+ ResolvedCatalogMaterializedTable oldTable) {
+ Optional<TableDistribution> oldDistribution =
oldTable.getDistribution();
+ if (oldDistribution.isPresent()) {
throw new ValidationException(
String.format(
- "Materialized table %s has already defined the
distribution `%s`. "
+ "%sThe base materialized table has already defined
the distribution `%s`. "
Review Comment:
```suggestion
"%sThe current materialized table has already
defined the distribution `%s`. "
```
##########
tools/maven/checkstyle.xml:
##########
@@ -99,6 +99,22 @@ This file is based on the checkstyle file of Apache Beam.
<property name="message" value="Use
System.getProperties() to get system properties."/>
</module>
+ <module name="Regexp">
Review Comment:
Isn't that a bit too strict? I wouldn't spam the global Flink checkstyle
with that. Static imports should still be optional, even though very useful.
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/AbstractCreateMaterializedTableConverter.java:
##########
@@ -202,7 +203,7 @@ private void verifyPartitioningColumnsExist(
if (!partitionKeys.contains(partitionKey)) {
throw new ValidationException(
String.format(
- "Column '%s' referenced by materialized table
option '%s' isn't a partition column. Available partition columns: [%s].",
+ "Column `%s` referenced by materialized table
option '%s' isn't a partition column. Available partition columns: [%s].",
Review Comment:
```suggestion
"Column '%s' referenced by materialized
table option '%s' isn't a partition column. Available partition columns: [%s].",
```
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/AbstractCreateMaterializedTableConverter.java:
##########
@@ -184,7 +185,7 @@ private void verifyPartitioningColumnsExist(
if (schema.getColumn(partitionKey).isEmpty()) {
throw new ValidationException(
String.format(
- "Partition column '%s' not defined in the
query schema. Available columns: [%s].",
+ "Partition column `%s` not defined in the
query's schema. Available columns: [%s].",
Review Comment:
nit: use single quotes for consistency
```suggestion
"Partition column '%s' not defined in the
query's schema. Available columns: [%s].",
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]