[ 
https://issues.apache.org/jira/browse/FLINK-25014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-25014.
--------------------------------
    Fix Version/s: 1.15.0
       Resolution: Fixed

Fixed in master: 57a6f02357fd2eb7e3c59e9903f9ec33a655ead6

> Table to DataStream conversion, wrong field order
> -------------------------------------------------
>
>                 Key: FLINK-25014
>                 URL: https://issues.apache.org/jira/browse/FLINK-25014
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 1.14.0
>            Reporter: Oliver Moser
>            Assignee: Timo Walther
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 1.15.0
>
>
> In some cases, the field reordering described in the relevant 
> [part of the 
> docs|https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/data_stream_api/#examples-for-todatastream]
>  does not seem to work properly. Given the following DDL
> {code:sql}
> create table if not exists masterdata
> (
>    facility text,
>    shortcode text,
>    sks text,
>    sksnumber integer,
>    rdspp text,
>    manufacturer text,
>    facilitytype text,
>    controls text,
>    serial integer,
>    powerkw double precision,
>    hubheight double precision,
>    rotorheight integer,
>    latitude double precision,
>    longitude double precision,
>    elevation double precision
> ); {code}
> which should map to this POJO:
> {code:java}
> public static class MasterData {
>    public String controls;
>    public Double elevation;
>    public String facility;
>    public String facilityType;
>    public Double hubHeight;
>    public Double latitude;
>    public Double longitude;
>    public String manufacturer;
>    public Double powerKw;
>    public String rdsPp;
>    public Long rotorHeight;
>    public Long serial;
>    public String shortcode;
>    public String sks;
>    public Long sksNumber;
> } {code}
> I register the database using JdbcCatalog like this:
>  
> {code:java}
> JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username, 
> password, baseUrl);   
> tableEnv.registerCatalog("cat", catalog);   
> tableEnv.useCatalog("cat"); {code}
> and if I try to create a table with either "SELECT * FROM masterdata" or via
> {code:java}
> tableEnv.from("masterdata"); {code}
> and convert the result with toDataStream, it bails out with an exception similar to
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Column types of query result and sink for registered table 
> 'cat.postgres.Unregistered_DataStream_Sink_1' do not match.
> Cause: Incompatible types for sink column 'elevation' at position 1.
> Query schema: [facility: STRING, shortcode: STRING, sks: STRING, sksnumber: INT, 
> rdspp: STRING, manufacturer: STRING, facilitytype: STRING, controls: STRING, 
> serial: INT, powerkw: DOUBLE, hubheight: DOUBLE, rotorheight: INT, latitude: 
> DOUBLE, longitude: DOUBLE, elevation: DOUBLE]
> Sink schema:  [controls: STRING, elevation: DOUBLE, facility: STRING, 
> facilityType: STRING, hubHeight: DOUBLE, latitude: DOUBLE, longitude: DOUBLE, 
> manufacturer: STRING, powerKw: DOUBLE, rdsPp: STRING, rotorHeight: BIGINT, 
> serial: BIGINT, shortcode: STRING, sks: STRING, sksNumber: BIGINT] {code}
> If I explicitly set the order of the columns in the SELECT like this:
> {code:java}
> tableEnv.sqlQuery("SELECT elevation,facility,latitude,longitude,manufacturer,serial from masterdata");{code}
> it works. In the debugger I can see that "queryFields" and "sinkField" in the 
> call to DynamicSinkUtils.validateSchemaAndApplyImplicitCast() are not aligned, 
> i.e. the order of the fields in those two lists is not the same, hence the 
> exception.
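
To make the misalignment concrete, here is a minimal plain-Java sketch. It is not Flink's implementation: `Column`, `firstTypeMismatch`, and `reorderByName` are hypothetical names used only for illustration. It compares types position by position, as the exception message suggests, and then shows that reordering the query columns by name removes the mismatch. Names are matched case-insensitively because the two schemas above differ in case (facilitytype vs. facilityType); that choice is an assumption of the sketch, and implicit casts such as INT to BIGINT are ignored.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

public class FieldAlignmentSketch {
    /** Hypothetical column descriptor: a field name and a type name. */
    public static final class Column {
        public final String name;
        public final String type;

        public Column(String name, String type) {
            this.name = name;
            this.type = type;
        }
    }

    /** Returns the first position whose types differ when compared by position, or -1. */
    public static int firstTypeMismatch(List<Column> query, List<Column> sink) {
        int n = Math.min(query.size(), sink.size());
        for (int i = 0; i < n; i++) {
            if (!query.get(i).type.equals(sink.get(i).type)) {
                return i;
            }
        }
        return -1;
    }

    /** Reorders the query columns into sink order, matching names case-insensitively. */
    public static List<Column> reorderByName(List<Column> query, List<Column> sink) {
        Map<String, Column> byName = new HashMap<>();
        for (Column c : query) {
            byName.put(c.name.toLowerCase(Locale.ROOT), c);
        }
        List<Column> reordered = new ArrayList<>();
        for (Column s : sink) {
            Column match = byName.get(s.name.toLowerCase(Locale.ROOT));
            if (match == null) {
                throw new IllegalArgumentException("No query column for " + s.name);
            }
            reordered.add(match);
        }
        return reordered;
    }

    public static void main(String[] args) {
        // Query columns in table order, sink columns sorted by name (shortened sample).
        List<Column> query = List.of(
                new Column("facility", "STRING"),
                new Column("facilitytype", "STRING"),
                new Column("elevation", "DOUBLE"));
        List<Column> sink = List.of(
                new Column("elevation", "DOUBLE"),
                new Column("facility", "STRING"),
                new Column("facilityType", "STRING"));

        System.out.println(firstTypeMismatch(query, sink));
        System.out.println(firstTypeMismatch(reorderByName(query, sink), sink));
    }
}
```

With the three sample columns, the positional check reports a mismatch at position 0, and after reordering by name it reports none (-1).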
> Here is the full code for reproducing:
> {code:java}
> import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.junit.Test;
> public class FieldReorderTest {
>    @Test
>    public void testFieldReordering() throws Exception {
>       String name = "cat";
>       String defaultDatabase = "postgres";
>       String username = "cat";
>       String password = "1234";
>       String baseUrl = "jdbc:postgresql://cat.postgres.database.azure.com:5432";
>       var env = StreamExecutionEnvironment.getExecutionEnvironment();
>       StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>       JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
>       tableEnv.registerCatalog("cat", catalog);
>       tableEnv.useCatalog("cat");
>       var table = tableEnv.from("masterdata");
>       table.printSchema();
>       tableEnv.toDataStream(table, MasterData.class).print();
>       env.execute();
>       // this works
>       // tableEnv.sqlQuery("SELECT controls,elevation,facility,facilitytype,hubheight,latitude,longitude," +
>       //       "manufacturer,powerkw,rdspp,rotorheight,serial,shortcode,sks,sksnumber from masterdata");
>    }
>    public static class MasterData {
>       public String controls;
>       public Double elevation;
>       public String facility;
>       public String facilityType;
>       public Double hubHeight;
>       public Double latitude;
>       public Double longitude;
>       public String manufacturer;
>       public Double powerKw;
>       public String rdsPp;
>       public Long rotorHeight;
>       public Long serial;
>       public String shortcode;
>       public String sks;
>       public Long sksNumber;
>    }
> }
>  {code}
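
On affected versions, the explicit column list from the commented-out query in the reproducer could be derived from the POJO instead of being typed by hand. This is a minimal sketch under two assumptions: that the sink schema lists the public POJO fields sorted by name (as it does in the exception above), and that the catalog's column names are the lowercased field names. `ColumnListSketch` and `selectInPojoOrder` are hypothetical names, and the nested `MasterData` is a shortened stand-in for the class in the reproducer.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.Locale;
import java.util.stream.Collectors;

public class ColumnListSketch {
    /** Shortened stand-in for the MasterData POJO, declared out of order on purpose. */
    public static class MasterData {
        public String sks;
        public Double elevation;
        public String facilityType;
        public String controls;
    }

    /**
     * Builds a SELECT whose columns follow the POJO's public, non-static fields
     * sorted by name. Lowercasing the names is an assumption about the catalog.
     */
    public static String selectInPojoOrder(Class<?> pojo, String table) {
        String columns = Arrays.stream(pojo.getFields())
                .filter(f -> !Modifier.isStatic(f.getModifiers()))
                .map(Field::getName)
                .sorted()
                .map(n -> n.toLowerCase(Locale.ROOT))
                .collect(Collectors.joining(","));
        return "SELECT " + columns + " FROM " + table;
    }

    public static void main(String[] args) {
        System.out.println(selectInPojoOrder(MasterData.class, "masterdata"));
    }
}
```

The resulting string could then be passed to tableEnv.sqlQuery(...) before calling toDataStream.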
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
