This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 192e1e8fb04c3a8c88673fde1b66dd359b5b0fe0 Author: fengli <ldliu...@163.com> AuthorDate: Mon May 6 20:21:09 2024 +0800 [FLINK-35195][table] Introduce MaterializedTableChange to support update materialized table refresh status and RefreshHandler --- .../apache/flink/table/catalog/CatalogManager.java | 3 +- .../operations/ddl/AlterTableChangeOperation.java | 6 +- .../AlterMaterializedTableChangeOperation.java | 107 ++++++++++++++++++ .../AlterMaterializedTableOperation.java | 42 ++++++++ .../apache/flink/table/catalog/TableChange.java | 120 +++++++++++++++++++++ 5 files changed, 275 insertions(+), 3 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java index 9e7bf5ec007..51b69c650eb 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java @@ -1151,7 +1151,8 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable { (catalog, path) -> { final CatalogBaseTable resolvedTable = resolveCatalogBaseTable(table); catalog.alterTable(path, resolvedTable, ignoreIfNotExists); - if (resolvedTable instanceof CatalogTable) { + if (resolvedTable instanceof CatalogTable + || resolvedTable instanceof CatalogMaterializedTable) { catalogModificationListeners.forEach( listener -> listener.onEvent( diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java index 158bdd22121..7a597415235 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java @@ -58,7 +58,9 @@ public class AlterTableChangeOperation extends AlterTableOperation { @Override public String asSummaryString() { String changes = - tableChanges.stream().map(this::toString).collect(Collectors.joining(",\n")); + tableChanges.stream() + .map(AlterTableChangeOperation::toString) + .collect(Collectors.joining(",\n")); return String.format( "ALTER TABLE %s%s\n%s", ignoreIfTableNotExists ? "IF EXISTS " : "", @@ -66,7 +68,7 @@ public class AlterTableChangeOperation extends AlterTableOperation { changes); } - private String toString(TableChange tableChange) { + public static String toString(TableChange tableChange) { if (tableChange instanceof TableChange.SetOption) { TableChange.SetOption setChange = (TableChange.SetOption) tableChange; return String.format(" SET '%s' = '%s'", setChange.getKey(), setChange.getValue()); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java new file mode 100644 index 00000000000..49f220a8ddc --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.materializedtable; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.internal.TableResultImpl; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.operations.ddl.AlterTableChangeOperation; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * Alter materialized table with new table definition and table changes represents the modification. + */ +@Internal +public class AlterMaterializedTableChangeOperation extends AlterMaterializedTableOperation { + + private final List<TableChange> tableChanges; + private final CatalogMaterializedTable catalogMaterializedTable; + + public AlterMaterializedTableChangeOperation( + ObjectIdentifier tableIdentifier, + List<TableChange> tableChanges, + CatalogMaterializedTable catalogMaterializedTable) { + super(tableIdentifier); + this.tableChanges = tableChanges; + this.catalogMaterializedTable = catalogMaterializedTable; + } + + public List<TableChange> getTableChanges() { + return tableChanges; + } + + public CatalogMaterializedTable getCatalogMaterializedTable() { + return catalogMaterializedTable; + } + + @Override + public TableResultInternal execute(Context ctx) { + ctx.getCatalogManager() + .alterTable( + getCatalogMaterializedTable(), + getTableChanges().stream() + .map(TableChange.class::cast) + .collect(Collectors.toList()), + getTableIdentifier(), + false); + return TableResultImpl.TABLE_RESULT_OK; + } + + @Override + public String asSummaryString() { + String changes = + tableChanges.stream() + .map( + tableChange -> { + if (tableChange + instanceof TableChange.MaterializedTableChange) { + return toString( + (TableChange.MaterializedTableChange) tableChange); + } else { + return AlterTableChangeOperation.toString(tableChange); + } + }) + .collect(Collectors.joining(",\n")); + return String.format( + "ALTER MATERIALIZED TABLE %s\n%s", tableIdentifier.asSummaryString(), changes); + } + + private String toString(TableChange.MaterializedTableChange tableChange) { + if (tableChange instanceof TableChange.ModifyRefreshStatus) { + TableChange.ModifyRefreshStatus refreshStatus = + (TableChange.ModifyRefreshStatus) tableChange; + return String.format( + " MODIFY REFRESH STATUS TO '%s'", refreshStatus.getRefreshStatus()); + } else if (tableChange instanceof TableChange.ModifyRefreshHandler) { + TableChange.ModifyRefreshHandler refreshHandler = + (TableChange.ModifyRefreshHandler) tableChange; + return String.format( + " MODIFY REFRESH HANDLER DESCRIPTION TO '%s'", + refreshHandler.getRefreshHandlerDesc()); + } else { + throw new UnsupportedOperationException( + String.format("Unknown materialized table change: %s.", tableChange)); + } + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableOperation.java new file mode 100644 index 00000000000..da367b598ba --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableOperation.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.materializedtable; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.operations.ddl.AlterOperation; + +/** + * Abstract Operation to describe all ALTER MATERIALIZED TABLE statements such as rename table /set + * properties. + */ +@Internal +public abstract class AlterMaterializedTableOperation + implements AlterOperation, MaterializedTableOperation { + + protected final ObjectIdentifier tableIdentifier; + + public AlterMaterializedTableOperation(ObjectIdentifier tableIdentifier) { + this.tableIdentifier = tableIdentifier; + } + + public ObjectIdentifier getTableIdentifier() { + return tableIdentifier; + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java index 34eba1d6845..cf7408c61fb 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java @@ -25,6 +25,7 @@ import org.apache.flink.util.Preconditions; import javax.annotation.Nullable; +import java.util.Arrays; import java.util.Objects; /** {@link TableChange} represents the modification of the table. */ @@ -309,6 +310,29 @@ public interface TableChange { return new ResetOption(key); } + /** + * A table change to modify materialized table refresh status. + * + * @param refreshStatus the modified refresh status. + * @return a TableChange represents the modification. + */ + static ModifyRefreshStatus modifyRefreshStatus( + CatalogMaterializedTable.RefreshStatus refreshStatus) { + return new ModifyRefreshStatus(refreshStatus); + } + + /** + * A table change to modify materialized table refresh handler. + * + * @param refreshHandlerDesc the modified refresh handler description. + * @param refreshHandlerBytes the modified refresh handler bytes. + * @return a TableChange represents the modification. + */ + static ModifyRefreshHandler modifyRefreshHandler( + String refreshHandlerDesc, byte[] refreshHandlerBytes) { + return new ModifyRefreshHandler(refreshHandlerDesc, refreshHandlerBytes); + } + // -------------------------------------------------------------------------------------------- // Add Change // -------------------------------------------------------------------------------------------- @@ -1096,4 +1120,100 @@ public interface TableChange { return String.format("AFTER %s", EncodingUtils.escapeIdentifier(column)); } } + + // -------------------------------------------------------------------------------------------- + // Materialized table change + // -------------------------------------------------------------------------------------------- + /** {@link MaterializedTableChange} represents the modification of the materialized table. */ + @PublicEvolving + interface MaterializedTableChange extends TableChange {} + + /** A table change to modify materialized table refresh status. */ + @PublicEvolving + class ModifyRefreshStatus implements MaterializedTableChange { + + private final CatalogMaterializedTable.RefreshStatus refreshStatus; + + public ModifyRefreshStatus(CatalogMaterializedTable.RefreshStatus refreshStatus) { + this.refreshStatus = refreshStatus; + } + + public CatalogMaterializedTable.RefreshStatus getRefreshStatus() { + return refreshStatus; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ModifyRefreshStatus that = (ModifyRefreshStatus) o; + return refreshStatus == that.refreshStatus; + } + + @Override + public int hashCode() { + return Objects.hash(refreshStatus); + } + + @Override + public String toString() { + return "ModifyRefreshStatus{" + "refreshStatus=" + refreshStatus + '}'; + } + } + + /** A table change to modify materialized table refresh handler. */ + @PublicEvolving + class ModifyRefreshHandler implements MaterializedTableChange { + + private final String refreshHandlerDesc; + private final byte[] refreshHandlerBytes; + + public ModifyRefreshHandler(String refreshHandlerDesc, byte[] refreshHandlerBytes) { + this.refreshHandlerDesc = refreshHandlerDesc; + this.refreshHandlerBytes = refreshHandlerBytes; + } + + public String getRefreshHandlerDesc() { + return refreshHandlerDesc; + } + + public byte[] getRefreshHandlerBytes() { + return refreshHandlerBytes; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ModifyRefreshHandler that = (ModifyRefreshHandler) o; + return Objects.equals(refreshHandlerDesc, that.refreshHandlerDesc) + && Arrays.equals(refreshHandlerBytes, that.refreshHandlerBytes); + } + + @Override + public int hashCode() { + int result = Objects.hash(refreshHandlerDesc); + result = 31 * result + Arrays.hashCode(refreshHandlerBytes); + return result; + } + + @Override + public String toString() { + return "ModifyRefreshHandler{" + + "refreshHandlerDesc='" + + refreshHandlerDesc + + '\'' + + ", refreshHandlerBytes=" + + Arrays.toString(refreshHandlerBytes) + + '}'; + } + } }