AHeise commented on code in PR #27729:
URL: https://github.com/apache/flink/pull/27729#discussion_r2887963644
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java:
##########
@@ -136,48 +131,34 @@ private List<TableChange> buildTableChanges(
                         mergeContext.getMergedOriginalQuery(),
                         mergeContext.getMergedExpandedQuery()));
+        changes.addAll(
+                calculateOptionsChange(
+                        oldTable.getOptions(), mergeContext.getMergedTableOptions()));
+
         return changes;
     }
 
-    private CatalogMaterializedTable buildNewCatalogMaterializedTableFromOldTable(
-            final ResolvedCatalogMaterializedTable oldTable,
-            final SqlCreateOrAlterMaterializedTable sqlCreateOrAlterTable,
-            final MergeContext mergeContext) {
-        final Schema.Builder schemaBuilder =
-                Schema.newBuilder().fromResolvedSchema(oldTable.getResolvedSchema());
+    public static List<TableChange> calculateOptionsChange(
+            Map<String, String> oldOptions, Map<String, String> newOptions) {
+        if (oldOptions.equals(newOptions)) {
+            return List.of();
+        }
 
-        // Add new columns if this is an alter operation
-        final ResolvedSchema oldSchema = oldTable.getResolvedSchema();
-        final List<Column> newColumns =
-                MaterializedTableUtils.validateAndExtractNewColumns(
-                        oldSchema, mergeContext.getMergedQuerySchema());
-        newColumns.forEach(col -> schemaBuilder.column(col.getName(), col.getDataType()));
-
-        final String comment = sqlCreateOrAlterTable.getComment();
-        final IntervalFreshness freshness = getDerivedFreshness(sqlCreateOrAlterTable);
-        final LogicalRefreshMode logicalRefreshMode =
-                getDerivedLogicalRefreshMode(sqlCreateOrAlterTable);
-        final RefreshMode refreshMode = getDerivedRefreshMode(logicalRefreshMode);
-
-        CatalogMaterializedTable.Builder builder =
-                CatalogMaterializedTable.newBuilder()
-                        .schema(schemaBuilder.build())
-                        .comment(comment)
-                        .distribution(mergeContext.getMergedTableDistribution().orElse(null))
-                        .partitionKeys(mergeContext.getMergedPartitionKeys())
-                        .options(mergeContext.getMergedTableOptions())
-                        .originalQuery(mergeContext.getMergedOriginalQuery())
-                        .expandedQuery(mergeContext.getMergedExpandedQuery())
-                        .freshness(freshness)
-                        .logicalRefreshMode(logicalRefreshMode)
-                        .refreshMode(refreshMode)
-                        .refreshStatus(RefreshStatus.INITIALIZING);
-
-        // Preserve refresh handler from old materialized table
-        oldTable.getRefreshHandlerDescription().ifPresent(builder::refreshHandlerDescription);
-        builder.serializedRefreshHandler(oldTable.getSerializedRefreshHandler());
-
-        return builder.build();
+        final List<TableChange> changes = new ArrayList<>();
+        for (Map.Entry<String, String> option : newOptions.entrySet()) {
+            if (oldOptions.get(option.getKey()) == null
Review Comment:
Extract oldValue.
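The suggested extraction could look roughly like this (a self-contained sketch, not the actual PR code: Flink's `TableChange.set`/`TableChange.reset` are stood in by plain `"set:…"`/`"reset:…"` strings so the snippet compiles without Flink on the classpath):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class OptionsDiffSketch {

    // Sketch of calculateOptionsChange with the old value extracted once,
    // instead of calling oldOptions.get(option.getKey()) twice per entry.
    public static List<String> calculateOptionsChange(
            Map<String, String> oldOptions, Map<String, String> newOptions) {
        if (oldOptions.equals(newOptions)) {
            return List.of();
        }
        final List<String> changes = new ArrayList<>();
        for (Map.Entry<String, String> option : newOptions.entrySet()) {
            // Extracted oldValue: null means the option was newly added.
            final String oldValue = oldOptions.get(option.getKey());
            if (!option.getValue().equals(oldValue)) {
                changes.add("set:" + option.getKey() + "=" + option.getValue());
            }
        }
        for (Map.Entry<String, String> option : oldOptions.entrySet()) {
            if (!newOptions.containsKey(option.getKey())) {
                changes.add("reset:" + option.getKey());
            }
        }
        return changes;
    }
}
```

Note that `!option.getValue().equals(oldValue)` covers both the "newly added" case (`oldValue == null`) and the "value changed" case in a single null-safe comparison.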
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java:
##########
@@ -136,48 +131,34 @@ private List<TableChange> buildTableChanges(
                         mergeContext.getMergedOriginalQuery(),
                         mergeContext.getMergedExpandedQuery()));
+        changes.addAll(
+                calculateOptionsChange(
+                        oldTable.getOptions(), mergeContext.getMergedTableOptions()));
+
         return changes;
     }
 
-    private CatalogMaterializedTable buildNewCatalogMaterializedTableFromOldTable(
-            final ResolvedCatalogMaterializedTable oldTable,
-            final SqlCreateOrAlterMaterializedTable sqlCreateOrAlterTable,
-            final MergeContext mergeContext) {
-        final Schema.Builder schemaBuilder =
-                Schema.newBuilder().fromResolvedSchema(oldTable.getResolvedSchema());
+    public static List<TableChange> calculateOptionsChange(
+            Map<String, String> oldOptions, Map<String, String> newOptions) {
+        if (oldOptions.equals(newOptions)) {
+            return List.of();
+        }
 
-        // Add new columns if this is an alter operation
-        final ResolvedSchema oldSchema = oldTable.getResolvedSchema();
-        final List<Column> newColumns =
-                MaterializedTableUtils.validateAndExtractNewColumns(
-                        oldSchema, mergeContext.getMergedQuerySchema());
-        newColumns.forEach(col -> schemaBuilder.column(col.getName(), col.getDataType()));
-
-        final String comment = sqlCreateOrAlterTable.getComment();
-        final IntervalFreshness freshness = getDerivedFreshness(sqlCreateOrAlterTable);
-        final LogicalRefreshMode logicalRefreshMode =
-                getDerivedLogicalRefreshMode(sqlCreateOrAlterTable);
-        final RefreshMode refreshMode = getDerivedRefreshMode(logicalRefreshMode);
-
-        CatalogMaterializedTable.Builder builder =
-                CatalogMaterializedTable.newBuilder()
-                        .schema(schemaBuilder.build())
-                        .comment(comment)
-                        .distribution(mergeContext.getMergedTableDistribution().orElse(null))
-                        .partitionKeys(mergeContext.getMergedPartitionKeys())
-                        .options(mergeContext.getMergedTableOptions())
-                        .originalQuery(mergeContext.getMergedOriginalQuery())
-                        .expandedQuery(mergeContext.getMergedExpandedQuery())
-                        .freshness(freshness)
-                        .logicalRefreshMode(logicalRefreshMode)
-                        .refreshMode(refreshMode)
-                        .refreshStatus(RefreshStatus.INITIALIZING);
-
-        // Preserve refresh handler from old materialized table
-        oldTable.getRefreshHandlerDescription().ifPresent(builder::refreshHandlerDescription);
-        builder.serializedRefreshHandler(oldTable.getSerializedRefreshHandler());
-
-        return builder.build();
+        final List<TableChange> changes = new ArrayList<>();
+        for (Map.Entry<String, String> option : newOptions.entrySet()) {
+            if (oldOptions.get(option.getKey()) == null
+                    || !oldOptions.get(option.getKey()).equals(option.getValue())) {
+                changes.add(TableChange.set(option.getKey(), option.getValue()));
+            }
+        }
+
+        for (Map.Entry<String, String> option : oldOptions.entrySet()) {
+            if (newOptions.get(option.getKey()) == null) {
+                changes.add(TableChange.reset(option.getKey()));
Review Comment:
This will generate a reset for all inferred options. Let's say you have
```
CREATE OR ALTER MATERIALIZED TABLE mt
WITH (
  'connector' = 'filesystem'
)
```
and the catalog enriches the options, so the stored definition effectively becomes
```
CREATE OR ALTER MATERIALIZED TABLE mt
WITH (
  'connector' = 'filesystem',
  'fs' = 'hadoop'
)
```
and now the user adjusts the original settings:
```
CREATE OR ALTER MATERIALIZED TABLE mt
WITH (
  'connector' = 'filesystem',
  'delimiter' = ';'
)
```
We would receive `TableChange.set('delimiter', ';')` and also
`TableChange.reset('fs')`. I can see that we would get many resets this way.
My main question is whether this is the best approach. I think it is, but I
still wanted to call it out explicitly. My main concern is options like
`changelog.mode`.
Here are my thoughts:
* The catalog is the one that enriches the options, so it can also deal with
the resets.
* The catalog can infer the same options again.
* If I changed the connector, the catalog may infer completely different
options; the old inferred options may no longer be applicable at all.
* If the catalog later infers more options, a script that does CREATE OR
ALTER may need to be adjusted if we don't remove those inferred options.
So options that are implicitly added are also implicitly removed, which is
nicely symmetric.
An alternative would be to never generate resets and instead wait for the
user to explicitly reset options through other DDL statements. But that would
break in the situations mentioned above.
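To make the scenario concrete, here is a small standalone sketch of the same set/reset diffing applied to the hypothetical option maps from the example (strings again stand in for Flink's `TableChange` so the snippet runs on its own):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class InferredOptionsReset {

    // Same diff logic as in the PR: a "set" for every added or changed option,
    // a "reset" for every option that disappeared from the new definition.
    public static List<String> diff(
            Map<String, String> oldOptions, Map<String, String> newOptions) {
        List<String> changes = new ArrayList<>();
        for (Map.Entry<String, String> e : newOptions.entrySet()) {
            if (!e.getValue().equals(oldOptions.get(e.getKey()))) {
                changes.add("set:" + e.getKey() + "=" + e.getValue());
            }
        }
        for (String key : oldOptions.keySet()) {
            if (!newOptions.containsKey(key)) {
                changes.add("reset:" + key);
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        // Stored in the catalog: the user-provided 'connector' plus the
        // catalog-inferred 'fs' from the example above.
        Map<String, String> stored =
                Map.of("connector", "filesystem", "fs", "hadoop");
        // The user's new CREATE OR ALTER statement.
        Map<String, String> altered =
                Map.of("connector", "filesystem", "delimiter", ";");
        // Yields a set for 'delimiter' and a reset for the inferred 'fs';
        // the unchanged 'connector' produces no change at all.
        System.out.println(diff(stored, altered));
    }
}
```

This illustrates the symmetry discussed above: the implicitly added `'fs'` is implicitly removed again as soon as the user's definition no longer carries it.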
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]