[ https://issues.apache.org/jira/browse/FLINK-25014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Timo Walther closed FLINK-25014.
--------------------------------
    Fix Version/s: 1.15.0
       Resolution: Fixed

Fixed in master: 57a6f02357fd2eb7e3c59e9903f9ec33a655ead6

> Table to DataStream conversion, wrong field order
> -------------------------------------------------
>
>                 Key: FLINK-25014
>                 URL: https://issues.apache.org/jira/browse/FLINK-25014
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 1.14.0
>            Reporter: Oliver Moser
>            Assignee: Timo Walther
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 1.15.0
>
>
> In some cases, the field reordering described in the relevant
> [part of the docs|https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/data_stream_api/#examples-for-todatastream]
> does not seem to work properly. Given the following DDL:
> {code:java}
> create table if not exists masterdata
> (
>     facility     text,
>     shortcode    text,
>     sks          text,
>     sksnumber    integer,
>     rdspp        text,
>     manufacturer text,
>     facilitytype text,
>     controls     text,
>     serial       integer,
>     powerkw      double precision,
>     hubheight    double precision,
>     rotorheight  integer,
>     latitude     double precision,
>     longitude    double precision,
>     elevation    double precision
> );
> {code}
> which should map to this POJO:
> {code:java}
> public static class MasterData {
>     public String controls;
>     public Double elevation;
>     public String facility;
>     public String facilityType;
>     public Double hubHeight;
>     public Double latitude;
>     public Double longitude;
>     public String manufacturer;
>     public Double powerKw;
>     public String rdsPp;
>     public Long rotorHeight;
>     public Long serial;
>     public String shortcode;
>     public String sks;
>     public Long sksNumber;
> }
> {code}
> I register the database using JdbcCatalog like this:
> {code:java}
> JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
> tableEnv.registerCatalog("cat", catalog);
> tableEnv.useCatalog("cat");
> {code}
> If I then create a table with either "SELECT * FROM masterdata" or via
> {code:java}
> tableEnv.from("masterdata");
> {code}
> the conversion with toDataStream(table, MasterData.class) bails out with an exception similar to:
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: Column types of query result and sink for registered table 'cat.postgres.Unregistered_DataStream_Sink_1' do not match.
> Cause: Incompatible types for sink column 'elevation' at position 1.
>
> Query schema: [facility: STRING, shortcode: STRING, sks: STRING, sksnumber: INT, rdspp: STRING, manufacturer: STRING, facilitytype: STRING, controls: STRING, serial: INT, powerkw: DOUBLE, hubheight: DOUBLE, rotorheight: INT, latitude: DOUBLE, longitude: DOUBLE, elevation: DOUBLE]
> Sink schema: [controls: STRING, elevation: DOUBLE, facility: STRING, facilityType: STRING, hubHeight: DOUBLE, latitude: DOUBLE, longitude: DOUBLE, manufacturer: STRING, powerKw: DOUBLE, rdsPp: STRING, rotorHeight: BIGINT, serial: BIGINT, shortcode: STRING, sks: STRING, sksNumber: BIGINT]
> {code}
> If I explicitly set the order of the columns in the SELECT, like this:
> {code:java}
> tableEnv.sqlQuery("SELECT elevation,facility,latitude,longitude,manufacturer,serial from masterdata");
> {code}
> it works. In the debugger I can see that "queryFields" and "sinkField" in the call to
> DynamicSinkUtils.validateSchemaAndApplyImplicitCast() are not aligned, i.e. the fields in
> those two lists are not in the same order, hence the exception.
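The exception above is consistent with a positional check: the query column at index i is compared with the sink column at index i, and only the types matter. Position 0 passes (facility: STRING vs controls: STRING), while position 1 pairs shortcode: STRING with elevation: DOUBLE and fails. The following is a minimal, simplified model of that behavior, not Flink's actual `DynamicSinkUtils.validateSchemaAndApplyImplicitCast()` logic; `PositionalSchemaCheck`, `Column`, and `firstMismatch` are hypothetical names, and the only implicit cast it models (INT to BIGINT) is an assumption chosen to fit this schema.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified, hypothetical model of a positional schema check.
// NOT Flink's implementation: it only mimics the behavior visible in the
// exception, where column i of the query is checked against column i of the sink.
public class PositionalSchemaCheck {

    // Schemas copied from the exception message.
    public static final String QUERY = "facility: STRING, shortcode: STRING, sks: STRING, "
            + "sksnumber: INT, rdspp: STRING, manufacturer: STRING, facilitytype: STRING, "
            + "controls: STRING, serial: INT, powerkw: DOUBLE, hubheight: DOUBLE, "
            + "rotorheight: INT, latitude: DOUBLE, longitude: DOUBLE, elevation: DOUBLE";

    public static final String SINK = "controls: STRING, elevation: DOUBLE, facility: STRING, "
            + "facilityType: STRING, hubHeight: DOUBLE, latitude: DOUBLE, longitude: DOUBLE, "
            + "manufacturer: STRING, powerKw: DOUBLE, rdsPp: STRING, rotorHeight: BIGINT, "
            + "serial: BIGINT, shortcode: STRING, sks: STRING, sksNumber: BIGINT";

    // Same columns as QUERY, reordered to follow the alphabetical sink order.
    public static final String REORDERED_QUERY = "controls: STRING, elevation: DOUBLE, "
            + "facility: STRING, facilitytype: STRING, hubheight: DOUBLE, latitude: DOUBLE, "
            + "longitude: DOUBLE, manufacturer: STRING, powerkw: DOUBLE, rdspp: STRING, "
            + "rotorheight: INT, serial: INT, shortcode: STRING, sks: STRING, sksnumber: INT";

    // Assumption: the only implicit widening modeled here is INT -> BIGINT.
    private static final Map<String, Set<String>> WIDENINGS = Map.of("INT", Set.of("BIGINT"));

    public static final class Column {
        public final String name;
        public final String type;

        Column(String name, String type) {
            this.name = name;
            this.type = type;
        }
    }

    // Parses "name: TYPE, name: TYPE, ..." as printed in the exception message.
    public static List<Column> parse(String schema) {
        List<Column> columns = new ArrayList<>();
        for (String part : schema.split(",\\s*")) {
            String[] nameAndType = part.split(":\\s*");
            columns.add(new Column(nameAndType[0].trim(), nameAndType[1].trim()));
        }
        return columns;
    }

    static boolean compatible(String queryType, String sinkType) {
        return queryType.equals(sinkType)
                || WIDENINGS.getOrDefault(queryType, Set.of()).contains(sinkType);
    }

    // Returns the first sink column that cannot accept the query column at the
    // same position, or null if every position is compatible. Names are ignored.
    public static String firstMismatch(List<Column> query, List<Column> sink) {
        if (query.size() != sink.size()) {
            return "column count differs: " + query.size() + " vs " + sink.size();
        }
        for (int i = 0; i < sink.size(); i++) {
            if (!compatible(query.get(i).type, sink.get(i).type)) {
                return "'" + sink.get(i).name + "' at position " + i;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(firstMismatch(parse(QUERY), parse(SINK)));           // 'elevation' at position 1
        System.out.println(firstMismatch(parse(REORDERED_QUERY), parse(SINK))); // null
    }
}
```

Because names are ignored, this model accepts the reordered query even though facilitytype and facilityType differ in letter case; only the position and type of each column decide the outcome.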
> Here is the full code to reproduce the issue:
> {code:java}
> import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.junit.Test;
>
> public class FieldReorderTest {
>
>     @Test
>     public void testFieldReordering() throws Exception {
>         String name = "cat";
>         String defaultDatabase = "postgres";
>         String username = "cat";
>         String password = "1234";
>         String baseUrl = "jdbc:postgresql://cat.postgres.database.azure.com:5432";
>
>         var env = StreamExecutionEnvironment.getExecutionEnvironment();
>         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>
>         JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
>         tableEnv.registerCatalog("cat", catalog);
>         tableEnv.useCatalog("cat");
>
>         var table = tableEnv.from("masterdata");
>         table.printSchema();
>         // throws the ValidationException described above
>         tableEnv.toDataStream(table, MasterData.class).print();
>         env.execute();
>
>         // this works
>         // tableEnv.sqlQuery("SELECT controls,elevation,facility,facilitytype,hubheight,latitude,longitude," +
>         //     "manufacturer,powerkw,rdspp,rotorheight,serial,shortcode,sks,sksnumber from masterdata");
>     }
>
>     public static class MasterData {
>         public String controls;
>         public Double elevation;
>         public String facility;
>         public String facilityType;
>         public Double hubHeight;
>         public Double latitude;
>         public Double longitude;
>         public String manufacturer;
>         public Double powerKw;
>         public String rdsPp;
>         public Long rotorHeight;
>         public Long serial;
>         public String shortcode;
>         public String sks;
>         public Long sksNumber;
>     }
> }
> {code}
>
>

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
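The working query in the test above lists the table columns in the same order in which the POJO fields appear in the sink schema of the exception, which is alphabetical. A minimal sketch of generating such a query from the POJO, assuming that the target order is the POJO's public field names sorted with `String.compareTo` and that table columns differ from field names only in letter case. `PojoColumnOrder` and `selectInPojoOrder` are hypothetical names, not Flink APIs.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper (not a Flink API): builds a SELECT whose columns follow the
// POJO's public field names in sorted order, matching table columns to fields
// case-insensitively (e.g. column "facilitytype" for field "facilityType").
public class PojoColumnOrder {

    // Column names in DDL order, as they appear in the query schema of the exception.
    public static final List<String> DDL_COLUMNS = List.of(
            "facility", "shortcode", "sks", "sksnumber", "rdspp", "manufacturer",
            "facilitytype", "controls", "serial", "powerkw", "hubheight",
            "rotorheight", "latitude", "longitude", "elevation");

    public static String selectInPojoOrder(Class<?> pojo, List<String> tableColumns, String table) {
        // Index table columns by lower-cased name for case-insensitive lookup.
        Map<String, String> byLowerName = new HashMap<>();
        for (String column : tableColumns) {
            byLowerName.put(column.toLowerCase(Locale.ROOT), column);
        }
        // getFields() returns fields in no particular order, so sort the names explicitly.
        List<String> fieldNames = new ArrayList<>();
        for (Field field : pojo.getFields()) {
            if (!Modifier.isStatic(field.getModifiers())) {
                fieldNames.add(field.getName());
            }
        }
        Collections.sort(fieldNames); // case-sensitive natural String order
        List<String> selected = new ArrayList<>();
        for (String fieldName : fieldNames) {
            String column = byLowerName.get(fieldName.toLowerCase(Locale.ROOT));
            if (column == null) {
                throw new IllegalArgumentException("No table column for field " + fieldName);
            }
            selected.add(column);
        }
        return "SELECT " + String.join(",", selected) + " FROM " + table;
    }

    // Copy of the POJO from the issue.
    public static class MasterData {
        public String controls;
        public Double elevation;
        public String facility;
        public String facilityType;
        public Double hubHeight;
        public Double latitude;
        public Double longitude;
        public String manufacturer;
        public Double powerKw;
        public String rdsPp;
        public Long rotorHeight;
        public Long serial;
        public String shortcode;
        public String sks;
        public Long sksNumber;
    }

    public static void main(String[] args) {
        System.out.println(selectInPojoOrder(MasterData.class, DDL_COLUMNS, "masterdata"));
    }
}
```

For `MasterData`, this prints the same column list as the commented-out "this works" query. The resulting string could then be passed to `tableEnv.sqlQuery(...)` before calling `toDataStream(...)`; since the issue is marked fixed in 1.15.0, this would only serve as a workaround on affected versions such as 1.14.0.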