[ 
https://issues.apache.org/jira/browse/FLINK-24359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17419021#comment-17419021
 ] 

Martijn Visser commented on FLINK-24359:
----------------------------------------

[~slinkydeveloper] I'm not 100% sure but I believe the Table API uses the new 
File Source connector, but still uses the legacy File Sink. The Table API 
should only use the target File Source and Sink connector (based on the Source 
API and Unified Sink API)

> Migrate FileSystem connector to ResolvedSchema
> ----------------------------------------------
>
>                 Key: FLINK-24359
>                 URL: https://issues.apache.org/jira/browse/FLINK-24359
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Ecosystem
>         Environment: Flink 1.14-SNAPSHOT
>            Reporter: Francesco Guardiani
>            Priority: Major
>
> Filesystem connector uses the TableSchema deprecated APIs. This causes issues 
> with Table APIs, because TableSchema#fromResolvedSchema(ResolvedSchema) 
> requires the expressions to be serializable strings 
> (ResolvedExpression#asSerializableString).
> For example:
> {code:java}
> TableDescriptor inputTable = TableDescriptor.forConnector("filesystem")
>         .schema(
>                 Schema.newBuilder()
>                         .column("character", DataTypes.STRING())
>                         .column("latitude", DataTypes.STRING())
>                         .column("longitude", DataTypes.STRING())
>                         .column("time", DataTypes.TIMESTAMP(3))
>                         .watermark("time", $("time").minus(lit(2).seconds()))
>                         .build()
>         )
>         // Other options
>         .build();
> {code}
> When used in a table pipeline, throws the following exception:
> {code:java}
> Caused by: org.apache.flink.table.api.TableException: Expression 'minus(time, 
> 2000)' is not string serializable. Currently, only expressions that 
> originated from a SQL expression have a well-defined string representation.
>       at 
> org.apache.flink.table.expressions.ResolvedExpression.asSerializableString(ResolvedExpression.java:51)
>       at 
> org.apache.flink.table.api.TableSchema.lambda$fromResolvedSchema$13(TableSchema.java:455)
>       at 
> java.base/java.util.Collections$SingletonList.forEach(Collections.java:4976)
>       at 
> org.apache.flink.table.api.TableSchema.fromResolvedSchema(TableSchema.java:451)
>       at 
> org.apache.flink.table.catalog.ResolvedCatalogBaseTable.getSchema(ResolvedCatalogBaseTable.java:54)
>       at 
> org.apache.flink.table.filesystem.AbstractFileSystemTable.<init>(AbstractFileSystemTable.java:52)
>       at 
> org.apache.flink.table.filesystem.FileSystemTableSource.<init>(FileSystemTableSource.java:91)
>       at 
> org.apache.flink.table.filesystem.FileSystemTableFactory.createDynamicTableSource(FileSystemTableFactory.java:74)
>       at 
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:145)
> {code}
> The same table definition using SQL works fine:
> {code:java}
> CREATE TABLE IF NOT EXISTS LocationEvents (
>     `character` STRING,
>     `latitude` STRING,
>     `longitude` STRING,
>     `time` TIMESTAMP(3),
>     WATERMARK FOR `time` AS `time` - INTERVAL '5' MINUTES
> ) WITH (
>     -- Load from filesystem
>     'connector' = 'filesystem',
>     --- Other configs
> );
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to