This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e28e495cdd3e0e7cbb58685cb09e1fa08af7223e
Author: fengli <ldliu...@163.com>
AuthorDate: Mon May 6 20:17:57 2024 +0800

    [FLINK-35195][table] Convert SqlCreateMaterializedTable node to CreateMaterializedTableOperation
---
 .../flink/sql/parser/SqlConstraintValidator.java   |   2 +-
 .../sql/parser/ddl/SqlCreateMaterializedTable.java |   1 -
 .../CreateMaterializedTableOperation.java          |  76 ++++++
 .../MaterializedTableOperation.java                |  26 +++
 .../planner/operations/SqlNodeConvertContext.java  |   8 +
 .../SqlCreateMaterializedTableConverter.java       | 210 +++++++++++++++++
 .../operations/converters/SqlNodeConverter.java    |   5 +
 .../operations/converters/SqlNodeConverters.java   |   1 +
 .../planner/utils/MaterializedTableUtils.java      |  98 ++++++++
 ...erializedTableNodeToOperationConverterTest.java | 259 +++++++++++++++++++++
 .../SqlNodeToOperationConversionTestBase.java      |   2 +-
 .../SqlRTASNodeToOperationConverterTest.java       |   2 +-
 12 files changed, 686 insertions(+), 4 deletions(-)
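
For orientation, the DDL shape this series handles looks like the sketch below. This is a hedged example against the public TableEnvironment API: the table name, connector option, and query are invented, and this commit only converts the parsed node into a CreateMaterializedTableOperation, so end-to-end execution is not implied here.

    // Illustrative only: identifiers and options below are invented for the sketch.
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    tEnv.executeSql(
            "CREATE MATERIALIZED TABLE my_mt\n"
                    + "PARTITIONED BY (ds)\n"
                    + "WITH ('format' = 'json')\n"
                    + "FRESHNESS = INTERVAL '30' SECOND\n"
                    + "REFRESH_MODE = CONTINUOUS\n"
                    + "AS SELECT ds, COUNT(*) AS cnt FROM src GROUP BY ds");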

diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlConstraintValidator.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlConstraintValidator.java
index 8a9a7727b54..f157e5034a8 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlConstraintValidator.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlConstraintValidator.java
@@ -89,7 +89,7 @@ public class SqlConstraintValidator {
     }
 
     /** Check table constraint. */
-    private static void validate(SqlTableConstraint constraint) throws SqlValidateException {
+    public static void validate(SqlTableConstraint constraint) throws SqlValidateException {
         if (constraint.isUnique()) {
             throw new SqlValidateException(
                     constraint.getParserPosition(), "UNIQUE constraint is not supported yet");
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
index 1630a0f0117..eae6f1fcba9 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
@@ -132,7 +132,6 @@ public class SqlCreateMaterializedTable extends SqlCreate {
         return freshness;
     }
 
-    @Nullable
     public Optional<SqlLiteral> getRefreshMode() {
         return Optional.ofNullable(refreshMode);
     }
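
Note on the removed @Nullable above: the annotation was redundant because the getter already wraps the possibly-null refreshMode field in an Optional, so absence is expressed by Optional.empty() rather than null. A minimal caller sketch, assuming a parsed sqlCreateMaterializedTable node:

    // No null check needed; the Optional contract covers the absent case.
    sqlCreateMaterializedTable
            .getRefreshMode()
            .ifPresent(mode -> System.out.println("explicit REFRESH_MODE: " + mode));
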
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/CreateMaterializedTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/CreateMaterializedTableOperation.java
new file mode 100644
index 00000000000..d4eff00254d
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/CreateMaterializedTableOperation.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.internal.TableResultImpl;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.OperationUtils;
+import org.apache.flink.table.operations.ddl.CreateOperation;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/** Operation to describe a CREATE MATERIALIZED TABLE statement. */
+@Internal
+public class CreateMaterializedTableOperation
+        implements CreateOperation, MaterializedTableOperation {
+
+    private final ObjectIdentifier tableIdentifier;
+    private final CatalogMaterializedTable materializedTable;
+
+    public CreateMaterializedTableOperation(
+            ObjectIdentifier tableIdentifier, ResolvedCatalogMaterializedTable materializedTable) {
+        this.tableIdentifier = tableIdentifier;
+        this.materializedTable = materializedTable;
+    }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        // create materialized table in catalog
+        ctx.getCatalogManager().createTable(materializedTable, tableIdentifier, false);
+        return TableResultImpl.TABLE_RESULT_OK;
+    }
+
+    public ObjectIdentifier getTableIdentifier() {
+        return tableIdentifier;
+    }
+
+    public CatalogMaterializedTable getCatalogMaterializedTable() {
+        return materializedTable;
+    }
+
+    @Override
+    public String asSummaryString() {
+        Map<String, Object> params = new LinkedHashMap<>();
+        params.put("materializedTable", materializedTable);
+        params.put("identifier", tableIdentifier);
+
+        return OperationUtils.formatWithChildren(
+                "CREATE MATERIALIZED TABLE",
+                params,
+                Collections.emptyList(),
+                Operation::asSummaryString);
+    }
+}
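
A rough usage sketch for the new operation class (identifier, resolvedTable, and context are assumed inputs produced by the planner and TableEnvironment machinery, which is outside this diff):

    // Hedged sketch, not the actual executor wiring.
    CreateMaterializedTableOperation op =
            new CreateMaterializedTableOperation(identifier, resolvedTable);
    // execute() registers the table in the catalog and returns OK.
    TableResultInternal result = op.execute(context);
    // asSummaryString() renders along the lines of:
    //   CREATE MATERIALIZED TABLE: (materializedTable: [...], identifier: [...])
    System.out.println(op.asSummaryString());
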
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableOperation.java
new file mode 100644
index 00000000000..72b83ad1939
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableOperation.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.operations.Operation;
+
+/** The marker interface for materialized table operations. */
+@Internal
+public interface MaterializedTableOperation extends Operation {}
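
Being a pure marker interface, its value is in dispatch: downstream components can recognize all materialized-table statements without enumerating concrete operation classes. A hypothetical routing sketch (the handler method is invented for illustration):

    // Hypothetical dispatch point, e.g. in a gateway or executor.
    if (operation instanceof MaterializedTableOperation) {
        handleMaterializedTableOperation((MaterializedTableOperation) operation);
    }
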
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
index 50d894bd825..2958333caba 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.operations;
 
+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.planner.calcite.SqlToRexConverter;
@@ -43,6 +44,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
+import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig;
+
 /** An implementation of {@link SqlNodeConverter.ConvertContext}. */
 public class SqlNodeConvertContext implements SqlNodeConverter.ConvertContext {
 
@@ -54,6 +57,11 @@ public class SqlNodeConvertContext implements SqlNodeConverter.ConvertContext {
         this.catalogManager = catalogManager;
     }
 
+    @Override
+    public TableConfig getTableConfig() {
+        return unwrapTableConfig(flinkPlanner.cluster());
+    }
+
     @Override
     public SqlValidator getSqlValidator() {
         return flinkPlanner.getOrCreateSqlValidator();
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
new file mode 100644
index 00000000000..03fc0cbcf66
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.SqlConstraintValidator;
+import org.apache.flink.sql.parser.ddl.SqlCreateMaterializedTable;
+import org.apache.flink.sql.parser.ddl.SqlRefreshMode;
+import org.apache.flink.sql.parser.ddl.SqlTableOption;
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
+import org.apache.flink.table.planner.operations.PlannerQueryOperation;
+import org.apache.flink.table.planner.utils.MaterializedTableUtils;
+import org.apache.flink.table.planner.utils.OperationConverterUtils;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.api.config.MaterializedTableConfigOptions.MATERIALIZED_TABLE_FRESHNESS_THRESHOLD;
+
+/** A converter for {@link SqlCreateMaterializedTable}. */
+public class SqlCreateMaterializedTableConverter
+        implements SqlNodeConverter<SqlCreateMaterializedTable> {
+
+    @Override
+    public Operation convertSqlNode(
+            SqlCreateMaterializedTable sqlCreateMaterializedTable, ConvertContext context) {
+        UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(sqlCreateMaterializedTable.fullTableName());
+        ObjectIdentifier identifier =
+                context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
+
+        // get comment
+        String tableComment =
+                OperationConverterUtils.getTableComment(sqlCreateMaterializedTable.getComment());
+
+        // get options
+        Map<String, String> options = new HashMap<>();
+        sqlCreateMaterializedTable
+                .getPropertyList()
+                .getList()
+                .forEach(
+                        p ->
+                                options.put(
+                                        ((SqlTableOption) p).getKeyString(),
+                                        ((SqlTableOption) p).getValueString()));
+
+        // get freshness
+        Duration freshness =
+                MaterializedTableUtils.getMaterializedTableFreshness(
+                        sqlCreateMaterializedTable.getFreshness());
+
+        // get refresh mode
+        SqlRefreshMode sqlRefreshMode = null;
+        if (sqlCreateMaterializedTable.getRefreshMode().isPresent()) {
+            sqlRefreshMode =
+                    sqlCreateMaterializedTable
+                            .getRefreshMode()
+                            .get()
+                            .getValueAs(SqlRefreshMode.class);
+        }
+        CatalogMaterializedTable.LogicalRefreshMode logicalRefreshMode =
+                MaterializedTableUtils.deriveLogicalRefreshMode(sqlRefreshMode);
+        // only MATERIALIZED_TABLE_FRESHNESS_THRESHOLD configured in the Flink conf yaml takes
+        // effect, so we get it from rootConfiguration instead of the table config
+        CatalogMaterializedTable.RefreshMode refreshMode =
+                MaterializedTableUtils.deriveRefreshMode(
+                        context.getTableConfig()
+                                .getRootConfiguration()
+                                .get(MATERIALIZED_TABLE_FRESHNESS_THRESHOLD),
+                        freshness,
+                        logicalRefreshMode);
+
+        // get query schema and definition query
+        SqlNode validateQuery =
+                context.getSqlValidator().validate(sqlCreateMaterializedTable.getAsQuery());
+        PlannerQueryOperation queryOperation =
+                new PlannerQueryOperation(
+                        context.toRelRoot(validateQuery).project(),
+                        () -> context.toQuotedSqlString(validateQuery));
+        String definitionQuery =
+                context.expandSqlIdentifiers(queryOperation.asSerializableString());
+
+        // get schema
+        ResolvedSchema resolvedSchema = queryOperation.getResolvedSchema();
+        Schema.Builder builder = Schema.newBuilder().fromResolvedSchema(resolvedSchema);
+
+        // get and verify partition key
+        List<String> partitionKeys =
+                sqlCreateMaterializedTable.getPartitionKeyList().getList().stream()
+                        .map(p -> ((SqlIdentifier) p).getSimple())
+                        .collect(Collectors.toList());
+        verifyPartitioningColumnsExist(resolvedSchema, partitionKeys);
+
+        // verify and build primary key
+        sqlCreateMaterializedTable
+                .getTableConstraint()
+                .ifPresent(
+                        sqlTableConstraint ->
+                                verifyAndBuildPrimaryKey(
+                                        builder, resolvedSchema, sqlTableConstraint));
+
+        CatalogMaterializedTable materializedTable =
+                CatalogMaterializedTable.newBuilder()
+                        .schema(builder.build())
+                        .comment(tableComment)
+                        .partitionKeys(partitionKeys)
+                        .options(options)
+                        .definitionQuery(definitionQuery)
+                        .freshness(freshness)
+                        .logicalRefreshMode(logicalRefreshMode)
+                        .refreshMode(refreshMode)
+                        .refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
+                        .build();
+
+        return new CreateMaterializedTableOperation(
+                identifier,
+                context.getCatalogManager().resolveCatalogMaterializedTable(materializedTable));
+    }
+
+    private static void verifyPartitioningColumnsExist(
+            ResolvedSchema resolvedSchema, List<String> partitionKeys) {
+        for (String partitionKey : partitionKeys) {
+            if (!resolvedSchema.getColumn(partitionKey).isPresent()) {
+                throw new ValidationException(
+                        String.format(
+                                "Partition column '%s' not defined in the 
query schema. Available columns: [%s].",
+                                partitionKey,
+                                resolvedSchema.getColumnNames().stream()
+                                        .collect(Collectors.joining("', '", "'", "'"))));
+            }
+        }
+    }
+
+    private static void verifyAndBuildPrimaryKey(
+            Schema.Builder schemaBuilder,
+            ResolvedSchema resolvedSchema,
+            SqlTableConstraint sqlTableConstraint) {
+        // check constraint type
+        try {
+            SqlConstraintValidator.validate(sqlTableConstraint);
+        } catch (SqlValidateException e) {
+            throw new ValidationException(
+                    String.format("Primary key validation failed: %s.", e.getMessage()), e);
+        }
+
+        List<String> primaryKeyColumns = 
Arrays.asList(sqlTableConstraint.getColumnNames());
+        for (String columnName : primaryKeyColumns) {
+            Optional<Column> columnOptional = 
resolvedSchema.getColumn(columnName);
+            if (!columnOptional.isPresent()) {
+                throw new ValidationException(
+                        String.format(
+                                "Primary key column '%s' not defined in the 
query schema. Available columns: [%s].",
+                                columnName,
+                                resolvedSchema.getColumnNames().stream()
+                                        .collect(Collectors.joining("', '", "'", "'"))));
+            }
+
+            if (columnOptional.get().getDataType().getLogicalType().isNullable()) {
+                throw new ValidationException(
+                        String.format(
+                                "Could not create a PRIMARY KEY with nullable 
column '%s'.\n"
+                                        + "A PRIMARY KEY column must be 
declared on non-nullable physical columns.",
+                                columnName));
+            }
+        }
+
+        // build primary key
+        String constraintName =
+                sqlTableConstraint
+                        .getConstraintName()
+                        .orElseGet(
+                                () ->
+                                        primaryKeyColumns.stream()
+                                                .collect(Collectors.joining("_", "PK_", "")));
+        schemaBuilder.primaryKeyNamed(constraintName, primaryKeyColumns);
+    }
+}
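
One detail worth noting in the converter: when the PRIMARY KEY constraint carries no explicit name, the fallback name joins the key columns with "_" under a "PK_" prefix. A self-contained check of that naming logic (class name invented for the sketch):

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class PkNameSketch {
        public static void main(String[] args) {
            List<String> primaryKeyColumns = Arrays.asList("a", "b");
            // Mirrors the converter's fallback: prefix "PK_", separator "_".
            String constraintName =
                    primaryKeyColumns.stream().collect(Collectors.joining("_", "PK_", ""));
            System.out.println(constraintName); // prints: PK_a_b
        }
    }
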
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
index a2b48359810..cdd9d860097 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.planner.operations.converters;
 
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.planner.utils.Expander;
@@ -72,6 +74,9 @@ public interface SqlNodeConverter<S extends SqlNode> {
     /** Context of {@link SqlNodeConverter}. */
     interface ConvertContext {
 
+        /** Returns the {@link TableConfig} defined in {@link TableEnvironment}. */
+        TableConfig getTableConfig();
+
         /** Returns the {@link SqlValidator} in the convert context. */
         SqlValidator getSqlValidator();
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
index caaafc9a331..fc5e3bd498c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
@@ -56,6 +56,7 @@ public class SqlNodeConverters {
         register(new SqlShowCreateCatalogConverter());
         register(new SqlDescribeCatalogConverter());
         register(new SqlDescribeJobConverter());
+        register(new SqlCreateMaterializedTableConverter());
     }
 
     /**
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
new file mode 100644
index 00000000000..33794eac26f
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.sql.parser.ddl.SqlRefreshMode;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+
+import org.apache.calcite.sql.SqlIntervalLiteral;
+import org.apache.calcite.sql.type.SqlTypeFamily;
+
+import java.time.Duration;
+
+/** The utils for materialized table. */
+@Internal
+public class MaterializedTableUtils {
+
+    public static Duration getMaterializedTableFreshness(SqlIntervalLiteral sqlIntervalLiteral) {
+        if (sqlIntervalLiteral.signum() < 0) {
+            throw new ValidationException(
+                    "Materialized table freshness doesn't support negative 
value.");
+        }
+        if (sqlIntervalLiteral.getTypeName().getFamily() != SqlTypeFamily.INTERVAL_DAY_TIME) {
+            throw new ValidationException(
+                    "Materialized table freshness only support SECOND, MINUTE, 
HOUR, DAY as the time unit.");
+        }
+
+        SqlIntervalLiteral.IntervalValue intervalValue =
+                sqlIntervalLiteral.getValueAs(SqlIntervalLiteral.IntervalValue.class);
+        long interval = Long.parseLong(intervalValue.getIntervalLiteral());
+        switch (intervalValue.getIntervalQualifier().typeName()) {
+            case INTERVAL_DAY:
+                return Duration.ofDays(interval);
+            case INTERVAL_HOUR:
+                return Duration.ofHours(interval);
+            case INTERVAL_MINUTE:
+                return Duration.ofMinutes(interval);
+            case INTERVAL_SECOND:
+                return Duration.ofSeconds(interval);
+            default:
+                throw new ValidationException(
+                        "Materialized table freshness only support SECOND, 
MINUTE, HOUR, DAY as the time unit.");
+        }
+    }
+
+    public static CatalogMaterializedTable.LogicalRefreshMode deriveLogicalRefreshMode(
+            SqlRefreshMode sqlRefreshMode) {
+        if (sqlRefreshMode == null) {
+            return CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC;
+        }
+
+        switch (sqlRefreshMode) {
+            case FULL:
+                return CatalogMaterializedTable.LogicalRefreshMode.FULL;
+            case CONTINUOUS:
+                return CatalogMaterializedTable.LogicalRefreshMode.CONTINUOUS;
+            default:
+                throw new ValidationException(
+                        String.format("Unsupported logical refresh mode: %s.", 
sqlRefreshMode));
+        }
+    }
+
+    public static CatalogMaterializedTable.RefreshMode deriveRefreshMode(
+            Duration threshold,
+            Duration definedFreshness,
+            CatalogMaterializedTable.LogicalRefreshMode definedRefreshMode) {
+        // If the refresh mode is specified manually, use it directly.
+        if (definedRefreshMode == CatalogMaterializedTable.LogicalRefreshMode.FULL) {
+            return CatalogMaterializedTable.RefreshMode.FULL;
+        } else if (definedRefreshMode == CatalogMaterializedTable.LogicalRefreshMode.CONTINUOUS) {
+            return CatalogMaterializedTable.RefreshMode.CONTINUOUS;
+        }
+
+        // derive the actual refresh mode via defined freshness
+        if (definedFreshness.compareTo(threshold) <= 0) {
+            return CatalogMaterializedTable.RefreshMode.CONTINUOUS;
+        } else {
+            return CatalogMaterializedTable.RefreshMode.FULL;
+        }
+    }
+}
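
To make the derivation concrete: under the AUTOMATIC logical mode, a freshness at or below the threshold resolves to CONTINUOUS and anything slower resolves to FULL. The 30-minute threshold below is purely an assumed example; the real default of MATERIALIZED_TABLE_FRESHNESS_THRESHOLD is defined outside this diff.

    import java.time.Duration;

    import org.apache.flink.table.catalog.CatalogMaterializedTable;
    import org.apache.flink.table.planner.utils.MaterializedTableUtils;

    public class RefreshModeSketch {
        public static void main(String[] args) {
            Duration threshold = Duration.ofMinutes(30); // assumed, for illustration
            System.out.println(
                    MaterializedTableUtils.deriveRefreshMode(
                            threshold,
                            Duration.ofSeconds(30),
                            CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC)); // CONTINUOUS
            System.out.println(
                    MaterializedTableUtils.deriveRefreshMode(
                            threshold,
                            Duration.ofDays(1),
                            CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC)); // FULL
        }
    }
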
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
new file mode 100644
index 00000000000..9514e12367e
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for materialized table statements in {@link SqlNodeToOperationConversion}. */
+public class SqlMaterializedTableNodeToOperationConverterTest
+        extends SqlNodeToOperationConversionTestBase {
+
+    @Test
+    public void testCreateMaterializedTable() {
+        final String sql =
+                "CREATE MATERIALIZED TABLE mtbl1 (\n"
+                        + "   CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED"
+                        + ")\n"
+                        + "COMMENT 'materialized table comment'\n"
+                        + "PARTITIONED BY (a, d)\n"
+                        + "WITH (\n"
+                        + "  'connector' = 'filesystem', \n"
+                        + "  'format' = 'json'\n"
+                        + ")\n"
+                        + "FRESHNESS = INTERVAL '30' SECOND\n"
+                        + "REFRESH_MODE = FULL\n"
+                        + "AS SELECT * FROM t1";
+        Operation operation = parse(sql);
+        assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);
+
+        CreateMaterializedTableOperation op = (CreateMaterializedTableOperation) operation;
+        CatalogMaterializedTable materializedTable = op.getCatalogMaterializedTable();
+        assertThat(materializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class);
+
+        Map<String, String> options = new HashMap<>();
+        options.put("connector", "filesystem");
+        options.put("format", "json");
+        CatalogMaterializedTable expected =
+                CatalogMaterializedTable.newBuilder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("a", DataTypes.BIGINT().notNull())
+                                        .column("b", DataTypes.VARCHAR(Integer.MAX_VALUE))
+                                        .column("c", DataTypes.INT())
+                                        .column("d", DataTypes.VARCHAR(Integer.MAX_VALUE))
+                                        .primaryKeyNamed("ct1", Collections.singletonList("a"))
+                                        .build())
+                        .comment("materialized table comment")
+                        .options(options)
+                        .partitionKeys(Arrays.asList("a", "d"))
+                        .freshness(Duration.ofSeconds(30))
+                        .logicalRefreshMode(CatalogMaterializedTable.LogicalRefreshMode.FULL)
+                        .refreshMode(CatalogMaterializedTable.RefreshMode.FULL)
+                        .refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
+                        .definitionQuery(
+                                "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, 
`t1`.`d`\n"
+                                        + "FROM `builtin`.`default`.`t1` AS 
`t1`")
+                        .build();
+
+        assertThat(((ResolvedCatalogMaterializedTable) materializedTable).getOrigin())
+                .isEqualTo(expected);
+    }
+
+    @Test
+    public void testContinuousRefreshMode() {
+        // test continuous mode derived automatically from the specified freshness
+        final String sql =
+                "CREATE MATERIALIZED TABLE mtbl1\n"
+                        + "FRESHNESS = INTERVAL '30' SECOND\n"
+                        + "AS SELECT * FROM t1";
+        Operation operation = parse(sql);
+        assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);
+
+        CreateMaterializedTableOperation op = (CreateMaterializedTableOperation) operation;
+        CatalogMaterializedTable materializedTable = op.getCatalogMaterializedTable();
+        assertThat(materializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class);
+
+        assertThat(materializedTable.getLogicalRefreshMode())
+                .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC);
+        assertThat(materializedTable.getRefreshMode())
+                .isEqualTo(CatalogMaterializedTable.RefreshMode.CONTINUOUS);
+
+        // test continuous mode specified manually
+        final String sql2 =
+                "CREATE MATERIALIZED TABLE mtbl1\n"
+                        + "FRESHNESS = INTERVAL '30' DAY\n"
+                        + "REFRESH_MODE = CONTINUOUS\n"
+                        + "AS SELECT * FROM t1";
+        Operation operation2 = parse(sql2);
+        assertThat(operation2).isInstanceOf(CreateMaterializedTableOperation.class);
+
+        CreateMaterializedTableOperation op2 = (CreateMaterializedTableOperation) operation2;
+        CatalogMaterializedTable materializedTable2 = op2.getCatalogMaterializedTable();
+        assertThat(materializedTable2).isInstanceOf(ResolvedCatalogMaterializedTable.class);
+
+        assertThat(materializedTable2.getLogicalRefreshMode())
+                .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.CONTINUOUS);
+        assertThat(materializedTable2.getRefreshMode())
+                .isEqualTo(CatalogMaterializedTable.RefreshMode.CONTINUOUS);
+    }
+
+    @Test
+    public void testFullRefreshMode() {
+        // test full mode derived automatically from the specified freshness
+        final String sql =
+                "CREATE MATERIALIZED TABLE mtbl1\n"
+                        + "FRESHNESS = INTERVAL '1' DAY\n"
+                        + "AS SELECT * FROM t1";
+        Operation operation = parse(sql);
+        assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);
+
+        CreateMaterializedTableOperation op = (CreateMaterializedTableOperation) operation;
+        CatalogMaterializedTable materializedTable = op.getCatalogMaterializedTable();
+        assertThat(materializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class);
+
+        assertThat(materializedTable.getLogicalRefreshMode())
+                .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC);
+        assertThat(materializedTable.getRefreshMode())
+                .isEqualTo(CatalogMaterializedTable.RefreshMode.FULL);
+
+        // test full mode specified manually
+        final String sql2 =
+                "CREATE MATERIALIZED TABLE mtbl1\n"
+                        + "FRESHNESS = INTERVAL '30' SECOND\n"
+                        + "REFRESH_MODE = FULL\n"
+                        + "AS SELECT * FROM t1";
+        Operation operation2 = parse(sql2);
+        assertThat(operation2).isInstanceOf(CreateMaterializedTableOperation.class);
+
+        CreateMaterializedTableOperation op2 = (CreateMaterializedTableOperation) operation2;
+        CatalogMaterializedTable materializedTable2 = op2.getCatalogMaterializedTable();
+        assertThat(materializedTable2).isInstanceOf(ResolvedCatalogMaterializedTable.class);
+
+        assertThat(materializedTable2.getLogicalRefreshMode())
+                .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.FULL);
+        assertThat(materializedTable2.getRefreshMode())
+                .isEqualTo(CatalogMaterializedTable.RefreshMode.FULL);
+    }
+
+    @Test
+    public void testCreateMaterializedTableWithInvalidPrimaryKey() {
+        // test unsupported constraint
+        final String sql =
+                "CREATE MATERIALIZED TABLE mtbl1 (\n"
+                        + "   CONSTRAINT ct1 UNIQUE(a) NOT ENFORCED"
+                        + ")\n"
+                        + "FRESHNESS = INTERVAL '30' SECOND\n"
+                        + "AS SELECT * FROM t1";
+
+        assertThatThrownBy(() -> parse(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Primary key validation failed: UNIQUE constraint is 
not supported yet.");
+
+        // test primary key not defined in source table
+        final String sql2 =
+                "CREATE MATERIALIZED TABLE mtbl1 (\n"
+                        + "   CONSTRAINT ct1 PRIMARY KEY(e) NOT ENFORCED"
+                        + ")\n"
+                        + "FRESHNESS = INTERVAL '30' SECOND\n"
+                        + "AS SELECT * FROM t1";
+
+        assertThatThrownBy(() -> parse(sql2))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Primary key column 'e' not defined in the query 
schema. Available columns: ['a', 'b', 'c', 'd'].");
+
+        // test primary key with nullable source column
+        final String sql3 =
+                "CREATE MATERIALIZED TABLE mtbl1 (\n"
+                        + "   CONSTRAINT ct1 PRIMARY KEY(d) NOT ENFORCED"
+                        + ")\n"
+                        + "FRESHNESS = INTERVAL '30' SECOND\n"
+                        + "AS SELECT * FROM t1";
+
+        assertThatThrownBy(() -> parse(sql3))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("Could not create a PRIMARY KEY with 
nullable column 'd'.");
+    }
+
+    @Test
+    public void testCreateMaterializedTableWithInvalidPartitionKey() {
+        final String sql =
+                "CREATE MATERIALIZED TABLE mtbl1\n"
+                        + "PARTITIONED BY (a, e)\n"
+                        + "FRESHNESS = INTERVAL '30' SECOND\n"
+                        + "REFRESH_MODE = FULL\n"
+                        + "AS SELECT * FROM t1";
+        assertThatThrownBy(() -> parse(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Partition column 'e' not defined in the query schema. 
Available columns: ['a', 'b', 'c', 'd'].");
+    }
+
+    @Test
+    public void testCreateMaterializedTableWithInvalidFreshnessType() {
+        // test negative freshness value
+        final String sql =
+                "CREATE MATERIALIZED TABLE mtbl1\n"
+                        + "FRESHNESS = INTERVAL -'30' SECOND\n"
+                        + "REFRESH_MODE = FULL\n"
+                        + "AS SELECT * FROM t1";
+        assertThatThrownBy(() -> parse(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Materialized table freshness doesn't support negative 
value.");
+
+        // test unsupported freshness type
+        final String sql2 =
+                "CREATE MATERIALIZED TABLE mtbl1\n"
+                        + "FRESHNESS = INTERVAL '30' YEAR\n"
+                        + "AS SELECT * FROM t1";
+        assertThatThrownBy(() -> parse(sql2))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Materialized table freshness only support SECOND, 
MINUTE, HOUR, DAY as the time unit.");
+
+        // test unsupported compound freshness type
+        final String sql3 =
+                "CREATE MATERIALIZED TABLE mtbl1\n"
+                        + "FRESHNESS = INTERVAL '30' DAY TO HOUR\n"
+                        + "AS SELECT * FROM t1";
+        assertThatThrownBy(() -> parse(sql3))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Materialized table freshness only support SECOND, 
MINUTE, HOUR, DAY as the time unit.");
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java
index 68efa48899e..2b87c89ebf8 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java
@@ -107,7 +107,7 @@ public class SqlNodeToOperationConversionTestBase {
                 Schema.newBuilder()
                         .fromResolvedSchema(
                                 ResolvedSchema.of(
-                                        Column.physical("a", DataTypes.BIGINT()),
+                                        Column.physical("a", DataTypes.BIGINT().notNull()),
                                         Column.physical("b", DataTypes.VARCHAR(Integer.MAX_VALUE)),
                                         Column.physical("c", DataTypes.INT()),
                                         Column.physical("d", DataTypes.VARCHAR(Integer.MAX_VALUE))))
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
index 5c31f230cd4..8a6fc806cab 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
@@ -126,7 +126,7 @@ public class SqlRTASNodeToOperationConverterTest extends SqlNodeToOperationConve
                 .fromFields(
                         new String[] {"a", "b", "c", "d"},
                         new AbstractDataType[] {
-                            DataTypes.BIGINT(),
+                            DataTypes.BIGINT().notNull(),
                             DataTypes.STRING(),
                             DataTypes.INT(),
                             DataTypes.STRING()

