Francesco Guardiani created FLINK-24359:
-------------------------------------------

             Summary: Migrate FileSystem connector to ResolvedSchema
                 Key: FLINK-24359
                 URL: https://issues.apache.org/jira/browse/FLINK-24359
             Project: Flink
          Issue Type: New Feature
          Components: Table SQL / Ecosystem
         Environment: Flink 1.14-SNAPSHOT
            Reporter: Francesco Guardiani


The filesystem connector still uses the deprecated TableSchema APIs. This causes issues with the Table API, because TableSchema#fromResolvedSchema(ResolvedSchema) requires every expression to be serializable to a string (ResolvedExpression#asSerializableString), and expressions built with the Table API expression DSL are not.

For example:

{code:java}
// $ and lit are static imports from org.apache.flink.table.api.Expressions
TableDescriptor inputTable = TableDescriptor.forConnector("filesystem")
    .schema(
        Schema.newBuilder()
            .column("character", DataTypes.STRING())
            .column("latitude", DataTypes.STRING())
            .column("longitude", DataTypes.STRING())
            .column("time", DataTypes.TIMESTAMP(3))
            .watermark("time", $("time").minus(lit(2).seconds()))
            .build()
    )
    // Other options
    .build();
{code}

When used in a table pipeline, this throws the following exception:

{code:java}
Caused by: org.apache.flink.table.api.TableException: Expression 'minus(time, 2000)' is not string serializable. Currently, only expressions that originated from a SQL expression have a well-defined string representation.
	at org.apache.flink.table.expressions.ResolvedExpression.asSerializableString(ResolvedExpression.java:51)
	at org.apache.flink.table.api.TableSchema.lambda$fromResolvedSchema$13(TableSchema.java:455)
	at java.base/java.util.Collections$SingletonList.forEach(Collections.java:4976)
	at org.apache.flink.table.api.TableSchema.fromResolvedSchema(TableSchema.java:451)
	at org.apache.flink.table.catalog.ResolvedCatalogBaseTable.getSchema(ResolvedCatalogBaseTable.java:54)
	at org.apache.flink.table.filesystem.AbstractFileSystemTable.<init>(AbstractFileSystemTable.java:52)
	at org.apache.flink.table.filesystem.FileSystemTableSource.<init>(FileSystemTableSource.java:91)
	at org.apache.flink.table.filesystem.FileSystemTableFactory.createDynamicTableSource(FileSystemTableFactory.java:74)
	at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:145)
{code}

The same table definition using SQL works fine:

{code:sql}
CREATE TABLE IF NOT EXISTS LocationEvents (
  `character` STRING,
  `latitude` STRING,
  `longitude` STRING,
  `time` TIMESTAMP(3),
  WATERMARK FOR `time` AS `time` - INTERVAL '5' MINUTES
) WITH (
  -- Load from filesystem
  'connector' = 'filesystem',
  --- Other configs
);
{code}
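
To make the failure mode above easier to follow, here is a minimal, self-contained sketch of why the SQL-defined watermark survives the conversion to the legacy schema while the Table API one does not. It is a simplified model, not Flink code: the names WatermarkSerializationSketch, Expr, SqlExpr, ApiExpr and toLegacyWatermark are all hypothetical stand-ins, and the only behavior taken from the report is that asSerializableString succeeds for SQL-originated expressions and throws for programmatically built ones.

```java
/** Simplified model of the issue; none of these types are Flink classes. */
final class WatermarkSerializationSketch {

    /** Hypothetical stand-in for a resolved expression. */
    interface Expr {
        String asSerializableString();
    }

    /** Expression that originated from SQL text: the text itself can be replayed. */
    static final class SqlExpr implements Expr {
        private final String sql;

        SqlExpr(String sql) {
            this.sql = sql;
        }

        @Override
        public String asSerializableString() {
            return sql;
        }
    }

    /** Expression built programmatically: it has no well-defined string form. */
    static final class ApiExpr implements Expr {
        private final String summary;

        ApiExpr(String summary) {
            this.summary = summary;
        }

        @Override
        public String asSerializableString() {
            throw new IllegalStateException(
                    "Expression '" + summary + "' is not string serializable.");
        }
    }

    /** Models a legacy-style conversion that forces the watermark into a string. */
    static String toLegacyWatermark(String column, Expr expr) {
        return "WATERMARK FOR " + column + " AS " + expr.asSerializableString();
    }

    public static void main(String[] args) {
        // SQL-originated expression: the conversion succeeds.
        System.out.println(
                toLegacyWatermark("`time`", new SqlExpr("`time` - INTERVAL '5' MINUTES")));

        // Programmatically built expression: the conversion throws.
        try {
            toLegacyWatermark("`time`", new ApiExpr("minus(time, 2000)"));
        } catch (IllegalStateException e) {
            System.out.println("Conversion failed: " + e.getMessage());
        }
    }
}
```

In this model the exception only occurs because the conversion forces every expression through a string. A consumer that reads the resolved expression directly never hits that path, which is the motivation for migrating the connector to ResolvedSchema.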