[
https://issues.apache.org/jira/browse/FLINK-24359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Martijn Visser reassigned FLINK-24359:
--------------------------------------
Assignee: Francesco Guardiani
> Migrate FileSystem connector to ResolvedSchema
> ----------------------------------------------
>
> Key: FLINK-24359
> URL: https://issues.apache.org/jira/browse/FLINK-24359
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Ecosystem
> Environment: Flink 1.14-SNAPSHOT
> Reporter: Francesco Guardiani
> Assignee: Francesco Guardiani
> Priority: Major
> Labels: pull-request-available
>
> Filesystem connector uses the TableSchema deprecated APIs. This causes issues
> with Table APIs, because TableSchema#fromResolvedSchema(ResolvedSchema)
> requires the expressions to be serializable strings
> (ResolvedExpression#asSerializableString).
> For example:
> {code:java}
> TableDescriptor inputTable = TableDescriptor.forConnector("filesystem")
> .schema(
> Schema.newBuilder()
> .column("character", DataTypes.STRING())
> .column("latitude", DataTypes.STRING())
> .column("longitude", DataTypes.STRING())
> .column("time", DataTypes.TIMESTAMP(3))
> .watermark("time", $("time").minus(lit(2).seconds()))
> .build()
> )
> // Other options
> .build();
> {code}
> When used in a table pipeline, throws the following exception:
> {code:java}
> Caused by: org.apache.flink.table.api.TableException: Expression 'minus(time,
> 2000)' is not string serializable. Currently, only expressions that
> originated from a SQL expression have a well-defined string representation.
> at
> org.apache.flink.table.expressions.ResolvedExpression.asSerializableString(ResolvedExpression.java:51)
> at
> org.apache.flink.table.api.TableSchema.lambda$fromResolvedSchema$13(TableSchema.java:455)
> at
> java.base/java.util.Collections$SingletonList.forEach(Collections.java:4976)
> at
> org.apache.flink.table.api.TableSchema.fromResolvedSchema(TableSchema.java:451)
> at
> org.apache.flink.table.catalog.ResolvedCatalogBaseTable.getSchema(ResolvedCatalogBaseTable.java:54)
> at
> org.apache.flink.table.filesystem.AbstractFileSystemTable.<init>(AbstractFileSystemTable.java:52)
> at
> org.apache.flink.table.filesystem.FileSystemTableSource.<init>(FileSystemTableSource.java:91)
> at
> org.apache.flink.table.filesystem.FileSystemTableFactory.createDynamicTableSource(FileSystemTableFactory.java:74)
> at
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:145)
> {code}
> The same table definition using SQL works fine:
> {code:java}
> CREATE TABLE IF NOT EXISTS LocationEvents (
> `character` STRING,
> `latitude` STRING,
> `longitude` STRING,
> `time` TIMESTAMP(3),
> WATERMARK FOR `time` AS `time` - INTERVAL '5' MINUTES
> ) WITH (
> -- Load from filesystem
> 'connector' = 'filesystem',
> --- Other configs
> );
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)