This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d4a9a749310f33618b942dd0258855642001e0d3 Author: Jark Wu <[email protected]> AuthorDate: Sat Mar 11 00:07:42 2023 +0800 [FLINK-25347][table] Replace deprecated TableSchema with Schema in MergeTableLikeUtil This closes #22158 --- .../planner/operations/MergeTableLikeUtil.java | 130 ++++--- .../operations/SqlCreateTableConverter.java | 47 +-- .../planner/operations/MergeTableLikeUtilTest.java | 388 ++++++++++----------- .../planner/plan/stream/sql/TableScanTest.scala | 6 +- 4 files changed, 278 insertions(+), 293 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java index 29c70592825..6cdab69b427 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java @@ -28,16 +28,17 @@ import org.apache.flink.sql.parser.ddl.SqlTableLike.FeatureOption; import org.apache.flink.sql.parser.ddl.SqlTableLike.MergingStrategy; import org.apache.flink.sql.parser.ddl.SqlWatermark; import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint; -import org.apache.flink.table.api.TableColumn; -import org.apache.flink.table.api.TableColumn.ComputedColumn; -import org.apache.flink.table.api.TableColumn.MetadataColumn; -import org.apache.flink.table.api.TableColumn.PhysicalColumn; -import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.Schema.UnresolvedColumn; +import org.apache.flink.table.api.Schema.UnresolvedComputedColumn; +import org.apache.flink.table.api.Schema.UnresolvedMetadataColumn; +import org.apache.flink.table.api.Schema.UnresolvedPhysicalColumn; +import org.apache.flink.table.api.Schema.UnresolvedPrimaryKey; +import org.apache.flink.table.api.Schema.UnresolvedWatermarkSpec; import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.api.WatermarkSpec; -import org.apache.flink.table.api.constraints.UniqueConstraint; +import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.expressions.SqlCallExpression; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; -import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.utils.TypeConversions; @@ -45,6 +46,7 @@ import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.sql.SqlDataTypeSpec; import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlLiteral; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.validate.SqlValidator; @@ -79,10 +81,15 @@ class MergeTableLikeUtil { private final SqlValidator validator; private final Function<SqlNode, String> escapeExpression; + private final DataTypeFactory dataTypeFactory; - MergeTableLikeUtil(SqlValidator validator, Function<SqlNode, String> escapeExpression) { + MergeTableLikeUtil( + SqlValidator validator, + Function<SqlNode, String> escapeExpression, + DataTypeFactory dataTypeFactory) { this.validator = validator; this.escapeExpression = escapeExpression; + this.dataTypeFactory = dataTypeFactory; } /** @@ -133,9 +140,9 @@ class MergeTableLikeUtil { * of the merged properties. E.g. Some of the columns used in computed columns of the derived * table can be defined in the source table. */ - public TableSchema mergeTables( + public Schema mergeTables( Map<FeatureOption, MergingStrategy> mergingStrategies, - TableSchema sourceSchema, + Schema sourceSchema, List<SqlNode> derivedColumns, List<SqlWatermark> derivedWatermarkSpecs, SqlTableConstraint derivedPrimaryKey) { @@ -145,6 +152,7 @@ class MergeTableLikeUtil { mergingStrategies, sourceSchema, (FlinkTypeFactory) validator.getTypeFactory(), + dataTypeFactory, validator, escapeExpression); schemaBuilder.appendDerivedColumns(mergingStrategies, derivedColumns); @@ -208,9 +216,9 @@ class MergeTableLikeUtil { private static class SchemaBuilder { - Map<String, TableColumn> columns = new LinkedHashMap<>(); - Map<String, WatermarkSpec> watermarkSpecs = new HashMap<>(); - UniqueConstraint primaryKey = null; + Map<String, UnresolvedColumn> columns = new LinkedHashMap<>(); + Map<String, UnresolvedWatermarkSpec> watermarkSpecs = new HashMap<>(); + UnresolvedPrimaryKey primaryKey = null; // Intermediate state Map<String, RelDataType> physicalFieldNamesToTypes = new LinkedHashMap<>(); @@ -220,14 +228,17 @@ class MergeTableLikeUtil { Function<SqlNode, String> escapeExpressions; FlinkTypeFactory typeFactory; SqlValidator sqlValidator; + DataTypeFactory dataTypeFactory; SchemaBuilder( Map<FeatureOption, MergingStrategy> mergingStrategies, - TableSchema sourceSchema, + Schema sourceSchema, FlinkTypeFactory typeFactory, + DataTypeFactory dataTypeFactory, SqlValidator sqlValidator, Function<SqlNode, String> escapeExpressions) { this.typeFactory = typeFactory; + this.dataTypeFactory = dataTypeFactory; this.sqlValidator = sqlValidator; this.escapeExpressions = escapeExpressions; populateColumnsFromSourceTable(mergingStrategies, sourceSchema); @@ -236,20 +247,24 @@ class MergeTableLikeUtil { } private void populateColumnsFromSourceTable( - Map<FeatureOption, MergingStrategy> mergingStrategies, TableSchema sourceSchema) { - for (TableColumn sourceColumn : sourceSchema.getTableColumns()) { - if (sourceColumn instanceof PhysicalColumn) { + Map<FeatureOption, MergingStrategy> mergingStrategies, Schema sourceSchema) { + for (UnresolvedColumn sourceColumn : sourceSchema.getColumns()) { + if (sourceColumn instanceof UnresolvedPhysicalColumn) { + LogicalType columnType = + dataTypeFactory + .createDataType( + ((UnresolvedPhysicalColumn) sourceColumn).getDataType()) + .getLogicalType(); physicalFieldNamesToTypes.put( sourceColumn.getName(), - typeFactory.createFieldTypeFromLogicalType( - sourceColumn.getType().getLogicalType())); + typeFactory.createFieldTypeFromLogicalType(columnType)); columns.put(sourceColumn.getName(), sourceColumn); - } else if (sourceColumn instanceof ComputedColumn) { + } else if (sourceColumn instanceof UnresolvedComputedColumn) { if (mergingStrategies.get(FeatureOption.GENERATED) != MergingStrategy.EXCLUDING) { columns.put(sourceColumn.getName(), sourceColumn); } - } else if (sourceColumn instanceof MetadataColumn) { + } else if (sourceColumn instanceof UnresolvedMetadataColumn) { if (mergingStrategies.get(FeatureOption.METADATA) != MergingStrategy.EXCLUDING) { columns.put(sourceColumn.getName(), sourceColumn); @@ -259,17 +274,16 @@ class MergeTableLikeUtil { } private void populateWatermarksFromSourceTable( - Map<FeatureOption, MergingStrategy> mergingStrategies, TableSchema sourceSchema) { - for (WatermarkSpec sourceWatermarkSpec : sourceSchema.getWatermarkSpecs()) { + Map<FeatureOption, MergingStrategy> mergingStrategies, Schema sourceSchema) { + for (UnresolvedWatermarkSpec sourceWatermarkSpec : sourceSchema.getWatermarkSpecs()) { if (mergingStrategies.get(FeatureOption.WATERMARKS) != MergingStrategy.EXCLUDING) { - watermarkSpecs.put( - sourceWatermarkSpec.getRowtimeAttribute(), sourceWatermarkSpec); + watermarkSpecs.put(sourceWatermarkSpec.getColumnName(), sourceWatermarkSpec); } } } private void populatePrimaryKeyFromSourceTable( - Map<FeatureOption, MergingStrategy> mergingStrategies, TableSchema sourceSchema) { + Map<FeatureOption, MergingStrategy> mergingStrategies, Schema sourceSchema) { if (sourceSchema.getPrimaryKey().isPresent() && mergingStrategies.get(FeatureOption.CONSTRAINTS) == MergingStrategy.INCLUDING) { @@ -292,7 +306,7 @@ class MergeTableLikeUtil { "Primary key column '%s' is not defined in the schema at %s", primaryKey, primaryKeyNode.getParserPosition())); } - if (!columns.get(primaryKey).isPhysical()) { + if (!(columns.get(primaryKey) instanceof UnresolvedPhysicalColumn)) { throw new ValidationException( String.format( "Could not create a PRIMARY KEY with column '%s' at %s.\n" @@ -302,7 +316,7 @@ class MergeTableLikeUtil { primaryKeyColumns.add(primaryKey); } primaryKey = - UniqueConstraint.primaryKey( + new UnresolvedPrimaryKey( derivedPrimaryKey .getConstraintName() .orElseGet(() -> "PK_" + primaryKeyColumns.hashCode()), @@ -329,15 +343,12 @@ class MergeTableLikeUtil { // this will validate and expand function identifiers. SqlNode validated = sqlValidator.validateParameterizedExpression(expression, nameToTypeMap); - RelDataType validatedType = sqlValidator.getValidatedNodeType(validated); - DataType exprDataType = fromLogicalToDataType(toLogicalType(validatedType)); watermarkSpecs.put( rowtimeAttribute, - new WatermarkSpec( + new UnresolvedWatermarkSpec( rowtimeAttribute, - escapeExpressions.apply(validated), - exprDataType)); + new SqlCallExpression(escapeExpressions.apply(validated)))); } } @@ -398,13 +409,20 @@ class MergeTableLikeUtil { for (SqlNode derivedColumn : derivedColumns) { final String name = ((SqlTableColumn) derivedColumn).getName().getSimple(); - final TableColumn column; + final String comment = + ((SqlTableColumn) derivedColumn) + .getComment() + .map(c -> ((SqlLiteral) c).getValueAs(String.class)) + .orElse(null); + final UnresolvedColumn column; if (derivedColumn instanceof SqlRegularColumn) { final LogicalType logicalType = FlinkTypeFactory.toLogicalType(physicalFieldNamesToTypes.get(name)); column = - TableColumn.physical( - name, TypeConversions.fromLogicalToDataType(logicalType)); + new UnresolvedPhysicalColumn( + name, + TypeConversions.fromLogicalToDataType(logicalType), + comment); } else if (derivedColumn instanceof SqlComputedColumn) { final SqlComputedColumn computedColumn = (SqlComputedColumn) derivedColumn; if (physicalFieldNamesToTypes.containsKey(name)) { @@ -415,7 +433,7 @@ class MergeTableLikeUtil { name)); } if (columns.containsKey(name)) { - if (!(columns.get(name) instanceof ComputedColumn)) { + if (!(columns.get(name) instanceof UnresolvedComputedColumn)) { throw new ValidationException( String.format( "A column named '%s' already exists in the base table. " @@ -443,10 +461,10 @@ class MergeTableLikeUtil { final RelDataType validatedType = sqlValidator.getValidatedNodeType(validatedExpr); column = - TableColumn.computed( + new UnresolvedComputedColumn( name, - fromLogicalToDataType(toLogicalType(validatedType)), - escapeExpressions.apply(validatedExpr)); + new SqlCallExpression(escapeExpressions.apply(validatedExpr)), + comment); computedFieldNamesToTypes.put(name, validatedType); } else if (derivedColumn instanceof SqlMetadataColumn) { final SqlMetadataColumn metadataColumn = (SqlMetadataColumn) derivedColumn; @@ -458,7 +476,7 @@ class MergeTableLikeUtil { name)); } if (columns.containsKey(name)) { - if (!(columns.get(name) instanceof MetadataColumn)) { + if (!(columns.get(name) instanceof UnresolvedMetadataColumn)) { throw new ValidationException( String.format( "A column named '%s' already exists in the base table. " @@ -477,14 +495,15 @@ class MergeTableLikeUtil { } SqlDataTypeSpec type = metadataColumn.getType(); - boolean nullable = type.getNullable() == null ? true : type.getNullable(); + boolean nullable = type.getNullable() == null || type.getNullable(); RelDataType relType = type.deriveType(sqlValidator, nullable); column = - TableColumn.metadata( + new UnresolvedMetadataColumn( name, fromLogicalToDataType(toLogicalType(relType)), metadataColumn.getMetadataAlias().orElse(null), - metadataColumn.isVirtual()); + metadataColumn.isVirtual(), + comment); metadataFieldNamesToTypes.put(name, relType); } else { throw new ValidationException("Unsupported column type: " + derivedColumn); @@ -505,7 +524,7 @@ class MergeTableLikeUtil { name)); } SqlDataTypeSpec type = regularColumn.getType(); - boolean nullable = type.getNullable() == null ? true : type.getNullable(); + boolean nullable = type.getNullable() == null || type.getNullable(); RelDataType relType = type.deriveType(sqlValidator, nullable); // add field name and field type to physical field list RelDataType oldType = physicalFieldNamesToTypes.put(name, relType); @@ -519,17 +538,18 @@ class MergeTableLikeUtil { } } - public TableSchema build() { - TableSchema.Builder resultBuilder = TableSchema.builder(); - for (TableColumn column : columns.values()) { - resultBuilder.add(column); - } - for (WatermarkSpec watermarkSpec : watermarkSpecs.values()) { - resultBuilder.watermark(watermarkSpec); + public Schema build() { + Schema.Builder resultBuilder = Schema.newBuilder(); + resultBuilder.fromColumns(new ArrayList<>(columns.values())); + + for (UnresolvedWatermarkSpec watermarkSpec : watermarkSpecs.values()) { + resultBuilder.watermark( + watermarkSpec.getColumnName(), watermarkSpec.getWatermarkExpression()); } if (primaryKey != null) { - resultBuilder.primaryKey( - primaryKey.getName(), primaryKey.getColumns().toArray(new String[0])); + resultBuilder.primaryKeyNamed( + primaryKey.getConstraintName(), + primaryKey.getColumnNames().toArray(new String[0])); } return resultBuilder.build(); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java index 5d878341577..4bba017db5e 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java @@ -21,13 +21,11 @@ package org.apache.flink.table.planner.operations; import org.apache.flink.sql.parser.ddl.SqlCreateTable; import org.apache.flink.sql.parser.ddl.SqlCreateTableAs; import org.apache.flink.sql.parser.ddl.SqlCreateTableLike; -import org.apache.flink.sql.parser.ddl.SqlTableColumn; import org.apache.flink.sql.parser.ddl.SqlTableLike; import org.apache.flink.sql.parser.ddl.SqlTableOption; import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.TableException; -import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.CatalogTable; @@ -44,14 +42,13 @@ import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.util.NlsString; -import org.apache.commons.lang3.StringUtils; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -65,7 +62,9 @@ class SqlCreateTableConverter { FlinkCalciteSqlValidator sqlValidator, CatalogManager catalogManager, Function<SqlNode, String> escapeExpression) { - this.mergeTableLikeUtil = new MergeTableLikeUtil(sqlValidator, escapeExpression); + this.mergeTableLikeUtil = + new MergeTableLikeUtil( + sqlValidator, escapeExpression, catalogManager.getDataTypeFactory()); this.catalogManager = catalogManager; } @@ -124,22 +123,19 @@ class SqlCreateTableConverter { private CatalogTable createCatalogTable(SqlCreateTable sqlCreateTable) { - final TableSchema sourceTableSchema; + final Schema sourceTableSchema; final List<String> sourcePartitionKeys; final List<SqlTableLike.SqlTableLikeOption> likeOptions; final Map<String, String> sourceProperties; if (sqlCreateTable instanceof SqlCreateTableLike) { SqlTableLike sqlTableLike = ((SqlCreateTableLike) sqlCreateTable).getTableLike(); CatalogTable table = lookupLikeSourceTable(sqlTableLike); - sourceTableSchema = - TableSchema.fromResolvedSchema( - table.getUnresolvedSchema() - .resolve(catalogManager.getSchemaResolver())); + sourceTableSchema = table.getUnresolvedSchema(); sourcePartitionKeys = table.getPartitionKeys(); likeOptions = sqlTableLike.getOptions(); sourceProperties = table.getOptions(); } else { - sourceTableSchema = TableSchema.builder().build(); + sourceTableSchema = Schema.newBuilder().build(); sourcePartitionKeys = Collections.emptyList(); likeOptions = Collections.emptyList(); sourceProperties = Collections.emptyMap(); @@ -155,19 +151,8 @@ class SqlCreateTableConverter { sqlCreateTable.getFullConstraints().stream() .filter(SqlTableConstraint::isPrimaryKey) .findAny(); - List<SqlNode> columns = sqlCreateTable.getColumnList().getList(); - Map<String, String> comments = - columns.stream() - .map(col -> (SqlTableColumn) col) - .filter(col -> col.getComment().isPresent()) - .collect( - Collectors.toMap( - col -> col.getName().getSimple(), - col -> - StringUtils.strip( - col.getComment().get().toString(), "'"))); - TableSchema mergedSchema = + Schema mergedSchema = mergeTableLikeUtil.mergeTables( mergingStrategies, sourceTableSchema, @@ -193,10 +178,7 @@ class SqlCreateTableConverter { return catalogManager.resolveCatalogTable( CatalogTable.of( - mergedSchema.toSchema(comments), - tableComment, - partitionKeys, - new HashMap<>(mergedOptions))); + mergedSchema, tableComment, partitionKeys, new HashMap<>(mergedOptions))); } private CatalogTable lookupLikeSourceTable(SqlTableLike sqlTableLike) { @@ -224,15 +206,18 @@ class SqlCreateTableConverter { return lookupResult.getResolvedTable(); } - private void verifyPartitioningColumnsExist( - TableSchema mergedSchema, List<String> partitionKeys) { + private void verifyPartitioningColumnsExist(Schema mergedSchema, List<String> partitionKeys) { + Set<String> columnNames = + mergedSchema.getColumns().stream() + .map(Schema.UnresolvedColumn::getName) + .collect(Collectors.toSet()); for (String partitionKey : partitionKeys) { - if (!mergedSchema.getTableColumn(partitionKey).isPresent()) { + if (!columnNames.contains(partitionKey)) { throw new ValidationException( String.format( "Partition column '%s' not defined in the table schema. Available columns: [%s]", partitionKey, - Arrays.stream(mergedSchema.getFieldNames()) + columnNames.stream() .collect(Collectors.joining("', '", "'", "'")))); } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/MergeTableLikeUtilTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/MergeTableLikeUtilTest.java index e64f970db97..7c5399d864d 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/MergeTableLikeUtilTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/MergeTableLikeUtilTest.java @@ -29,14 +29,15 @@ import org.apache.flink.sql.parser.ddl.constraint.SqlConstraintEnforcement; import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint; import org.apache.flink.sql.parser.ddl.constraint.SqlUniqueSpec; import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.api.TableColumn; -import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.calcite.FlinkTypeSystem; import org.apache.flink.table.planner.utils.PlannerMocks; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.utils.DataTypeFactoryMock; import org.apache.calcite.avatica.util.TimeUnit; import org.apache.calcite.sql.SqlBasicCall; @@ -69,14 +70,16 @@ public class MergeTableLikeUtilTest { Thread.currentThread().getContextClassLoader(), FlinkTypeSystem.INSTANCE); private final SqlValidator sqlValidator = PlannerMocks.create().getPlanner().getOrCreateSqlValidator(); - private final MergeTableLikeUtil util = new MergeTableLikeUtil(sqlValidator, SqlNode::toString); + private final DataTypeFactory dataTypeFactory = new DataTypeFactoryMock(); + private final MergeTableLikeUtil util = + new MergeTableLikeUtil(sqlValidator, SqlNode::toString, dataTypeFactory); @Test public void mergePhysicalColumns() { - TableSchema sourceSchema = - TableSchema.builder() - .add(TableColumn.physical("one", DataTypes.INT())) - .add(TableColumn.physical("two", DataTypes.STRING())) + Schema sourceSchema = + Schema.newBuilder() + .column("one", DataTypes.INT()) + .column("two", DataTypes.STRING()) .build(); List<SqlNode> derivedColumns = @@ -84,7 +87,7 @@ public class MergeTableLikeUtilTest { regularColumn("three", DataTypes.INT()), regularColumn("four", DataTypes.STRING())); - TableSchema mergedSchema = + Schema mergedSchema = util.mergeTables( getDefaultMergingStrategies(), sourceSchema, @@ -92,12 +95,12 @@ public class MergeTableLikeUtilTest { Collections.emptyList(), null); - TableSchema expectedSchema = - TableSchema.builder() - .add(TableColumn.physical("one", DataTypes.INT())) - .add(TableColumn.physical("two", DataTypes.STRING())) - .add(TableColumn.physical("three", DataTypes.INT())) - .add(TableColumn.physical("four", DataTypes.STRING())) + Schema expectedSchema = + Schema.newBuilder() + .column("one", DataTypes.INT()) + .column("two", DataTypes.STRING()) + .column("three", DataTypes.INT()) + .column("four", DataTypes.STRING()) .build(); assertThat(mergedSchema).isEqualTo(expectedSchema); @@ -105,8 +108,7 @@ public class MergeTableLikeUtilTest { @Test public void mergeWithIncludeFailsOnDuplicateColumn() { - TableSchema sourceSchema = - TableSchema.builder().add(TableColumn.physical("one", DataTypes.INT())).build(); + Schema sourceSchema = Schema.newBuilder().column("one", DataTypes.INT()).build(); List<SqlNode> derivedColumns = Arrays.asList( @@ -127,8 +129,7 @@ public class MergeTableLikeUtilTest { @Test public void mergeWithIncludeFailsOnDuplicateRegularColumn() { - TableSchema sourceSchema = - TableSchema.builder().add(TableColumn.physical("one", DataTypes.INT())).build(); + Schema sourceSchema = Schema.newBuilder().column("one", DataTypes.INT()).build(); List<SqlNode> derivedColumns = Arrays.asList( @@ -150,8 +151,7 @@ public class MergeTableLikeUtilTest { @Test public void mergeWithIncludeFailsOnDuplicateRegularColumnAndComputeColumn() { - TableSchema sourceSchema = - TableSchema.builder().add(TableColumn.physical("one", DataTypes.INT())).build(); + Schema sourceSchema = Schema.newBuilder().column("one", DataTypes.INT()).build(); List<SqlNode> derivedColumns = Arrays.asList( @@ -176,8 +176,7 @@ public class MergeTableLikeUtilTest { @Test public void mergeWithIncludeFailsOnDuplicateRegularColumnAndMetadataColumn() { - TableSchema sourceSchema = - TableSchema.builder().add(TableColumn.physical("one", DataTypes.INT())).build(); + Schema sourceSchema = Schema.newBuilder().column("one", DataTypes.INT()).build(); List<SqlNode> derivedColumns = Arrays.asList( @@ -202,10 +201,10 @@ public class MergeTableLikeUtilTest { @Test public void mergeGeneratedColumns() { - TableSchema sourceSchema = - TableSchema.builder() - .add(TableColumn.physical("one", DataTypes.INT())) - .add(TableColumn.computed("two", DataTypes.INT(), "one + 1")) + Schema sourceSchema = + Schema.newBuilder() + .column("one", DataTypes.INT()) + .columnByExpression("two", "one + 1") .build(); List<SqlNode> derivedColumns = @@ -213,7 +212,7 @@ public class MergeTableLikeUtilTest { regularColumn("three", DataTypes.INT()), computedColumn("four", plus("one", "3"))); - TableSchema mergedSchema = + Schema mergedSchema = util.mergeTables( getDefaultMergingStrategies(), sourceSchema, @@ -221,12 +220,12 @@ public class MergeTableLikeUtilTest { Collections.emptyList(), null); - TableSchema expectedSchema = - TableSchema.builder() - .add(TableColumn.physical("one", DataTypes.INT())) - .add(TableColumn.computed("two", DataTypes.INT(), "one + 1")) - .add(TableColumn.physical("three", DataTypes.INT())) - .add(TableColumn.computed("four", DataTypes.INT(), "`one` + 3")) + Schema expectedSchema = + Schema.newBuilder() + .column("one", DataTypes.INT()) + .columnByExpression("two", "one + 1") + .column("three", DataTypes.INT()) + .columnByExpression("four", "`one` + 3") .build(); assertThat(mergedSchema).isEqualTo(expectedSchema); @@ -234,11 +233,11 @@ public class MergeTableLikeUtilTest { @Test public void mergeMetadataColumns() { - TableSchema sourceSchema = - TableSchema.builder() - .add(TableColumn.physical("one", DataTypes.INT())) - .add(TableColumn.metadata("two", DataTypes.INT(), false)) - .add(TableColumn.computed("c", DataTypes.INT(), "ABS(two)")) + Schema sourceSchema = + Schema.newBuilder() + .column("one", DataTypes.INT()) + .columnByMetadata("two", DataTypes.INT(), false) + .columnByExpression("c", "ABS(two)") .build(); List<SqlNode> derivedColumns = @@ -246,7 +245,7 @@ public class MergeTableLikeUtilTest { regularColumn("three", DataTypes.INT()), metadataColumn("four", DataTypes.INT(), true)); - TableSchema mergedSchema = + Schema mergedSchema = util.mergeTables( getDefaultMergingStrategies(), sourceSchema, @@ -254,13 +253,13 @@ public class MergeTableLikeUtilTest { Collections.emptyList(), null); - TableSchema expectedSchema = - TableSchema.builder() - .add(TableColumn.physical("one", DataTypes.INT())) - .add(TableColumn.metadata("two", DataTypes.INT(), false)) - .add(TableColumn.computed("c", DataTypes.INT(), "ABS(two)")) - .add(TableColumn.physical("three", DataTypes.INT())) - .add(TableColumn.metadata("four", DataTypes.INT(), true)) + Schema expectedSchema = + Schema.newBuilder() + .column("one", DataTypes.INT()) + .columnByMetadata("two", DataTypes.INT(), false) + .columnByExpression("c", "ABS(two)") + .column("three", DataTypes.INT()) + .columnByMetadata("four", DataTypes.INT(), true) .build(); assertThat(mergedSchema).isEqualTo(expectedSchema); @@ -268,10 +267,10 @@ public class MergeTableLikeUtilTest { @Test public void mergeIncludingGeneratedColumnsFailsOnDuplicate() { - TableSchema sourceSchema = - TableSchema.builder() - .add(TableColumn.physical("one", DataTypes.INT())) - .add(TableColumn.computed("two", DataTypes.INT(), "one + 1")) + Schema sourceSchema = + Schema.newBuilder() + .column("one", DataTypes.INT()) + .columnByExpression("two", "one + 1") .build(); List<SqlNode> derivedColumns = @@ -294,10 +293,10 @@ public class MergeTableLikeUtilTest { @Test public void mergeIncludingMetadataColumnsFailsOnDuplicate() { - TableSchema sourceSchema = - TableSchema.builder() - .add(TableColumn.physical("one", DataTypes.INT())) - .add(TableColumn.metadata("two", DataTypes.INT())) + Schema sourceSchema = + Schema.newBuilder() + .column("one", DataTypes.INT()) + .columnByMetadata("two", DataTypes.INT()) .build(); List<SqlNode> derivedColumns = @@ -320,10 +319,10 @@ public class MergeTableLikeUtilTest { @Test public void mergeExcludingGeneratedColumnsDuplicate() { - TableSchema sourceSchema = - TableSchema.builder() - .add(TableColumn.physical("one", DataTypes.INT())) - .add(TableColumn.computed("two", DataTypes.INT(), "one + 1")) + Schema sourceSchema = + Schema.newBuilder() + .column("one", DataTypes.INT()) + .columnByExpression("two", "one + 1") .build(); List<SqlNode> derivedColumns = @@ -332,7 +331,7 @@ public class MergeTableLikeUtilTest { Map<FeatureOption, MergingStrategy> mergingStrategies = getDefaultMergingStrategies(); mergingStrategies.put(FeatureOption.GENERATED, MergingStrategy.EXCLUDING); - TableSchema mergedSchema = + Schema mergedSchema = util.mergeTables( mergingStrategies, sourceSchema, @@ -340,10 +339,10 @@ public class MergeTableLikeUtilTest { Collections.emptyList(), null); - TableSchema expectedSchema = - TableSchema.builder() - .add(TableColumn.physical("one", DataTypes.INT())) - .add(TableColumn.computed("two", DataTypes.INT(), "`one` + 3")) + Schema expectedSchema = + Schema.newBuilder() + .column("one", DataTypes.INT()) + .columnByExpression("two", "`one` + 3") .build(); assertThat(mergedSchema).isEqualTo(expectedSchema); @@ -351,10 +350,10 @@ public class MergeTableLikeUtilTest { @Test public void mergeExcludingMetadataColumnsDuplicate() { - TableSchema sourceSchema = - TableSchema.builder() - .add(TableColumn.physical("one", DataTypes.INT())) - .add(TableColumn.metadata("two", DataTypes.INT())) + Schema sourceSchema = + Schema.newBuilder() + .column("one", DataTypes.INT()) + .columnByMetadata("two", DataTypes.INT()) .build(); List<SqlNode> derivedColumns = @@ -363,7 +362,7 @@ public class MergeTableLikeUtilTest { Map<FeatureOption, MergingStrategy> mergingStrategies = getDefaultMergingStrategies(); mergingStrategies.put(FeatureOption.METADATA, MergingStrategy.EXCLUDING); - TableSchema mergedSchema = + Schema mergedSchema = util.mergeTables( mergingStrategies, sourceSchema, @@ -371,10 +370,10 @@ public class MergeTableLikeUtilTest { Collections.emptyList(), null); - TableSchema expectedSchema = - TableSchema.builder() - .add(TableColumn.physical("one", DataTypes.INT())) - .add(TableColumn.metadata("two", DataTypes.BOOLEAN())) + Schema expectedSchema = + Schema.newBuilder() + .column("one", DataTypes.INT()) + .columnByMetadata("two", DataTypes.BOOLEAN()) .build(); assertThat(mergedSchema).isEqualTo(expectedSchema); @@ -382,10 +381,10 @@ public class MergeTableLikeUtilTest { @Test public void mergeOverwritingGeneratedColumnsDuplicate() { - TableSchema sourceSchema = - TableSchema.builder() - .add(TableColumn.physical("one", DataTypes.INT())) - .add(TableColumn.computed("two", DataTypes.INT(), "one + 1")) + Schema sourceSchema = + Schema.newBuilder() + .column("one", DataTypes.INT()) + .columnByExpression("two", "one + 1") .build(); List<SqlNode> derivedColumns = @@ -394,7 +393,7 @@ public class MergeTableLikeUtilTest { Map<FeatureOption, MergingStrategy> mergingStrategies = getDefaultMergingStrategies(); mergingStrategies.put(FeatureOption.GENERATED, MergingStrategy.OVERWRITING); - TableSchema mergedSchema = + Schema mergedSchema = util.mergeTables( mergingStrategies, sourceSchema, @@ -402,10 +401,10 @@ public class MergeTableLikeUtilTest { Collections.emptyList(), null); - TableSchema expectedSchema = - TableSchema.builder() - .add(TableColumn.physical("one", DataTypes.INT())) - .add(TableColumn.computed("two", DataTypes.INT(), "`one` + 3")) + Schema expectedSchema = + Schema.newBuilder() + .column("one", DataTypes.INT()) + .columnByExpression("two", "`one` + 3") .build(); assertThat(mergedSchema).isEqualTo(expectedSchema); @@ -413,10 +412,10 @@ public class MergeTableLikeUtilTest { @Test public void mergeOverwritingMetadataColumnsDuplicate() { - TableSchema sourceSchema = - TableSchema.builder() - .add(TableColumn.physical("one", DataTypes.INT())) - .add(TableColumn.metadata("two", DataTypes.INT())) + Schema sourceSchema = + Schema.newBuilder() + .column("one", DataTypes.INT()) + .columnByMetadata("two", DataTypes.INT()) .build(); List<SqlNode> derivedColumns = @@ -425,7 +424,7 @@ public class MergeTableLikeUtilTest { Map<FeatureOption, MergingStrategy> mergingStrategies = getDefaultMergingStrategies(); mergingStrategies.put(FeatureOption.METADATA, MergingStrategy.OVERWRITING); - TableSchema mergedSchema = + Schema mergedSchema = util.mergeTables( mergingStrategies, sourceSchema, @@ -433,10 +432,10 @@ public class MergeTableLikeUtilTest { Collections.emptyList(), null); - TableSchema expectedSchema = - TableSchema.builder() - .add(TableColumn.physical("one", DataTypes.INT())) - .add(TableColumn.metadata("two", DataTypes.BOOLEAN(), true)) + Schema expectedSchema = + Schema.newBuilder() + .column("one", DataTypes.INT()) + .columnByMetadata("two", DataTypes.BOOLEAN(), true) .build(); assertThat(mergedSchema).isEqualTo(expectedSchema); @@ -444,10 +443,10 @@ public class MergeTableLikeUtilTest { @Test public void mergeOverwritingPhysicalColumnWithGeneratedColumn() { - TableSchema sourceSchema = - TableSchema.builder() - .add(TableColumn.physical("one", DataTypes.INT())) - .add(TableColumn.physical("two", DataTypes.INT())) + Schema sourceSchema = + Schema.newBuilder() + .column("one", DataTypes.INT()) + .column("two", DataTypes.INT()) .build(); List<SqlNode> derivedColumns = @@ -473,10 +472,10 @@ public class MergeTableLikeUtilTest { @Test public void mergeOverwritingComputedColumnWithMetadataColumn() { - TableSchema sourceSchema = - TableSchema.builder() - .add(TableColumn.physical("one", DataTypes.INT())) - .add(TableColumn.computed("two", DataTypes.INT(), "one + 3")) + Schema sourceSchema = + Schema.newBuilder() + .column("one", DataTypes.INT()) + .columnByExpression("two", "one + 3") .build(); List<SqlNode> derivedColumns = @@ -501,15 +500,12 @@ public class MergeTableLikeUtilTest { @Test public void mergeWatermarks() { - TableSchema sourceSchema = - TableSchema.builder() - .add(TableColumn.physical("one", DataTypes.INT())) - .add(TableColumn.computed("two", DataTypes.INT(), "one + 1")) - .add(TableColumn.physical("timestamp", DataTypes.TIMESTAMP())) - .watermark( - "timestamp", - "timestamp - INTERVAL '5' SECOND", - DataTypes.TIMESTAMP()) + Schema sourceSchema = + Schema.newBuilder() + .column("one", DataTypes.INT()) + .columnByExpression("two", "one +1") + .column("timestamp", DataTypes.TIMESTAMP()) + .watermark("timestamp", "timestamp - INTERVAL '5' SECOND") .build(); List<SqlNode> derivedColumns = @@ -517,7 +513,7 @@ public class MergeTableLikeUtilTest { regularColumn("three", DataTypes.INT()), computedColumn("four", plus("one", "3"))); - TableSchema mergedSchema = + Schema mergedSchema = util.mergeTables( getDefaultMergingStrategies(), sourceSchema, @@ -525,17 +521,14 @@ public class MergeTableLikeUtilTest { Collections.emptyList(), null); - TableSchema expectedSchema = - TableSchema.builder() - .add(TableColumn.physical("one", DataTypes.INT())) - .add(TableColumn.computed("two", DataTypes.INT(), "one + 1")) - .add(TableColumn.physical("timestamp", DataTypes.TIMESTAMP())) - .watermark( - "timestamp", - "timestamp - INTERVAL '5' SECOND", - DataTypes.TIMESTAMP()) - .add(TableColumn.physical("three", DataTypes.INT())) - .add(TableColumn.computed("four", DataTypes.INT(), "`one` + 3")) + Schema expectedSchema = + Schema.newBuilder() + .column("one", DataTypes.INT()) + .columnByExpression("two", "one +1") + .column("timestamp", DataTypes.TIMESTAMP()) + .watermark("timestamp", "timestamp - INTERVAL '5' SECOND") + .column("three", DataTypes.INT()) + .columnByExpression("four", "`one` + 3") .build(); assertThat(mergedSchema).isEqualTo(expectedSchema); @@ -543,14 +536,11 @@ public class MergeTableLikeUtilTest { @Test public void mergeIncludingWatermarksFailsOnDuplicate() { - TableSchema sourceSchema = - TableSchema.builder() - .add(TableColumn.physical("one", DataTypes.INT())) - .add(TableColumn.physical("timestamp", DataTypes.TIMESTAMP())) - .watermark( - "timestamp", - "timestamp - INTERVAL '5' SECOND", - DataTypes.TIMESTAMP()) + Schema sourceSchema = + Schema.newBuilder() + .column("one", DataTypes.INT()) + .column("timestamp", DataTypes.TIMESTAMP()) + .watermark("timestamp", "timestamp - INTERVAL '5' SECOND") .build(); List<SqlWatermark> derivedWatermarkSpecs = @@ -577,14 +567,11 @@ public class MergeTableLikeUtilTest { @Test public void mergeExcludingWatermarksDuplicate() { - TableSchema sourceSchema = - TableSchema.builder() - .add(TableColumn.physical("one", DataTypes.INT())) - .add(TableColumn.physical("timestamp", DataTypes.TIMESTAMP())) - .watermark( - "timestamp", - "timestamp - INTERVAL '5' SECOND", - DataTypes.TIMESTAMP()) + Schema sourceSchema = + Schema.newBuilder() + .column("one", DataTypes.INT()) + .column("timestamp", DataTypes.TIMESTAMP()) + .watermark("timestamp", "timestamp - INTERVAL '5' SECOND") .build(); List<SqlWatermark> derivedWatermarkSpecs = @@ -597,7 +584,7 @@ public class MergeTableLikeUtilTest { Map<FeatureOption, MergingStrategy> mergingStrategies = getDefaultMergingStrategies(); mergingStrategies.put(FeatureOption.WATERMARKS, MergingStrategy.EXCLUDING); - TableSchema mergedSchema = + Schema mergedSchema = util.mergeTables( mergingStrategies, sourceSchema, @@ -605,14 +592,11 @@ public class MergeTableLikeUtilTest { derivedWatermarkSpecs, null); - TableSchema expectedSchema = - TableSchema.builder() - .add(TableColumn.physical("one", DataTypes.INT())) - .add(TableColumn.physical("timestamp", DataTypes.TIMESTAMP())) - .watermark( - "timestamp", - "`timestamp` - INTERVAL '10' SECOND", - DataTypes.TIMESTAMP()) + Schema expectedSchema = + Schema.newBuilder() + .column("one", DataTypes.INT()) + .column("timestamp", DataTypes.TIMESTAMP()) + .watermark("timestamp", "`timestamp` - INTERVAL '10' SECOND") .build(); assertThat(mergedSchema).isEqualTo(expectedSchema); @@ -620,14 +604,11 @@ public class MergeTableLikeUtilTest { @Test public void mergeOverwritingWatermarksDuplicate() { - TableSchema sourceSchema = - TableSchema.builder() - .add(TableColumn.physical("one", DataTypes.INT())) - .add(TableColumn.physical("timestamp", DataTypes.TIMESTAMP())) - .watermark( - "timestamp", - "timestamp - INTERVAL '5' SECOND", - DataTypes.TIMESTAMP()) + Schema sourceSchema = + Schema.newBuilder() + .column("one", DataTypes.INT()) + .column("timestamp", DataTypes.TIMESTAMP()) + .watermark("timestamp", "timestamp - INTERVAL '5' SECOND") .build(); List<SqlWatermark> derivedWatermarkSpecs = @@ -640,7 +621,7 @@ public class MergeTableLikeUtilTest { Map<FeatureOption, MergingStrategy> mergingStrategies = getDefaultMergingStrategies(); mergingStrategies.put(FeatureOption.WATERMARKS, MergingStrategy.OVERWRITING); - TableSchema mergedSchema = + Schema mergedSchema = util.mergeTables( mergingStrategies, sourceSchema, @@ -648,14 +629,11 @@ public class MergeTableLikeUtilTest { derivedWatermarkSpecs, null); - TableSchema expectedSchema = - TableSchema.builder() - .add(TableColumn.physical("one", DataTypes.INT())) - .add(TableColumn.physical("timestamp", DataTypes.TIMESTAMP())) - .watermark( - "timestamp", - "`timestamp` - INTERVAL '10' SECOND", - DataTypes.TIMESTAMP()) + Schema expectedSchema = + Schema.newBuilder() + .column("one", DataTypes.INT()) + .column("timestamp", DataTypes.TIMESTAMP()) + .watermark("timestamp", "`timestamp` - INTERVAL '10' SECOND") .build(); assertThat(mergedSchema).isEqualTo(expectedSchema); @@ -663,15 +641,15 @@ public class MergeTableLikeUtilTest { @Test public void mergeConstraintsFromBaseTable() { - TableSchema sourceSchema = - TableSchema.builder() - .add(TableColumn.physical("one", DataTypes.INT().notNull())) - .add(TableColumn.physical("two", DataTypes.STRING().notNull())) - .add(TableColumn.physical("three", DataTypes.FLOAT())) - .primaryKey("constraint-42", new String[] {"one", "two"}) + Schema sourceSchema = + Schema.newBuilder() + .column("one", DataTypes.INT().notNull()) + .column("two", DataTypes.STRING().notNull()) + .column("three", DataTypes.FLOAT()) + .primaryKeyNamed("constraint-42", new String[] {"one", "two"}) .build(); - TableSchema mergedSchema = + Schema mergedSchema = util.mergeTables( getDefaultMergingStrategies(), sourceSchema, @@ -679,12 +657,12 @@ public class MergeTableLikeUtilTest { Collections.emptyList(), null); - TableSchema expectedSchema = - TableSchema.builder() - .add(TableColumn.physical("one", DataTypes.INT().notNull())) - .add(TableColumn.physical("two", DataTypes.STRING().notNull())) - .add(TableColumn.physical("three", DataTypes.FLOAT())) - .primaryKey("constraint-42", new String[] {"one", "two"}) + Schema expectedSchema = + Schema.newBuilder() + .column("one", DataTypes.INT().notNull()) + .column("two", DataTypes.STRING().notNull()) + .column("three", DataTypes.FLOAT()) + .primaryKeyNamed("constraint-42", new String[] {"one", "two"}) .build(); assertThat(mergedSchema).isEqualTo(expectedSchema); @@ -692,14 +670,14 @@ public class MergeTableLikeUtilTest { @Test public void mergeConstraintsFromDerivedTable() { - TableSchema sourceSchema = - TableSchema.builder() - .add(TableColumn.physical("one", DataTypes.INT().notNull())) - .add(TableColumn.physical("two", DataTypes.STRING().notNull())) - .add(TableColumn.physical("three", DataTypes.FLOAT())) + Schema sourceSchema = + Schema.newBuilder() + .column("one", DataTypes.INT().notNull()) + .column("two", DataTypes.STRING().notNull()) + .column("three", DataTypes.FLOAT()) .build(); - TableSchema mergedSchema = + Schema mergedSchema = util.mergeTables( getDefaultMergingStrategies(), sourceSchema, @@ -707,12 +685,12 @@ public class MergeTableLikeUtilTest { Collections.emptyList(), primaryKey("one", "two")); - TableSchema expectedSchema = - TableSchema.builder() - .add(TableColumn.physical("one", DataTypes.INT().notNull())) - .add(TableColumn.physical("two", DataTypes.STRING().notNull())) - .add(TableColumn.physical("three", DataTypes.FLOAT())) - .primaryKey("PK_3531879", new String[] {"one", "two"}) + Schema expectedSchema = + Schema.newBuilder() + .column("one", DataTypes.INT().notNull()) + .column("two", DataTypes.STRING().notNull()) + .column("three", DataTypes.FLOAT()) + .primaryKeyNamed("PK_3531879", new String[] {"one", "two"}) .build(); assertThat(mergedSchema).isEqualTo(expectedSchema); @@ -720,12 +698,12 @@ public class MergeTableLikeUtilTest { @Test public void mergeIncludingConstraintsFailsOnDuplicate() { - TableSchema sourceSchema = - TableSchema.builder() - .add(TableColumn.physical("one", DataTypes.INT().notNull())) - .add(TableColumn.physical("two", DataTypes.STRING().notNull())) - .add(TableColumn.physical("three", DataTypes.FLOAT())) - .primaryKey("constraint-42", new String[] {"one", "two"}) + Schema sourceSchema = + Schema.newBuilder() + .column("one", DataTypes.INT().notNull()) + .column("two", DataTypes.STRING().notNull()) + .column("three", DataTypes.FLOAT()) + .primaryKeyNamed("constraint-42", new String[] {"one", "two"}) .build(); assertThatThrownBy( @@ -744,18 +722,18 @@ public class MergeTableLikeUtilTest { @Test public void mergeExcludingConstraintsOnDuplicate() { - TableSchema sourceSchema = - TableSchema.builder() - .add(TableColumn.physical("one", DataTypes.INT().notNull())) - .add(TableColumn.physical("two", DataTypes.STRING().notNull())) - .add(TableColumn.physical("three", DataTypes.FLOAT().notNull())) - .primaryKey("constraint-42", new String[] {"one", "two", "three"}) + Schema sourceSchema = + Schema.newBuilder() + .column("one", DataTypes.INT().notNull()) + .column("two", DataTypes.STRING().notNull()) + .column("three", DataTypes.FLOAT()) + .primaryKeyNamed("constraint-42", new String[] {"one", "two", "three"}) .build(); Map<FeatureOption, MergingStrategy> mergingStrategies = getDefaultMergingStrategies(); mergingStrategies.put(FeatureOption.CONSTRAINTS, MergingStrategy.EXCLUDING); - TableSchema mergedSchema = + Schema mergedSchema = util.mergeTables( mergingStrategies, sourceSchema, @@ -763,12 +741,12 @@ public class MergeTableLikeUtilTest { Collections.emptyList(), primaryKey("one", "two")); - TableSchema expectedSchema = - TableSchema.builder() - .add(TableColumn.physical("one", DataTypes.INT().notNull())) - .add(TableColumn.physical("two", DataTypes.STRING().notNull())) - .add(TableColumn.physical("three", DataTypes.FLOAT().notNull())) - .primaryKey("PK_3531879", new String[] {"one", "two"}) + Schema expectedSchema = + Schema.newBuilder() + .column("one", DataTypes.INT().notNull()) + .column("two", DataTypes.STRING().notNull()) + .column("three", DataTypes.FLOAT()) + .primaryKeyNamed("PK_3531879", new String[] {"one", "two"}) .build(); assertThat(mergedSchema).isEqualTo(expectedSchema); diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala index 8786641ec6f..d228c206aa5 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala @@ -740,8 +740,10 @@ class TableScanTest extends TableTestBase { def testInvalidWatermarkOutputType(): Unit = { thrown.expect(classOf[ValidationException]) thrown.expectMessage( - "Watermark strategy '' must be of type TIMESTAMP or TIMESTAMP_LTZ" + - " but is of type 'CHAR(0) NOT NULL'.") + "Invalid data type of expression for watermark definition. " + + "The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p), " + + "the supported precision 'p' is from 0 to 3, but the watermark " + + "expression type is CHAR(0) NOT NULL") util.addTable(""" |CREATE TABLE src ( | ts TIMESTAMP(3),
