AHeise commented on code in PR #27594: URL: https://github.com/apache/flink/pull/27594#discussion_r2873045866
########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeUtils.java: ########## @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.materializedtable; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.catalog.TableDistribution; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +/** Utils for materialized table change calculation. */ +@Internal +public class MaterializedTableChangeUtils { + + public static List<TableChange> buildTableChanges( + ResolvedCatalogMaterializedTable oldTable, ResolvedCatalogMaterializedTable newTable) { + List<TableChange> tableChanges = new ArrayList<>(calculateSchemaChange(oldTable, newTable)); + + // Distribution + tableChanges.addAll(calculateDistributionChange(oldTable, newTable)); + + // Options + tableChanges.addAll(calculateOptionsChange(oldTable, newTable)); + + // Query + tableChanges.addAll(calculateQueryChange(oldTable, newTable)); + return tableChanges; + } + + public static List<TableChange> buildSchemaTableChanges( + ResolvedSchema oldSchema, ResolvedSchema newSchema) { + final List<Column> oldColumns = oldSchema.getColumns(); + // Schema retrieved from query doesn't count existing non persisted columns + final List<Column> newColumns = + enrichWithOldNonPersistedColumns(oldSchema, newSchema).getColumns(); + + final List<Column> oldPersistedColumns = new ArrayList<>(); + final Map<String, Tuple2<Column, Integer>> oldColumnSet = new HashMap<>(); + for (int i = 0; i < oldColumns.size(); i++) { + Column column = oldColumns.get(i); + if (column.isPersisted()) { + oldPersistedColumns.add(oldColumns.get(i)); + } + oldColumnSet.put(column.getName(), Tuple2.of(oldColumns.get(i), i)); + } + + if (oldPersistedColumns.equals(newSchema.getColumns())) { + // No changes for persisted columns + return new ArrayList<>(); + } + + List<TableChange> changes = new ArrayList<>(); + for (int i = 0; i < newColumns.size(); i++) { + Column newColumn = newColumns.get(i); + Tuple2<Column, Integer> oldColumnToPosition = oldColumnSet.get(newColumn.getName()); + + if (oldColumnToPosition == null) { + changes.add(TableChange.add(newColumn.copy(newColumn.getDataType()))); + continue; + } + + // Check if position changed + applyPositionChanges(newColumns, oldColumnToPosition, i, changes); + + Column oldColumn = oldColumnToPosition.f0; + // Check if column changed + // Note: it could be unchanged while the position is changed + if (oldColumn.equals(newColumn)) { + // no changes + continue; + } + + // Check if kind changed + if (oldColumn.getClass() != newColumn.getClass()) { + changes.add(TableChange.dropColumn(oldColumn.getName())); + changes.add(TableChange.add(newColumn.copy(newColumn.getDataType()))); + continue; + } + + // Check if comment is changed + if (!Objects.equals( + oldColumn.getComment().orElse(null), newColumn.getComment().orElse(null))) { + changes.add( + TableChange.modifyColumnComment( + oldColumn, newColumn.getComment().orElse(null))); + } + + // Check if physical column type changed + if (oldColumn.isPhysical() + && newColumn.isPhysical() + && !oldColumn.getDataType().equals(newColumn.getDataType())) { + changes.add( + TableChange.modifyPhysicalColumnType(oldColumn, newColumn.getDataType())); + } + + // Check if metadata fields changed + if (oldColumn instanceof Column.MetadataColumn) { + applyMetadataColumnChanges( + (Column.MetadataColumn) oldColumn, + (Column.MetadataColumn) newColumn, + changes); + } + + // Check if computed expression changed + if (oldColumn instanceof Column.ComputedColumn) { + applyComputedColumnChanges( + (Column.ComputedColumn) oldColumn, + (Column.ComputedColumn) newColumn, + changes); + } + } + + for (Column newColumn : newColumns) { + oldColumnSet.remove(newColumn.getName()); + } + + for (Map.Entry<String, Tuple2<Column, Integer>> entry : oldColumnSet.entrySet()) { + changes.add(TableChange.dropColumn(entry.getKey())); + } + + return changes; + } + + private static ResolvedSchema enrichWithOldNonPersistedColumns( + ResolvedSchema oldSchema, ResolvedSchema newSchema) { + final List<Integer> nonPersistedColumnIndexes = getNonPersistedColumnIndexes(oldSchema); + if (nonPersistedColumnIndexes.isEmpty()) { + return newSchema; + } + + final Column[] newColumns = + new Column[newSchema.getColumnCount() + nonPersistedColumnIndexes.size()]; + int nextFreePosition = 0; + + // Preserve initial positions of non persisted columns + // It will allow to not generate extra column position changes + for (int index : nonPersistedColumnIndexes) { + newColumns[index] = oldSchema.getColumn(index).get(); + if (index == nextFreePosition) { + nextFreePosition++; + } + } + + for (int i = 0; i < newSchema.getColumnCount(); i++) { + newColumns[nextFreePosition] = newSchema.getColumn(i).get(); + while (nextFreePosition < newColumns.length && newColumns[nextFreePosition] != null) { + nextFreePosition++; + } + } + + return ResolvedSchema.of(newColumns); + } + + private static List<Integer> getNonPersistedColumnIndexes(ResolvedSchema schema) { + final List<Integer> nonPersistedColumnIndexes = new ArrayList<>(); + for (int i = 0; i < schema.getColumnCount(); i++) { + if (!schema.getColumn(i).get().isPersisted()) { + nonPersistedColumnIndexes.add(i); + } + } + return nonPersistedColumnIndexes; + } + + private static void applyComputedColumnChanges( + Column.ComputedColumn oldColumn, + Column.ComputedColumn newColumn, + List<TableChange> changes) { + if (!oldColumn + .getExpression() + .asSerializableString() + .equals(newColumn.getExpression().asSerializableString()) + && !Objects.equals( + oldColumn.explainExtras().orElse(null), + newColumn.explainExtras().orElse(null))) { + // for now there is no dedicated table change + changes.add(TableChange.dropColumn(oldColumn.getName())); + changes.add(TableChange.add(newColumn)); + } + } + + private static void applyMetadataColumnChanges( + Column.MetadataColumn oldColumn, + Column.MetadataColumn newColumn, + List<TableChange> changes) { + if (oldColumn.isVirtual() != newColumn.isVirtual() + || !Objects.equals( + oldColumn.getMetadataKey().orElse(null), + newColumn.getMetadataKey().orElse(null))) { + // for now there is no dedicated table change + changes.add(TableChange.dropColumn(oldColumn.getName())); + changes.add(TableChange.add(newColumn)); + } + } + + private static void applyPositionChanges( + List<Column> newColumns, + Tuple2<Column, Integer> oldColumnToPosition, + int currentPosition, + List<TableChange> changes) { + Column oldColumn = oldColumnToPosition.f0; + int oldPosition = oldColumnToPosition.f1; + if (oldPosition != currentPosition) { + TableChange.ColumnPosition position = + currentPosition == 0 + ? TableChange.ColumnPosition.first() + : TableChange.ColumnPosition.after( + newColumns.get(currentPosition - 1).getName()); + changes.add(TableChange.modifyColumnPosition(oldColumn, position)); + } + } + + private static List<TableChange> calculateSchemaChange( + ResolvedCatalogMaterializedTable oldTable, ResolvedCatalogMaterializedTable newTable) { + ResolvedSchema oldSchema = oldTable.getResolvedSchema(); + ResolvedSchema newSchema = newTable.getResolvedSchema(); + return MaterializedTableChangeUtils.buildSchemaTableChanges(oldSchema, newSchema); + } + + private static List<TableChange> calculateDistributionChange( + ResolvedCatalogMaterializedTable oldTable, ResolvedCatalogMaterializedTable newTable) { + Optional<TableDistribution> oldTableDistribution = oldTable.getDistribution(); + Optional<TableDistribution> newTableDistribution = newTable.getDistribution(); + if (oldTableDistribution.isEmpty() && newTableDistribution.isEmpty()) { + return List.of(); + } + + if (oldTableDistribution.isPresent() + && newTableDistribution.isPresent() + && oldTableDistribution.get().equals(newTableDistribution.get())) { + return List.of(); + } + + if (oldTableDistribution.isPresent() && newTableDistribution.isEmpty()) { + return List.of(TableChange.dropDistribution()); + } + + if (oldTableDistribution.isEmpty()) { + return List.of(TableChange.add(newTableDistribution.get())); + } + + return List.of(TableChange.modify(newTableDistribution.get())); + } + + private static List<TableChange> calculateQueryChange( + ResolvedCatalogMaterializedTable oldTable, ResolvedCatalogMaterializedTable newTable) { + String oldQuery = oldTable.getOriginalQuery(); + String newQuery = newTable.getOriginalQuery(); + if (!oldQuery.equals(newQuery)) { + return List.of( + TableChange.modifyDefinitionQuery(newQuery, newTable.getExpandedQuery())); + } + return List.of(); + } + + private static List<TableChange> calculateOptionsChange( + ResolvedCatalogMaterializedTable oldTable, ResolvedCatalogMaterializedTable newTable) { + Map<String, String> oldOptions = new HashMap<>(oldTable.getOptions()); + Map<String, String> newOptions = new HashMap<>(newTable.getOptions()); + if (oldOptions.equals(newOptions)) { + return List.of(); + } + + List<TableChange> changes = new ArrayList<>(); + for (Map.Entry<String, String> option : oldOptions.entrySet()) { + if (newOptions.containsKey(option.getKey())) { + if (Objects.equals(newOptions.get(option.getKey()), option.getValue())) { Review Comment: Shouldn't this be negated? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
