This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d4a9a749310f33618b942dd0258855642001e0d3
Author: Jark Wu <[email protected]>
AuthorDate: Sat Mar 11 00:07:42 2023 +0800

    [FLINK-25347][table] Replace deprecated TableSchema with Schema in 
MergeTableLikeUtil
    
    This closes #22158
---
 .../planner/operations/MergeTableLikeUtil.java     | 130 ++++---
 .../operations/SqlCreateTableConverter.java        |  47 +--
 .../planner/operations/MergeTableLikeUtilTest.java | 388 ++++++++++-----------
 .../planner/plan/stream/sql/TableScanTest.scala    |   6 +-
 4 files changed, 278 insertions(+), 293 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java
index 29c70592825..6cdab69b427 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java
@@ -28,16 +28,17 @@ import 
org.apache.flink.sql.parser.ddl.SqlTableLike.FeatureOption;
 import org.apache.flink.sql.parser.ddl.SqlTableLike.MergingStrategy;
 import org.apache.flink.sql.parser.ddl.SqlWatermark;
 import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
-import org.apache.flink.table.api.TableColumn;
-import org.apache.flink.table.api.TableColumn.ComputedColumn;
-import org.apache.flink.table.api.TableColumn.MetadataColumn;
-import org.apache.flink.table.api.TableColumn.PhysicalColumn;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Schema.UnresolvedColumn;
+import org.apache.flink.table.api.Schema.UnresolvedComputedColumn;
+import org.apache.flink.table.api.Schema.UnresolvedMetadataColumn;
+import org.apache.flink.table.api.Schema.UnresolvedPhysicalColumn;
+import org.apache.flink.table.api.Schema.UnresolvedPrimaryKey;
+import org.apache.flink.table.api.Schema.UnresolvedWatermarkSpec;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.api.WatermarkSpec;
-import org.apache.flink.table.api.constraints.UniqueConstraint;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.expressions.SqlCallExpression;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
-import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.utils.TypeConversions;
 
@@ -45,6 +46,7 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.sql.SqlDataTypeSpec;
 import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.validate.SqlValidator;
 
@@ -79,10 +81,15 @@ class MergeTableLikeUtil {
 
     private final SqlValidator validator;
     private final Function<SqlNode, String> escapeExpression;
+    private final DataTypeFactory dataTypeFactory;
 
-    MergeTableLikeUtil(SqlValidator validator, Function<SqlNode, String> 
escapeExpression) {
+    MergeTableLikeUtil(
+            SqlValidator validator,
+            Function<SqlNode, String> escapeExpression,
+            DataTypeFactory dataTypeFactory) {
         this.validator = validator;
         this.escapeExpression = escapeExpression;
+        this.dataTypeFactory = dataTypeFactory;
     }
 
     /**
@@ -133,9 +140,9 @@ class MergeTableLikeUtil {
      * of the merged properties. E.g. Some of the columns used in computed 
columns of the derived
      * table can be defined in the source table.
      */
-    public TableSchema mergeTables(
+    public Schema mergeTables(
             Map<FeatureOption, MergingStrategy> mergingStrategies,
-            TableSchema sourceSchema,
+            Schema sourceSchema,
             List<SqlNode> derivedColumns,
             List<SqlWatermark> derivedWatermarkSpecs,
             SqlTableConstraint derivedPrimaryKey) {
@@ -145,6 +152,7 @@ class MergeTableLikeUtil {
                         mergingStrategies,
                         sourceSchema,
                         (FlinkTypeFactory) validator.getTypeFactory(),
+                        dataTypeFactory,
                         validator,
                         escapeExpression);
         schemaBuilder.appendDerivedColumns(mergingStrategies, derivedColumns);
@@ -208,9 +216,9 @@ class MergeTableLikeUtil {
 
     private static class SchemaBuilder {
 
-        Map<String, TableColumn> columns = new LinkedHashMap<>();
-        Map<String, WatermarkSpec> watermarkSpecs = new HashMap<>();
-        UniqueConstraint primaryKey = null;
+        Map<String, UnresolvedColumn> columns = new LinkedHashMap<>();
+        Map<String, UnresolvedWatermarkSpec> watermarkSpecs = new HashMap<>();
+        UnresolvedPrimaryKey primaryKey = null;
 
         // Intermediate state
         Map<String, RelDataType> physicalFieldNamesToTypes = new 
LinkedHashMap<>();
@@ -220,14 +228,17 @@ class MergeTableLikeUtil {
         Function<SqlNode, String> escapeExpressions;
         FlinkTypeFactory typeFactory;
         SqlValidator sqlValidator;
+        DataTypeFactory dataTypeFactory;
 
         SchemaBuilder(
                 Map<FeatureOption, MergingStrategy> mergingStrategies,
-                TableSchema sourceSchema,
+                Schema sourceSchema,
                 FlinkTypeFactory typeFactory,
+                DataTypeFactory dataTypeFactory,
                 SqlValidator sqlValidator,
                 Function<SqlNode, String> escapeExpressions) {
             this.typeFactory = typeFactory;
+            this.dataTypeFactory = dataTypeFactory;
             this.sqlValidator = sqlValidator;
             this.escapeExpressions = escapeExpressions;
             populateColumnsFromSourceTable(mergingStrategies, sourceSchema);
@@ -236,20 +247,24 @@ class MergeTableLikeUtil {
         }
 
         private void populateColumnsFromSourceTable(
-                Map<FeatureOption, MergingStrategy> mergingStrategies, 
TableSchema sourceSchema) {
-            for (TableColumn sourceColumn : sourceSchema.getTableColumns()) {
-                if (sourceColumn instanceof PhysicalColumn) {
+                Map<FeatureOption, MergingStrategy> mergingStrategies, Schema 
sourceSchema) {
+            for (UnresolvedColumn sourceColumn : sourceSchema.getColumns()) {
+                if (sourceColumn instanceof UnresolvedPhysicalColumn) {
+                    LogicalType columnType =
+                            dataTypeFactory
+                                    .createDataType(
+                                            ((UnresolvedPhysicalColumn) 
sourceColumn).getDataType())
+                                    .getLogicalType();
                     physicalFieldNamesToTypes.put(
                             sourceColumn.getName(),
-                            typeFactory.createFieldTypeFromLogicalType(
-                                    sourceColumn.getType().getLogicalType()));
+                            
typeFactory.createFieldTypeFromLogicalType(columnType));
                     columns.put(sourceColumn.getName(), sourceColumn);
-                } else if (sourceColumn instanceof ComputedColumn) {
+                } else if (sourceColumn instanceof UnresolvedComputedColumn) {
                     if (mergingStrategies.get(FeatureOption.GENERATED)
                             != MergingStrategy.EXCLUDING) {
                         columns.put(sourceColumn.getName(), sourceColumn);
                     }
-                } else if (sourceColumn instanceof MetadataColumn) {
+                } else if (sourceColumn instanceof UnresolvedMetadataColumn) {
                     if (mergingStrategies.get(FeatureOption.METADATA)
                             != MergingStrategy.EXCLUDING) {
                         columns.put(sourceColumn.getName(), sourceColumn);
@@ -259,17 +274,16 @@ class MergeTableLikeUtil {
         }
 
         private void populateWatermarksFromSourceTable(
-                Map<FeatureOption, MergingStrategy> mergingStrategies, 
TableSchema sourceSchema) {
-            for (WatermarkSpec sourceWatermarkSpec : 
sourceSchema.getWatermarkSpecs()) {
+                Map<FeatureOption, MergingStrategy> mergingStrategies, Schema 
sourceSchema) {
+            for (UnresolvedWatermarkSpec sourceWatermarkSpec : 
sourceSchema.getWatermarkSpecs()) {
                 if (mergingStrategies.get(FeatureOption.WATERMARKS) != 
MergingStrategy.EXCLUDING) {
-                    watermarkSpecs.put(
-                            sourceWatermarkSpec.getRowtimeAttribute(), 
sourceWatermarkSpec);
+                    watermarkSpecs.put(sourceWatermarkSpec.getColumnName(), 
sourceWatermarkSpec);
                 }
             }
         }
 
         private void populatePrimaryKeyFromSourceTable(
-                Map<FeatureOption, MergingStrategy> mergingStrategies, 
TableSchema sourceSchema) {
+                Map<FeatureOption, MergingStrategy> mergingStrategies, Schema 
sourceSchema) {
             if (sourceSchema.getPrimaryKey().isPresent()
                     && mergingStrategies.get(FeatureOption.CONSTRAINTS)
                             == MergingStrategy.INCLUDING) {
@@ -292,7 +306,7 @@ class MergeTableLikeUtil {
                                         "Primary key column '%s' is not 
defined in the schema at %s",
                                         primaryKey, 
primaryKeyNode.getParserPosition()));
                     }
-                    if (!columns.get(primaryKey).isPhysical()) {
+                    if (!(columns.get(primaryKey) instanceof 
UnresolvedPhysicalColumn)) {
                         throw new ValidationException(
                                 String.format(
                                         "Could not create a PRIMARY KEY with 
column '%s' at %s.\n"
@@ -302,7 +316,7 @@ class MergeTableLikeUtil {
                     primaryKeyColumns.add(primaryKey);
                 }
                 primaryKey =
-                        UniqueConstraint.primaryKey(
+                        new UnresolvedPrimaryKey(
                                 derivedPrimaryKey
                                         .getConstraintName()
                                         .orElseGet(() -> "PK_" + 
primaryKeyColumns.hashCode()),
@@ -329,15 +343,12 @@ class MergeTableLikeUtil {
                 // this will validate and expand function identifiers.
                 SqlNode validated =
                         
sqlValidator.validateParameterizedExpression(expression, nameToTypeMap);
-                RelDataType validatedType = 
sqlValidator.getValidatedNodeType(validated);
-                DataType exprDataType = 
fromLogicalToDataType(toLogicalType(validatedType));
 
                 watermarkSpecs.put(
                         rowtimeAttribute,
-                        new WatermarkSpec(
+                        new UnresolvedWatermarkSpec(
                                 rowtimeAttribute,
-                                escapeExpressions.apply(validated),
-                                exprDataType));
+                                new 
SqlCallExpression(escapeExpressions.apply(validated))));
             }
         }
 
@@ -398,13 +409,20 @@ class MergeTableLikeUtil {
 
             for (SqlNode derivedColumn : derivedColumns) {
                 final String name = ((SqlTableColumn) 
derivedColumn).getName().getSimple();
-                final TableColumn column;
+                final String comment =
+                        ((SqlTableColumn) derivedColumn)
+                                .getComment()
+                                .map(c -> ((SqlLiteral) 
c).getValueAs(String.class))
+                                .orElse(null);
+                final UnresolvedColumn column;
                 if (derivedColumn instanceof SqlRegularColumn) {
                     final LogicalType logicalType =
                             
FlinkTypeFactory.toLogicalType(physicalFieldNamesToTypes.get(name));
                     column =
-                            TableColumn.physical(
-                                    name, 
TypeConversions.fromLogicalToDataType(logicalType));
+                            new UnresolvedPhysicalColumn(
+                                    name,
+                                    
TypeConversions.fromLogicalToDataType(logicalType),
+                                    comment);
                 } else if (derivedColumn instanceof SqlComputedColumn) {
                     final SqlComputedColumn computedColumn = 
(SqlComputedColumn) derivedColumn;
                     if (physicalFieldNamesToTypes.containsKey(name)) {
@@ -415,7 +433,7 @@ class MergeTableLikeUtil {
                                         name));
                     }
                     if (columns.containsKey(name)) {
-                        if (!(columns.get(name) instanceof ComputedColumn)) {
+                        if (!(columns.get(name) instanceof 
UnresolvedComputedColumn)) {
                             throw new ValidationException(
                                     String.format(
                                             "A column named '%s' already 
exists in the base table. "
@@ -443,10 +461,10 @@ class MergeTableLikeUtil {
                     final RelDataType validatedType =
                             sqlValidator.getValidatedNodeType(validatedExpr);
                     column =
-                            TableColumn.computed(
+                            new UnresolvedComputedColumn(
                                     name,
-                                    
fromLogicalToDataType(toLogicalType(validatedType)),
-                                    escapeExpressions.apply(validatedExpr));
+                                    new 
SqlCallExpression(escapeExpressions.apply(validatedExpr)),
+                                    comment);
                     computedFieldNamesToTypes.put(name, validatedType);
                 } else if (derivedColumn instanceof SqlMetadataColumn) {
                     final SqlMetadataColumn metadataColumn = 
(SqlMetadataColumn) derivedColumn;
@@ -458,7 +476,7 @@ class MergeTableLikeUtil {
                                         name));
                     }
                     if (columns.containsKey(name)) {
-                        if (!(columns.get(name) instanceof MetadataColumn)) {
+                        if (!(columns.get(name) instanceof 
UnresolvedMetadataColumn)) {
                             throw new ValidationException(
                                     String.format(
                                             "A column named '%s' already 
exists in the base table. "
@@ -477,14 +495,15 @@ class MergeTableLikeUtil {
                     }
 
                     SqlDataTypeSpec type = metadataColumn.getType();
-                    boolean nullable = type.getNullable() == null ? true : 
type.getNullable();
+                    boolean nullable = type.getNullable() == null || 
type.getNullable();
                     RelDataType relType = type.deriveType(sqlValidator, 
nullable);
                     column =
-                            TableColumn.metadata(
+                            new UnresolvedMetadataColumn(
                                     name,
                                     
fromLogicalToDataType(toLogicalType(relType)),
                                     
metadataColumn.getMetadataAlias().orElse(null),
-                                    metadataColumn.isVirtual());
+                                    metadataColumn.isVirtual(),
+                                    comment);
                     metadataFieldNamesToTypes.put(name, relType);
                 } else {
                     throw new ValidationException("Unsupported column type: " 
+ derivedColumn);
@@ -505,7 +524,7 @@ class MergeTableLikeUtil {
                                         name));
                     }
                     SqlDataTypeSpec type = regularColumn.getType();
-                    boolean nullable = type.getNullable() == null ? true : 
type.getNullable();
+                    boolean nullable = type.getNullable() == null || 
type.getNullable();
                     RelDataType relType = type.deriveType(sqlValidator, 
nullable);
                     // add field name and field type to physical field list
                     RelDataType oldType = physicalFieldNamesToTypes.put(name, 
relType);
@@ -519,17 +538,18 @@ class MergeTableLikeUtil {
             }
         }
 
-        public TableSchema build() {
-            TableSchema.Builder resultBuilder = TableSchema.builder();
-            for (TableColumn column : columns.values()) {
-                resultBuilder.add(column);
-            }
-            for (WatermarkSpec watermarkSpec : watermarkSpecs.values()) {
-                resultBuilder.watermark(watermarkSpec);
+        public Schema build() {
+            Schema.Builder resultBuilder = Schema.newBuilder();
+            resultBuilder.fromColumns(new ArrayList<>(columns.values()));
+
+            for (UnresolvedWatermarkSpec watermarkSpec : 
watermarkSpecs.values()) {
+                resultBuilder.watermark(
+                        watermarkSpec.getColumnName(), 
watermarkSpec.getWatermarkExpression());
             }
             if (primaryKey != null) {
-                resultBuilder.primaryKey(
-                        primaryKey.getName(), 
primaryKey.getColumns().toArray(new String[0]));
+                resultBuilder.primaryKeyNamed(
+                        primaryKey.getConstraintName(),
+                        primaryKey.getColumnNames().toArray(new String[0]));
             }
             return resultBuilder.build();
         }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
index 5d878341577..4bba017db5e 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
@@ -21,13 +21,11 @@ package org.apache.flink.table.planner.operations;
 import org.apache.flink.sql.parser.ddl.SqlCreateTable;
 import org.apache.flink.sql.parser.ddl.SqlCreateTableAs;
 import org.apache.flink.sql.parser.ddl.SqlCreateTableLike;
-import org.apache.flink.sql.parser.ddl.SqlTableColumn;
 import org.apache.flink.sql.parser.ddl.SqlTableLike;
 import org.apache.flink.sql.parser.ddl.SqlTableOption;
 import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.CatalogTable;
@@ -44,14 +42,13 @@ import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.util.NlsString;
-import org.apache.commons.lang3.StringUtils;
 
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -65,7 +62,9 @@ class SqlCreateTableConverter {
             FlinkCalciteSqlValidator sqlValidator,
             CatalogManager catalogManager,
             Function<SqlNode, String> escapeExpression) {
-        this.mergeTableLikeUtil = new MergeTableLikeUtil(sqlValidator, 
escapeExpression);
+        this.mergeTableLikeUtil =
+                new MergeTableLikeUtil(
+                        sqlValidator, escapeExpression, 
catalogManager.getDataTypeFactory());
         this.catalogManager = catalogManager;
     }
 
@@ -124,22 +123,19 @@ class SqlCreateTableConverter {
 
     private CatalogTable createCatalogTable(SqlCreateTable sqlCreateTable) {
 
-        final TableSchema sourceTableSchema;
+        final Schema sourceTableSchema;
         final List<String> sourcePartitionKeys;
         final List<SqlTableLike.SqlTableLikeOption> likeOptions;
         final Map<String, String> sourceProperties;
         if (sqlCreateTable instanceof SqlCreateTableLike) {
             SqlTableLike sqlTableLike = ((SqlCreateTableLike) 
sqlCreateTable).getTableLike();
             CatalogTable table = lookupLikeSourceTable(sqlTableLike);
-            sourceTableSchema =
-                    TableSchema.fromResolvedSchema(
-                            table.getUnresolvedSchema()
-                                    
.resolve(catalogManager.getSchemaResolver()));
+            sourceTableSchema = table.getUnresolvedSchema();
             sourcePartitionKeys = table.getPartitionKeys();
             likeOptions = sqlTableLike.getOptions();
             sourceProperties = table.getOptions();
         } else {
-            sourceTableSchema = TableSchema.builder().build();
+            sourceTableSchema = Schema.newBuilder().build();
             sourcePartitionKeys = Collections.emptyList();
             likeOptions = Collections.emptyList();
             sourceProperties = Collections.emptyMap();
@@ -155,19 +151,8 @@ class SqlCreateTableConverter {
                 sqlCreateTable.getFullConstraints().stream()
                         .filter(SqlTableConstraint::isPrimaryKey)
                         .findAny();
-        List<SqlNode> columns = sqlCreateTable.getColumnList().getList();
-        Map<String, String> comments =
-                columns.stream()
-                        .map(col -> (SqlTableColumn) col)
-                        .filter(col -> col.getComment().isPresent())
-                        .collect(
-                                Collectors.toMap(
-                                        col -> col.getName().getSimple(),
-                                        col ->
-                                                StringUtils.strip(
-                                                        
col.getComment().get().toString(), "'")));
 
-        TableSchema mergedSchema =
+        Schema mergedSchema =
                 mergeTableLikeUtil.mergeTables(
                         mergingStrategies,
                         sourceTableSchema,
@@ -193,10 +178,7 @@ class SqlCreateTableConverter {
 
         return catalogManager.resolveCatalogTable(
                 CatalogTable.of(
-                        mergedSchema.toSchema(comments),
-                        tableComment,
-                        partitionKeys,
-                        new HashMap<>(mergedOptions)));
+                        mergedSchema, tableComment, partitionKeys, new 
HashMap<>(mergedOptions)));
     }
 
     private CatalogTable lookupLikeSourceTable(SqlTableLike sqlTableLike) {
@@ -224,15 +206,18 @@ class SqlCreateTableConverter {
         return lookupResult.getResolvedTable();
     }
 
-    private void verifyPartitioningColumnsExist(
-            TableSchema mergedSchema, List<String> partitionKeys) {
+    private void verifyPartitioningColumnsExist(Schema mergedSchema, 
List<String> partitionKeys) {
+        Set<String> columnNames =
+                mergedSchema.getColumns().stream()
+                        .map(Schema.UnresolvedColumn::getName)
+                        .collect(Collectors.toSet());
         for (String partitionKey : partitionKeys) {
-            if (!mergedSchema.getTableColumn(partitionKey).isPresent()) {
+            if (!columnNames.contains(partitionKey)) {
                 throw new ValidationException(
                         String.format(
                                 "Partition column '%s' not defined in the 
table schema. Available columns: [%s]",
                                 partitionKey,
-                                Arrays.stream(mergedSchema.getFieldNames())
+                                columnNames.stream()
                                         .collect(Collectors.joining("', '", 
"'", "'"))));
             }
         }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/MergeTableLikeUtilTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/MergeTableLikeUtilTest.java
index e64f970db97..7c5399d864d 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/MergeTableLikeUtilTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/MergeTableLikeUtilTest.java
@@ -29,14 +29,15 @@ import 
org.apache.flink.sql.parser.ddl.constraint.SqlConstraintEnforcement;
 import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
 import org.apache.flink.sql.parser.ddl.constraint.SqlUniqueSpec;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableColumn;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.calcite.FlinkTypeSystem;
 import org.apache.flink.table.planner.utils.PlannerMocks;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.utils.DataTypeFactoryMock;
 
 import org.apache.calcite.avatica.util.TimeUnit;
 import org.apache.calcite.sql.SqlBasicCall;
@@ -69,14 +70,16 @@ public class MergeTableLikeUtilTest {
                     Thread.currentThread().getContextClassLoader(), 
FlinkTypeSystem.INSTANCE);
     private final SqlValidator sqlValidator =
             PlannerMocks.create().getPlanner().getOrCreateSqlValidator();
-    private final MergeTableLikeUtil util = new 
MergeTableLikeUtil(sqlValidator, SqlNode::toString);
+    private final DataTypeFactory dataTypeFactory = new DataTypeFactoryMock();
+    private final MergeTableLikeUtil util =
+            new MergeTableLikeUtil(sqlValidator, SqlNode::toString, 
dataTypeFactory);
 
     @Test
     public void mergePhysicalColumns() {
-        TableSchema sourceSchema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("one", DataTypes.INT()))
-                        .add(TableColumn.physical("two", DataTypes.STRING()))
+        Schema sourceSchema =
+                Schema.newBuilder()
+                        .column("one", DataTypes.INT())
+                        .column("two", DataTypes.STRING())
                         .build();
 
         List<SqlNode> derivedColumns =
@@ -84,7 +87,7 @@ public class MergeTableLikeUtilTest {
                         regularColumn("three", DataTypes.INT()),
                         regularColumn("four", DataTypes.STRING()));
 
-        TableSchema mergedSchema =
+        Schema mergedSchema =
                 util.mergeTables(
                         getDefaultMergingStrategies(),
                         sourceSchema,
@@ -92,12 +95,12 @@ public class MergeTableLikeUtilTest {
                         Collections.emptyList(),
                         null);
 
-        TableSchema expectedSchema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("one", DataTypes.INT()))
-                        .add(TableColumn.physical("two", DataTypes.STRING()))
-                        .add(TableColumn.physical("three", DataTypes.INT()))
-                        .add(TableColumn.physical("four", DataTypes.STRING()))
+        Schema expectedSchema =
+                Schema.newBuilder()
+                        .column("one", DataTypes.INT())
+                        .column("two", DataTypes.STRING())
+                        .column("three", DataTypes.INT())
+                        .column("four", DataTypes.STRING())
                         .build();
 
         assertThat(mergedSchema).isEqualTo(expectedSchema);
@@ -105,8 +108,7 @@ public class MergeTableLikeUtilTest {
 
     @Test
     public void mergeWithIncludeFailsOnDuplicateColumn() {
-        TableSchema sourceSchema =
-                TableSchema.builder().add(TableColumn.physical("one", 
DataTypes.INT())).build();
+        Schema sourceSchema = Schema.newBuilder().column("one", 
DataTypes.INT()).build();
 
         List<SqlNode> derivedColumns =
                 Arrays.asList(
@@ -127,8 +129,7 @@ public class MergeTableLikeUtilTest {
 
     @Test
     public void mergeWithIncludeFailsOnDuplicateRegularColumn() {
-        TableSchema sourceSchema =
-                TableSchema.builder().add(TableColumn.physical("one", 
DataTypes.INT())).build();
+        Schema sourceSchema = Schema.newBuilder().column("one", 
DataTypes.INT()).build();
 
         List<SqlNode> derivedColumns =
                 Arrays.asList(
@@ -150,8 +151,7 @@ public class MergeTableLikeUtilTest {
 
     @Test
     public void 
mergeWithIncludeFailsOnDuplicateRegularColumnAndComputeColumn() {
-        TableSchema sourceSchema =
-                TableSchema.builder().add(TableColumn.physical("one", 
DataTypes.INT())).build();
+        Schema sourceSchema = Schema.newBuilder().column("one", 
DataTypes.INT()).build();
 
         List<SqlNode> derivedColumns =
                 Arrays.asList(
@@ -176,8 +176,7 @@ public class MergeTableLikeUtilTest {
 
     @Test
     public void 
mergeWithIncludeFailsOnDuplicateRegularColumnAndMetadataColumn() {
-        TableSchema sourceSchema =
-                TableSchema.builder().add(TableColumn.physical("one", 
DataTypes.INT())).build();
+        Schema sourceSchema = Schema.newBuilder().column("one", 
DataTypes.INT()).build();
 
         List<SqlNode> derivedColumns =
                 Arrays.asList(
@@ -202,10 +201,10 @@ public class MergeTableLikeUtilTest {
 
     @Test
     public void mergeGeneratedColumns() {
-        TableSchema sourceSchema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("one", DataTypes.INT()))
-                        .add(TableColumn.computed("two", DataTypes.INT(), "one 
+ 1"))
+        Schema sourceSchema =
+                Schema.newBuilder()
+                        .column("one", DataTypes.INT())
+                        .columnByExpression("two", "one + 1")
                         .build();
 
         List<SqlNode> derivedColumns =
@@ -213,7 +212,7 @@ public class MergeTableLikeUtilTest {
                         regularColumn("three", DataTypes.INT()),
                         computedColumn("four", plus("one", "3")));
 
-        TableSchema mergedSchema =
+        Schema mergedSchema =
                 util.mergeTables(
                         getDefaultMergingStrategies(),
                         sourceSchema,
@@ -221,12 +220,12 @@ public class MergeTableLikeUtilTest {
                         Collections.emptyList(),
                         null);
 
-        TableSchema expectedSchema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("one", DataTypes.INT()))
-                        .add(TableColumn.computed("two", DataTypes.INT(), "one 
+ 1"))
-                        .add(TableColumn.physical("three", DataTypes.INT()))
-                        .add(TableColumn.computed("four", DataTypes.INT(), 
"`one` + 3"))
+        Schema expectedSchema =
+                Schema.newBuilder()
+                        .column("one", DataTypes.INT())
+                        .columnByExpression("two", "one + 1")
+                        .column("three", DataTypes.INT())
+                        .columnByExpression("four", "`one` + 3")
                         .build();
 
         assertThat(mergedSchema).isEqualTo(expectedSchema);
@@ -234,11 +233,11 @@ public class MergeTableLikeUtilTest {
 
     @Test
     public void mergeMetadataColumns() {
-        TableSchema sourceSchema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("one", DataTypes.INT()))
-                        .add(TableColumn.metadata("two", DataTypes.INT(), 
false))
-                        .add(TableColumn.computed("c", DataTypes.INT(), 
"ABS(two)"))
+        Schema sourceSchema =
+                Schema.newBuilder()
+                        .column("one", DataTypes.INT())
+                        .columnByMetadata("two", DataTypes.INT(), false)
+                        .columnByExpression("c", "ABS(two)")
                         .build();
 
         List<SqlNode> derivedColumns =
@@ -246,7 +245,7 @@ public class MergeTableLikeUtilTest {
                         regularColumn("three", DataTypes.INT()),
                         metadataColumn("four", DataTypes.INT(), true));
 
-        TableSchema mergedSchema =
+        Schema mergedSchema =
                 util.mergeTables(
                         getDefaultMergingStrategies(),
                         sourceSchema,
@@ -254,13 +253,13 @@ public class MergeTableLikeUtilTest {
                         Collections.emptyList(),
                         null);
 
-        TableSchema expectedSchema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("one", DataTypes.INT()))
-                        .add(TableColumn.metadata("two", DataTypes.INT(), 
false))
-                        .add(TableColumn.computed("c", DataTypes.INT(), 
"ABS(two)"))
-                        .add(TableColumn.physical("three", DataTypes.INT()))
-                        .add(TableColumn.metadata("four", DataTypes.INT(), 
true))
+        Schema expectedSchema =
+                Schema.newBuilder()
+                        .column("one", DataTypes.INT())
+                        .columnByMetadata("two", DataTypes.INT(), false)
+                        .columnByExpression("c", "ABS(two)")
+                        .column("three", DataTypes.INT())
+                        .columnByMetadata("four", DataTypes.INT(), true)
                         .build();
 
         assertThat(mergedSchema).isEqualTo(expectedSchema);
@@ -268,10 +267,10 @@ public class MergeTableLikeUtilTest {
 
     @Test
     public void mergeIncludingGeneratedColumnsFailsOnDuplicate() {
-        TableSchema sourceSchema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("one", DataTypes.INT()))
-                        .add(TableColumn.computed("two", DataTypes.INT(), "one + 1"))
+        Schema sourceSchema =
+                Schema.newBuilder()
+                        .column("one", DataTypes.INT())
+                        .columnByExpression("two", "one + 1")
                         .build();
 
         List<SqlNode> derivedColumns =
@@ -294,10 +293,10 @@ public class MergeTableLikeUtilTest {
 
     @Test
     public void mergeIncludingMetadataColumnsFailsOnDuplicate() {
-        TableSchema sourceSchema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("one", DataTypes.INT()))
-                        .add(TableColumn.metadata("two", DataTypes.INT()))
+        Schema sourceSchema =
+                Schema.newBuilder()
+                        .column("one", DataTypes.INT())
+                        .columnByMetadata("two", DataTypes.INT())
                         .build();
 
         List<SqlNode> derivedColumns =
@@ -320,10 +319,10 @@ public class MergeTableLikeUtilTest {
 
     @Test
     public void mergeExcludingGeneratedColumnsDuplicate() {
-        TableSchema sourceSchema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("one", DataTypes.INT()))
-                        .add(TableColumn.computed("two", DataTypes.INT(), "one + 1"))
+        Schema sourceSchema =
+                Schema.newBuilder()
+                        .column("one", DataTypes.INT())
+                        .columnByExpression("two", "one + 1")
                         .build();
 
         List<SqlNode> derivedColumns =
@@ -332,7 +331,7 @@ public class MergeTableLikeUtilTest {
         Map<FeatureOption, MergingStrategy> mergingStrategies = 
getDefaultMergingStrategies();
         mergingStrategies.put(FeatureOption.GENERATED, 
MergingStrategy.EXCLUDING);
 
-        TableSchema mergedSchema =
+        Schema mergedSchema =
                 util.mergeTables(
                         mergingStrategies,
                         sourceSchema,
@@ -340,10 +339,10 @@ public class MergeTableLikeUtilTest {
                         Collections.emptyList(),
                         null);
 
-        TableSchema expectedSchema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("one", DataTypes.INT()))
-                        .add(TableColumn.computed("two", DataTypes.INT(), 
"`one` + 3"))
+        Schema expectedSchema =
+                Schema.newBuilder()
+                        .column("one", DataTypes.INT())
+                        .columnByExpression("two", "`one` + 3")
                         .build();
 
         assertThat(mergedSchema).isEqualTo(expectedSchema);
@@ -351,10 +350,10 @@ public class MergeTableLikeUtilTest {
 
     @Test
     public void mergeExcludingMetadataColumnsDuplicate() {
-        TableSchema sourceSchema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("one", DataTypes.INT()))
-                        .add(TableColumn.metadata("two", DataTypes.INT()))
+        Schema sourceSchema =
+                Schema.newBuilder()
+                        .column("one", DataTypes.INT())
+                        .columnByMetadata("two", DataTypes.INT())
                         .build();
 
         List<SqlNode> derivedColumns =
@@ -363,7 +362,7 @@ public class MergeTableLikeUtilTest {
         Map<FeatureOption, MergingStrategy> mergingStrategies = 
getDefaultMergingStrategies();
         mergingStrategies.put(FeatureOption.METADATA, 
MergingStrategy.EXCLUDING);
 
-        TableSchema mergedSchema =
+        Schema mergedSchema =
                 util.mergeTables(
                         mergingStrategies,
                         sourceSchema,
@@ -371,10 +370,10 @@ public class MergeTableLikeUtilTest {
                         Collections.emptyList(),
                         null);
 
-        TableSchema expectedSchema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("one", DataTypes.INT()))
-                        .add(TableColumn.metadata("two", DataTypes.BOOLEAN()))
+        Schema expectedSchema =
+                Schema.newBuilder()
+                        .column("one", DataTypes.INT())
+                        .columnByMetadata("two", DataTypes.BOOLEAN())
                         .build();
 
         assertThat(mergedSchema).isEqualTo(expectedSchema);
@@ -382,10 +381,10 @@ public class MergeTableLikeUtilTest {
 
     @Test
     public void mergeOverwritingGeneratedColumnsDuplicate() {
-        TableSchema sourceSchema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("one", DataTypes.INT()))
-                        .add(TableColumn.computed("two", DataTypes.INT(), "one + 1"))
+        Schema sourceSchema =
+                Schema.newBuilder()
+                        .column("one", DataTypes.INT())
+                        .columnByExpression("two", "one + 1")
                         .build();
 
         List<SqlNode> derivedColumns =
@@ -394,7 +393,7 @@ public class MergeTableLikeUtilTest {
         Map<FeatureOption, MergingStrategy> mergingStrategies = 
getDefaultMergingStrategies();
         mergingStrategies.put(FeatureOption.GENERATED, 
MergingStrategy.OVERWRITING);
 
-        TableSchema mergedSchema =
+        Schema mergedSchema =
                 util.mergeTables(
                         mergingStrategies,
                         sourceSchema,
@@ -402,10 +401,10 @@ public class MergeTableLikeUtilTest {
                         Collections.emptyList(),
                         null);
 
-        TableSchema expectedSchema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("one", DataTypes.INT()))
-                        .add(TableColumn.computed("two", DataTypes.INT(), 
"`one` + 3"))
+        Schema expectedSchema =
+                Schema.newBuilder()
+                        .column("one", DataTypes.INT())
+                        .columnByExpression("two", "`one` + 3")
                         .build();
 
         assertThat(mergedSchema).isEqualTo(expectedSchema);
@@ -413,10 +412,10 @@ public class MergeTableLikeUtilTest {
 
     @Test
     public void mergeOverwritingMetadataColumnsDuplicate() {
-        TableSchema sourceSchema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("one", DataTypes.INT()))
-                        .add(TableColumn.metadata("two", DataTypes.INT()))
+        Schema sourceSchema =
+                Schema.newBuilder()
+                        .column("one", DataTypes.INT())
+                        .columnByMetadata("two", DataTypes.INT())
                         .build();
 
         List<SqlNode> derivedColumns =
@@ -425,7 +424,7 @@ public class MergeTableLikeUtilTest {
         Map<FeatureOption, MergingStrategy> mergingStrategies = 
getDefaultMergingStrategies();
         mergingStrategies.put(FeatureOption.METADATA, 
MergingStrategy.OVERWRITING);
 
-        TableSchema mergedSchema =
+        Schema mergedSchema =
                 util.mergeTables(
                         mergingStrategies,
                         sourceSchema,
@@ -433,10 +432,10 @@ public class MergeTableLikeUtilTest {
                         Collections.emptyList(),
                         null);
 
-        TableSchema expectedSchema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("one", DataTypes.INT()))
-                        .add(TableColumn.metadata("two", DataTypes.BOOLEAN(), 
true))
+        Schema expectedSchema =
+                Schema.newBuilder()
+                        .column("one", DataTypes.INT())
+                        .columnByMetadata("two", DataTypes.BOOLEAN(), true)
                         .build();
 
         assertThat(mergedSchema).isEqualTo(expectedSchema);
@@ -444,10 +443,10 @@ public class MergeTableLikeUtilTest {
 
     @Test
     public void mergeOverwritingPhysicalColumnWithGeneratedColumn() {
-        TableSchema sourceSchema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("one", DataTypes.INT()))
-                        .add(TableColumn.physical("two", DataTypes.INT()))
+        Schema sourceSchema =
+                Schema.newBuilder()
+                        .column("one", DataTypes.INT())
+                        .column("two", DataTypes.INT())
                         .build();
 
         List<SqlNode> derivedColumns =
@@ -473,10 +472,10 @@ public class MergeTableLikeUtilTest {
 
     @Test
     public void mergeOverwritingComputedColumnWithMetadataColumn() {
-        TableSchema sourceSchema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("one", DataTypes.INT()))
-                        .add(TableColumn.computed("two", DataTypes.INT(), "one + 3"))
+        Schema sourceSchema =
+                Schema.newBuilder()
+                        .column("one", DataTypes.INT())
+                        .columnByExpression("two", "one + 3")
                         .build();
 
         List<SqlNode> derivedColumns =
@@ -501,15 +500,12 @@ public class MergeTableLikeUtilTest {
 
     @Test
     public void mergeWatermarks() {
-        TableSchema sourceSchema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("one", DataTypes.INT()))
-                        .add(TableColumn.computed("two", DataTypes.INT(), "one + 1"))
-                        .add(TableColumn.physical("timestamp", 
DataTypes.TIMESTAMP()))
-                        .watermark(
-                                "timestamp",
-                                "timestamp - INTERVAL '5' SECOND",
-                                DataTypes.TIMESTAMP())
+        Schema sourceSchema =
+                Schema.newBuilder()
+                        .column("one", DataTypes.INT())
+                        .columnByExpression("two", "one +1")
+                        .column("timestamp", DataTypes.TIMESTAMP())
+                        .watermark("timestamp", "timestamp - INTERVAL '5' SECOND")
                         .build();
 
         List<SqlNode> derivedColumns =
@@ -517,7 +513,7 @@ public class MergeTableLikeUtilTest {
                         regularColumn("three", DataTypes.INT()),
                         computedColumn("four", plus("one", "3")));
 
-        TableSchema mergedSchema =
+        Schema mergedSchema =
                 util.mergeTables(
                         getDefaultMergingStrategies(),
                         sourceSchema,
@@ -525,17 +521,14 @@ public class MergeTableLikeUtilTest {
                         Collections.emptyList(),
                         null);
 
-        TableSchema expectedSchema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("one", DataTypes.INT()))
-                        .add(TableColumn.computed("two", DataTypes.INT(), "one + 1"))
-                        .add(TableColumn.physical("timestamp", 
DataTypes.TIMESTAMP()))
-                        .watermark(
-                                "timestamp",
-                                "timestamp - INTERVAL '5' SECOND",
-                                DataTypes.TIMESTAMP())
-                        .add(TableColumn.physical("three", DataTypes.INT()))
-                        .add(TableColumn.computed("four", DataTypes.INT(), 
"`one` + 3"))
+        Schema expectedSchema =
+                Schema.newBuilder()
+                        .column("one", DataTypes.INT())
+                        .columnByExpression("two", "one +1")
+                        .column("timestamp", DataTypes.TIMESTAMP())
+                        .watermark("timestamp", "timestamp - INTERVAL '5' SECOND")
+                        .column("three", DataTypes.INT())
+                        .columnByExpression("four", "`one` + 3")
                         .build();
 
         assertThat(mergedSchema).isEqualTo(expectedSchema);
@@ -543,14 +536,11 @@ public class MergeTableLikeUtilTest {
 
     @Test
     public void mergeIncludingWatermarksFailsOnDuplicate() {
-        TableSchema sourceSchema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("one", DataTypes.INT()))
-                        .add(TableColumn.physical("timestamp", 
DataTypes.TIMESTAMP()))
-                        .watermark(
-                                "timestamp",
-                                "timestamp - INTERVAL '5' SECOND",
-                                DataTypes.TIMESTAMP())
+        Schema sourceSchema =
+                Schema.newBuilder()
+                        .column("one", DataTypes.INT())
+                        .column("timestamp", DataTypes.TIMESTAMP())
+                        .watermark("timestamp", "timestamp - INTERVAL '5' SECOND")
                         .build();
 
         List<SqlWatermark> derivedWatermarkSpecs =
@@ -577,14 +567,11 @@ public class MergeTableLikeUtilTest {
 
     @Test
     public void mergeExcludingWatermarksDuplicate() {
-        TableSchema sourceSchema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("one", DataTypes.INT()))
-                        .add(TableColumn.physical("timestamp", 
DataTypes.TIMESTAMP()))
-                        .watermark(
-                                "timestamp",
-                                "timestamp - INTERVAL '5' SECOND",
-                                DataTypes.TIMESTAMP())
+        Schema sourceSchema =
+                Schema.newBuilder()
+                        .column("one", DataTypes.INT())
+                        .column("timestamp", DataTypes.TIMESTAMP())
+                        .watermark("timestamp", "timestamp - INTERVAL '5' SECOND")
                         .build();
 
         List<SqlWatermark> derivedWatermarkSpecs =
@@ -597,7 +584,7 @@ public class MergeTableLikeUtilTest {
         Map<FeatureOption, MergingStrategy> mergingStrategies = 
getDefaultMergingStrategies();
         mergingStrategies.put(FeatureOption.WATERMARKS, 
MergingStrategy.EXCLUDING);
 
-        TableSchema mergedSchema =
+        Schema mergedSchema =
                 util.mergeTables(
                         mergingStrategies,
                         sourceSchema,
@@ -605,14 +592,11 @@ public class MergeTableLikeUtilTest {
                         derivedWatermarkSpecs,
                         null);
 
-        TableSchema expectedSchema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("one", DataTypes.INT()))
-                        .add(TableColumn.physical("timestamp", 
DataTypes.TIMESTAMP()))
-                        .watermark(
-                                "timestamp",
-                                "`timestamp` - INTERVAL '10' SECOND",
-                                DataTypes.TIMESTAMP())
+        Schema expectedSchema =
+                Schema.newBuilder()
+                        .column("one", DataTypes.INT())
+                        .column("timestamp", DataTypes.TIMESTAMP())
+                        .watermark("timestamp", "`timestamp` - INTERVAL '10' SECOND")
                         .build();
 
         assertThat(mergedSchema).isEqualTo(expectedSchema);
@@ -620,14 +604,11 @@ public class MergeTableLikeUtilTest {
 
     @Test
     public void mergeOverwritingWatermarksDuplicate() {
-        TableSchema sourceSchema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("one", DataTypes.INT()))
-                        .add(TableColumn.physical("timestamp", 
DataTypes.TIMESTAMP()))
-                        .watermark(
-                                "timestamp",
-                                "timestamp - INTERVAL '5' SECOND",
-                                DataTypes.TIMESTAMP())
+        Schema sourceSchema =
+                Schema.newBuilder()
+                        .column("one", DataTypes.INT())
+                        .column("timestamp", DataTypes.TIMESTAMP())
+                        .watermark("timestamp", "timestamp - INTERVAL '5' SECOND")
                         .build();
 
         List<SqlWatermark> derivedWatermarkSpecs =
@@ -640,7 +621,7 @@ public class MergeTableLikeUtilTest {
         Map<FeatureOption, MergingStrategy> mergingStrategies = 
getDefaultMergingStrategies();
         mergingStrategies.put(FeatureOption.WATERMARKS, 
MergingStrategy.OVERWRITING);
 
-        TableSchema mergedSchema =
+        Schema mergedSchema =
                 util.mergeTables(
                         mergingStrategies,
                         sourceSchema,
@@ -648,14 +629,11 @@ public class MergeTableLikeUtilTest {
                         derivedWatermarkSpecs,
                         null);
 
-        TableSchema expectedSchema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("one", DataTypes.INT()))
-                        .add(TableColumn.physical("timestamp", 
DataTypes.TIMESTAMP()))
-                        .watermark(
-                                "timestamp",
-                                "`timestamp` - INTERVAL '10' SECOND",
-                                DataTypes.TIMESTAMP())
+        Schema expectedSchema =
+                Schema.newBuilder()
+                        .column("one", DataTypes.INT())
+                        .column("timestamp", DataTypes.TIMESTAMP())
+                        .watermark("timestamp", "`timestamp` - INTERVAL '10' SECOND")
                         .build();
 
         assertThat(mergedSchema).isEqualTo(expectedSchema);
@@ -663,15 +641,15 @@ public class MergeTableLikeUtilTest {
 
     @Test
     public void mergeConstraintsFromBaseTable() {
-        TableSchema sourceSchema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("one", 
DataTypes.INT().notNull()))
-                        .add(TableColumn.physical("two", 
DataTypes.STRING().notNull()))
-                        .add(TableColumn.physical("three", DataTypes.FLOAT()))
-                        .primaryKey("constraint-42", new String[] {"one", 
"two"})
+        Schema sourceSchema =
+                Schema.newBuilder()
+                        .column("one", DataTypes.INT().notNull())
+                        .column("two", DataTypes.STRING().notNull())
+                        .column("three", DataTypes.FLOAT())
+                        .primaryKeyNamed("constraint-42", new String[] {"one", 
"two"})
                         .build();
 
-        TableSchema mergedSchema =
+        Schema mergedSchema =
                 util.mergeTables(
                         getDefaultMergingStrategies(),
                         sourceSchema,
@@ -679,12 +657,12 @@ public class MergeTableLikeUtilTest {
                         Collections.emptyList(),
                         null);
 
-        TableSchema expectedSchema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("one", 
DataTypes.INT().notNull()))
-                        .add(TableColumn.physical("two", 
DataTypes.STRING().notNull()))
-                        .add(TableColumn.physical("three", DataTypes.FLOAT()))
-                        .primaryKey("constraint-42", new String[] {"one", 
"two"})
+        Schema expectedSchema =
+                Schema.newBuilder()
+                        .column("one", DataTypes.INT().notNull())
+                        .column("two", DataTypes.STRING().notNull())
+                        .column("three", DataTypes.FLOAT())
+                        .primaryKeyNamed("constraint-42", new String[] {"one", 
"two"})
                         .build();
 
         assertThat(mergedSchema).isEqualTo(expectedSchema);
@@ -692,14 +670,14 @@ public class MergeTableLikeUtilTest {
 
     @Test
     public void mergeConstraintsFromDerivedTable() {
-        TableSchema sourceSchema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("one", 
DataTypes.INT().notNull()))
-                        .add(TableColumn.physical("two", 
DataTypes.STRING().notNull()))
-                        .add(TableColumn.physical("three", DataTypes.FLOAT()))
+        Schema sourceSchema =
+                Schema.newBuilder()
+                        .column("one", DataTypes.INT().notNull())
+                        .column("two", DataTypes.STRING().notNull())
+                        .column("three", DataTypes.FLOAT())
                         .build();
 
-        TableSchema mergedSchema =
+        Schema mergedSchema =
                 util.mergeTables(
                         getDefaultMergingStrategies(),
                         sourceSchema,
@@ -707,12 +685,12 @@ public class MergeTableLikeUtilTest {
                         Collections.emptyList(),
                         primaryKey("one", "two"));
 
-        TableSchema expectedSchema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("one", 
DataTypes.INT().notNull()))
-                        .add(TableColumn.physical("two", 
DataTypes.STRING().notNull()))
-                        .add(TableColumn.physical("three", DataTypes.FLOAT()))
-                        .primaryKey("PK_3531879", new String[] {"one", "two"})
+        Schema expectedSchema =
+                Schema.newBuilder()
+                        .column("one", DataTypes.INT().notNull())
+                        .column("two", DataTypes.STRING().notNull())
+                        .column("three", DataTypes.FLOAT())
+                        .primaryKeyNamed("PK_3531879", new String[] {"one", 
"two"})
                         .build();
 
         assertThat(mergedSchema).isEqualTo(expectedSchema);
@@ -720,12 +698,12 @@ public class MergeTableLikeUtilTest {
 
     @Test
     public void mergeIncludingConstraintsFailsOnDuplicate() {
-        TableSchema sourceSchema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("one", 
DataTypes.INT().notNull()))
-                        .add(TableColumn.physical("two", 
DataTypes.STRING().notNull()))
-                        .add(TableColumn.physical("three", DataTypes.FLOAT()))
-                        .primaryKey("constraint-42", new String[] {"one", 
"two"})
+        Schema sourceSchema =
+                Schema.newBuilder()
+                        .column("one", DataTypes.INT().notNull())
+                        .column("two", DataTypes.STRING().notNull())
+                        .column("three", DataTypes.FLOAT())
+                        .primaryKeyNamed("constraint-42", new String[] {"one", 
"two"})
                         .build();
 
         assertThatThrownBy(
@@ -744,18 +722,18 @@ public class MergeTableLikeUtilTest {
 
     @Test
     public void mergeExcludingConstraintsOnDuplicate() {
-        TableSchema sourceSchema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("one", 
DataTypes.INT().notNull()))
-                        .add(TableColumn.physical("two", 
DataTypes.STRING().notNull()))
-                        .add(TableColumn.physical("three", 
DataTypes.FLOAT().notNull()))
-                        .primaryKey("constraint-42", new String[] {"one", 
"two", "three"})
+        Schema sourceSchema =
+                Schema.newBuilder()
+                        .column("one", DataTypes.INT().notNull())
+                        .column("two", DataTypes.STRING().notNull())
+                        .column("three", DataTypes.FLOAT())
+                        .primaryKeyNamed("constraint-42", new String[] {"one", 
"two", "three"})
                         .build();
 
         Map<FeatureOption, MergingStrategy> mergingStrategies = 
getDefaultMergingStrategies();
         mergingStrategies.put(FeatureOption.CONSTRAINTS, 
MergingStrategy.EXCLUDING);
 
-        TableSchema mergedSchema =
+        Schema mergedSchema =
                 util.mergeTables(
                         mergingStrategies,
                         sourceSchema,
@@ -763,12 +741,12 @@ public class MergeTableLikeUtilTest {
                         Collections.emptyList(),
                         primaryKey("one", "two"));
 
-        TableSchema expectedSchema =
-                TableSchema.builder()
-                        .add(TableColumn.physical("one", 
DataTypes.INT().notNull()))
-                        .add(TableColumn.physical("two", 
DataTypes.STRING().notNull()))
-                        .add(TableColumn.physical("three", 
DataTypes.FLOAT().notNull()))
-                        .primaryKey("PK_3531879", new String[] {"one", "two"})
+        Schema expectedSchema =
+                Schema.newBuilder()
+                        .column("one", DataTypes.INT().notNull())
+                        .column("two", DataTypes.STRING().notNull())
+                        .column("three", DataTypes.FLOAT())
+                        .primaryKeyNamed("PK_3531879", new String[] {"one", 
"two"})
                         .build();
 
         assertThat(mergedSchema).isEqualTo(expectedSchema);
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
index 8786641ec6f..d228c206aa5 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
@@ -740,8 +740,10 @@ class TableScanTest extends TableTestBase {
   def testInvalidWatermarkOutputType(): Unit = {
     thrown.expect(classOf[ValidationException])
     thrown.expectMessage(
-      "Watermark strategy '' must be of type TIMESTAMP or TIMESTAMP_LTZ" +
-        " but is of type 'CHAR(0) NOT NULL'.")
+      "Invalid data type of expression for watermark definition. " +
+        "The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p), " +
+        "the supported precision 'p' is from 0 to 3, but the watermark " +
+        "expression type is CHAR(0) NOT NULL")
     util.addTable("""
                     |CREATE TABLE src (
                     |  ts TIMESTAMP(3),

Reply via email to