This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 192e1e8fb04c3a8c88673fde1b66dd359b5b0fe0
Author: fengli <ldliu...@163.com>
AuthorDate: Mon May 6 20:21:09 2024 +0800

    [FLINK-35195][table] Introduce MaterializedTableChange to support update 
materialized table refresh status and RefreshHandler
---
 .../apache/flink/table/catalog/CatalogManager.java |   3 +-
 .../operations/ddl/AlterTableChangeOperation.java  |   6 +-
 .../AlterMaterializedTableChangeOperation.java     | 107 ++++++++++++++++++
 .../AlterMaterializedTableOperation.java           |  42 ++++++++
 .../apache/flink/table/catalog/TableChange.java    | 120 +++++++++++++++++++++
 5 files changed, 275 insertions(+), 3 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index 9e7bf5ec007..51b69c650eb 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -1151,7 +1151,8 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
                 (catalog, path) -> {
                     final CatalogBaseTable resolvedTable = 
resolveCatalogBaseTable(table);
                     catalog.alterTable(path, resolvedTable, ignoreIfNotExists);
-                    if (resolvedTable instanceof CatalogTable) {
+                    if (resolvedTable instanceof CatalogTable
+                            || resolvedTable instanceof 
CatalogMaterializedTable) {
                         catalogModificationListeners.forEach(
                                 listener ->
                                         listener.onEvent(
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java
index 158bdd22121..7a597415235 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java
@@ -58,7 +58,9 @@ public class AlterTableChangeOperation extends 
AlterTableOperation {
     @Override
     public String asSummaryString() {
         String changes =
-                
tableChanges.stream().map(this::toString).collect(Collectors.joining(",\n"));
+                tableChanges.stream()
+                        .map(AlterTableChangeOperation::toString)
+                        .collect(Collectors.joining(",\n"));
         return String.format(
                 "ALTER TABLE %s%s\n%s",
                 ignoreIfTableNotExists ? "IF EXISTS " : "",
@@ -66,7 +68,7 @@ public class AlterTableChangeOperation extends 
AlterTableOperation {
                 changes);
     }
 
-    private String toString(TableChange tableChange) {
+    public static String toString(TableChange tableChange) {
         if (tableChange instanceof TableChange.SetOption) {
             TableChange.SetOption setChange = (TableChange.SetOption) 
tableChange;
             return String.format("  SET '%s' = '%s'", setChange.getKey(), 
setChange.getValue());
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
new file mode 100644
index 00000000000..49f220a8ddc
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.internal.TableResultImpl;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Operation that alters a materialized table. It carries the new table definition together with
+ * the {@link TableChange}s that represent the modification.
+ *
+ * <p>{@code execute} passes the definition, the changes and the table identifier to the catalog
+ * manager's {@code alterTable}. {@code asSummaryString} renders every change and joins the
+ * results with {@code ",\n"} below an {@code ALTER MATERIALIZED TABLE} header.
+ */
+@Internal
+public class AlterMaterializedTableChangeOperation extends 
AlterMaterializedTableOperation {
+
+    private final List<TableChange> tableChanges;
+    private final CatalogMaterializedTable catalogMaterializedTable;
+
+    public AlterMaterializedTableChangeOperation(
+            ObjectIdentifier tableIdentifier,
+            List<TableChange> tableChanges,
+            CatalogMaterializedTable catalogMaterializedTable) {
+        super(tableIdentifier);
+        this.tableChanges = tableChanges; // kept by reference, the list is not copied
+        this.catalogMaterializedTable = catalogMaterializedTable;
+    }
+
+    public List<TableChange> getTableChanges() {
+        return tableChanges;
+    }
+
+    public CatalogMaterializedTable getCatalogMaterializedTable() {
+        return catalogMaterializedTable;
+    }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        ctx.getCatalogManager()
+                .alterTable(
+                        getCatalogMaterializedTable(),
+                        getTableChanges().stream()
+                                .map(TableChange.class::cast) // elements are already TableChange
+                                .collect(Collectors.toList()), // a new list is handed over, not the field
+                        getTableIdentifier(),
+                        false); // NOTE(review): presumably the ignoreIfNotExists flag - confirm in CatalogManager
+        return TableResultImpl.TABLE_RESULT_OK;
+    }
+
+    @Override
+    public String asSummaryString() {
+        String changes =
+                tableChanges.stream()
+                        .map(
+                                tableChange -> {
+                                    if (tableChange
+                                            instanceof 
TableChange.MaterializedTableChange) {
+                                        return toString(
+                                                
(TableChange.MaterializedTableChange) tableChange);
+                                    } else { // every other change reuses the ALTER TABLE formatter
+                                        return 
AlterTableChangeOperation.toString(tableChange);
+                                    }
+                                })
+                        .collect(Collectors.joining(",\n"));
+        return String.format(
+                "ALTER MATERIALIZED TABLE %s\n%s", 
tableIdentifier.asSummaryString(), changes);
+    }
+
+    private String toString(TableChange.MaterializedTableChange tableChange) {
+        if (tableChange instanceof TableChange.ModifyRefreshStatus) {
+            TableChange.ModifyRefreshStatus refreshStatus =
+                    (TableChange.ModifyRefreshStatus) tableChange;
+            return String.format(
+                    "  MODIFY REFRESH STATUS TO '%s'", 
refreshStatus.getRefreshStatus());
+        } else if (tableChange instanceof TableChange.ModifyRefreshHandler) {
+            TableChange.ModifyRefreshHandler refreshHandler =
+                    (TableChange.ModifyRefreshHandler) tableChange;
+            return String.format(
+                    "  MODIFY REFRESH HANDLER DESCRIPTION TO '%s'", // only the description is printed, not the bytes
+                    refreshHandler.getRefreshHandlerDesc());
+        } else { // any other MaterializedTableChange implementation is rejected
+            throw new UnsupportedOperationException(
+                    String.format("Unknown materialized table change: %s.", 
tableChange));
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableOperation.java
new file mode 100644
index 00000000000..da367b598ba
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableOperation.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.operations.ddl.AlterOperation;
+
+/**
+ * Abstract operation to describe all ALTER MATERIALIZED TABLE statements, such as rename table or
+ * set properties.
+ *
+ * <p>Keeps the {@link ObjectIdentifier} given to the constructor in a final field and exposes it
+ * through {@code getTableIdentifier()}.
+ */
+@Internal
+public abstract class AlterMaterializedTableOperation
+        implements AlterOperation, MaterializedTableOperation {
+
+    protected final ObjectIdentifier tableIdentifier;
+
+    public AlterMaterializedTableOperation(ObjectIdentifier tableIdentifier) {
+        this.tableIdentifier = tableIdentifier;
+    }
+
+    public ObjectIdentifier getTableIdentifier() {
+        return tableIdentifier;
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java
index 34eba1d6845..cf7408c61fb 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java
@@ -25,6 +25,7 @@ import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
+import java.util.Arrays;
 import java.util.Objects;
 
 /** {@link TableChange} represents the modification of the table. */
@@ -309,6 +310,29 @@ public interface TableChange {
         return new ResetOption(key);
     }
 
+    /**
+     * Creates a table change that modifies the refresh status of a materialized table.
+     *
+     * @param refreshStatus the modified refresh status.
+     * @return a {@link ModifyRefreshStatus} that represents the modification.
+     */
+    static ModifyRefreshStatus modifyRefreshStatus(
+            CatalogMaterializedTable.RefreshStatus refreshStatus) {
+        return new ModifyRefreshStatus(refreshStatus);
+    }
+
+    /**
+     * Creates a table change that modifies the refresh handler of a materialized table.
+     *
+     * @param refreshHandlerDesc the modified refresh handler description.
+     * @param refreshHandlerBytes the modified refresh handler bytes; the array is forwarded to the
+     *     {@link ModifyRefreshHandler} constructor as-is, no copy is made in this method.
+     * @return a {@link ModifyRefreshHandler} that represents the modification.
+     */
+    static ModifyRefreshHandler modifyRefreshHandler(
+            String refreshHandlerDesc, byte[] refreshHandlerBytes) {
+        return new ModifyRefreshHandler(refreshHandlerDesc, 
refreshHandlerBytes);
+    }
+
     // 
--------------------------------------------------------------------------------------------
     // Add Change
     // 
--------------------------------------------------------------------------------------------
@@ -1096,4 +1120,100 @@ public interface TableChange {
             return String.format("AFTER %s", 
EncodingUtils.escapeIdentifier(column));
         }
     }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Materialized table change
+    // 
--------------------------------------------------------------------------------------------
+    /**
+     * {@link MaterializedTableChange} represents a modification of a materialized table.
+     *
+     * <p>The interface extends {@link TableChange} and declares no members of its own.
+     */
+    @PublicEvolving
+    interface MaterializedTableChange extends TableChange {}
+
+    /**
+     * A table change to modify the refresh status of a materialized table.
+     *
+     * <p>Holds the status in a single final field; {@code equals}, {@code hashCode} and {@code
+     * toString} are all derived from that field.
+     */
+    @PublicEvolving
+    class ModifyRefreshStatus implements MaterializedTableChange {
+
+        private final CatalogMaterializedTable.RefreshStatus refreshStatus;
+
+        public ModifyRefreshStatus(CatalogMaterializedTable.RefreshStatus 
refreshStatus) {
+            this.refreshStatus = refreshStatus;
+        }
+
+        public CatalogMaterializedTable.RefreshStatus getRefreshStatus() {
+            return refreshStatus;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            ModifyRefreshStatus that = (ModifyRefreshStatus) o;
+            return refreshStatus == that.refreshStatus; // reference comparison; presumably RefreshStatus is an enum - confirm
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(refreshStatus);
+        }
+
+        @Override
+        public String toString() {
+            return "ModifyRefreshStatus{" + "refreshStatus=" + refreshStatus + 
'}';
+        }
+    }
+
+    /**
+     * A table change to modify the refresh handler of a materialized table.
+     *
+     * <p>Carries a textual description and the handler as a byte array. Equality and hash code
+     * are computed from the description and from the array contents ({@code Arrays.equals} /
+     * {@code Arrays.hashCode}).
+     *
+     * <p>NOTE(review): the byte array is stored and returned by reference, so a caller that
+     * mutates it also changes this object's {@code equals}/{@code hashCode} result. Consider
+     * defensive copies if instances are used as map keys or shared.
+     */
+    @PublicEvolving
+    class ModifyRefreshHandler implements MaterializedTableChange {
+
+        private final String refreshHandlerDesc;
+        private final byte[] refreshHandlerBytes;
+
+        public ModifyRefreshHandler(String refreshHandlerDesc, byte[] 
refreshHandlerBytes) {
+            this.refreshHandlerDesc = refreshHandlerDesc;
+            this.refreshHandlerBytes = refreshHandlerBytes; // stored by reference, no copy
+        }
+
+        public String getRefreshHandlerDesc() {
+            return refreshHandlerDesc;
+        }
+
+        public byte[] getRefreshHandlerBytes() {
+            return refreshHandlerBytes; // returns the internal array itself
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            ModifyRefreshHandler that = (ModifyRefreshHandler) o;
+            return Objects.equals(refreshHandlerDesc, that.refreshHandlerDesc)
+                    && Arrays.equals(refreshHandlerBytes, 
that.refreshHandlerBytes); // compares array contents, not references
+        }
+
+        @Override
+        public int hashCode() {
+            int result = Objects.hash(refreshHandlerDesc);
+            result = 31 * result + Arrays.hashCode(refreshHandlerBytes); // content-based, consistent with equals
+            return result;
+        }
+
+        @Override
+        public String toString() {
+            return "ModifyRefreshHandler{"
+                    + "refreshHandlerDesc='"
+                    + refreshHandlerDesc
+                    + '\''
+                    + ", refreshHandlerBytes="
+                    + Arrays.toString(refreshHandlerBytes) // prints every byte of the array
+                    + '}';
+        }
+    }
 }

Reply via email to