Oliver Moser created FLINK-25014:
------------------------------------

             Summary: Table to DataStream conversion, wrong field order
                 Key: FLINK-25014
                 URL: https://issues.apache.org/jira/browse/FLINK-25014
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 1.14.0
            Reporter: Oliver Moser


In some cases, the field reordering described in the relevant 
[part of the docs|https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/data_stream_api/#examples-for-todatastream]
does not seem to work properly. Given the following DDL

 

 
{code:sql}
create table if not exists masterdata
(
   facility text,
   shortcode text,
   sks text,
   sksnumber integer,
   rdspp text,
   manufacturer text,
   facilitytype text,
   controls text,
   serial integer,
   powerkw double precision,
   hubheight double precision,
   rotorheight integer,
   latitude double precision,
   longitude double precision,
   elevation double precision
); {code}
 

 

which should map to this POJO:

 

 
{code:java}
public static class MasterData {

   public String controls;

   public Double elevation;

   public String facility;

   public String facilityType;

   public Double hubHeight;

   public Double latitude;

   public Double longitude;

   public String manufacturer;

   public Double powerKw;

   public String rdsPp;

   public Long rotorHeight;

   public Long serial;

   public String shortcode;

   public String sks;

   public Long sksNumber;

} {code}
I register the database using JdbcCatalog like this:

 
{code:java}
JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
tableEnv.registerCatalog("cat", catalog);   
tableEnv.useCatalog("cat"); {code}
If I then create a table with either "SELECT * FROM masterdata" or via

 
{code:java}
tableEnv.from("masterdata"); {code}
the conversion to a DataStream fails with an exception similar to:

 

 
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException: Column types of query result and sink for registered table 'cat.postgres.Unregistered_DataStream_Sink_1' do not match.
Cause: Incompatible types for sink column 'elevation' at position 1.

Query schema: [facility: STRING, shortcode: STRING, sks: STRING, sksnumber: INT, rdspp: STRING, manufacturer: STRING, facilitytype: STRING, controls: STRING, serial: INT, powerkw: DOUBLE, hubheight: DOUBLE, rotorheight: INT, latitude: DOUBLE, longitude: DOUBLE, elevation: DOUBLE]
Sink schema:  [controls: STRING, elevation: DOUBLE, facility: STRING, facilityType: STRING, hubHeight: DOUBLE, latitude: DOUBLE, longitude: DOUBLE, manufacturer: STRING, powerKw: DOUBLE, rdsPp: STRING, rotorHeight: BIGINT, serial: BIGINT, shortcode: STRING, sks: STRING, sksNumber: BIGINT] {code}
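To make the two schemas in the exception easier to compare, here is a minimal, Flink-independent sketch that lists the sink columns with no exact (case-sensitive) name match in the query schema. The class and method names are hypothetical, and the column names are copied from the exception above. Whether this difference in letter case is related to the failed reordering is an assumption, not something I have confirmed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SchemaNameCheck {

    // Column names copied from the "Query schema" line of the exception.
    static final List<String> QUERY = Arrays.asList(
        "facility", "shortcode", "sks", "sksnumber", "rdspp", "manufacturer",
        "facilitytype", "controls", "serial", "powerkw", "hubheight",
        "rotorheight", "latitude", "longitude", "elevation");

    // Column names copied from the "Sink schema" line of the exception.
    static final List<String> SINK = Arrays.asList(
        "controls", "elevation", "facility", "facilityType", "hubHeight",
        "latitude", "longitude", "manufacturer", "powerKw", "rdsPp",
        "rotorHeight", "serial", "shortcode", "sks", "sksNumber");

    // Returns the sink columns that have no exact, case-sensitive match in the query.
    static List<String> withoutExactMatch(List<String> query, List<String> sink) {
        Set<String> queryNames = new HashSet<>(query);
        List<String> missing = new ArrayList<>();
        for (String name : sink) {
            if (!queryNames.contains(name)) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // prints [facilityType, hubHeight, powerKw, rdsPp, rotorHeight, sksNumber]
        System.out.println(withoutExactMatch(QUERY, SINK));
    }
}
```

All fifteen names match when compared case-insensitively; only the six camelCase POJO fields differ from the lowercase column names.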
If I explicitly set the order of the columns in the SELECT like this:

   
{code:java}
tableEnv.sqlQuery("SELECT elevation,facility,latitude,longitude,manufacturer,serial from masterdata");{code}

it works. In the debugger I can see that "queryFields" and "sinkFields" in the 
call to DynamicSinkUtils.validateSchemaAndApplyImplicitCast() are not aligned, 
i.e. the order of the fields in those two lists is not the same, hence the 
exception.
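As a possible workaround, the explicit SELECT can be generated from the POJO instead of being written by hand. The sketch below is plain Java with no Flink dependency; the helper name selectInPojoOrder and the reduced Sample class are hypothetical. It assumes the sink uses the POJO fields in sorted order (as the sink schema in the exception suggests) and that the catalog exposes the columns in lowercase (as the query schema suggests).

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.Locale;
import java.util.stream.Collectors;

public class PojoOrderedSelect {

    // Hypothetical helper: builds a SELECT whose column list follows the
    // sorted public field names of the POJO, lowercased for the catalog.
    static String selectInPojoOrder(Class<?> pojo, String table) {
        String columns = Arrays.stream(pojo.getFields())
            .filter(f -> !Modifier.isStatic(f.getModifiers()))
            .map(Field::getName)
            .sorted() // assumption: sink order equals sorted field names
            .map(n -> n.toLowerCase(Locale.ROOT)) // assumption: lowercase columns
            .collect(Collectors.joining(","));
        return "SELECT " + columns + " FROM " + table;
    }

    // Reduced stand-in for MasterData, declared out of order on purpose.
    public static class Sample {
        public Double powerKw;
        public String controls;
        public Long sksNumber;
        public String sks;
    }

    public static void main(String[] args) {
        // prints SELECT controls,powerkw,sks,sksnumber FROM masterdata
        System.out.println(selectInPojoOrder(Sample.class, "masterdata"));
    }
}
```

The resulting string could then be passed to tableEnv.sqlQuery(...) before calling toDataStream, in the same way as the hand-written query above.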

 

 

 

Here is the full code to reproduce the issue:

 

 
{code:java}
import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.junit.Test;

public class FieldReorderTest {

   @Test
   public void testFieldReordering() throws Exception {
      String name = "cat";
      String defaultDatabase = "postgres";
      String username = "cat";
      String password = "1234";
      String baseUrl = "jdbc:postgresql://cat.postgres.database.azure.com:5432";

      var env = StreamExecutionEnvironment.getExecutionEnvironment();
      StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

      JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
      tableEnv.registerCatalog("cat", catalog);
      tableEnv.useCatalog("cat");

      var table = tableEnv.from("masterdata");
      table.printSchema();
      tableEnv.toDataStream(table, MasterData.class).print();
      env.execute();       

      // this works
      // tableEnv.sqlQuery("SELECT controls,elevation,facility,facilitytype,hubheight,latitude,longitude," +
      //       "manufacturer,powerkw,rdspp,rotorheight,serial,shortcode,sks,sksnumber from masterdata");
   }

   public static class MasterData {

      public String controls;

      public Double elevation;

      public String facility;

      public String facilityType;

      public Double hubHeight;

      public Double latitude;

      public Double longitude;

      public String manufacturer;

      public Double powerKw;

      public String rdsPp;

      public Long rotorHeight;

      public Long serial;

      public String shortcode;

      public String sks;

      public Long sksNumber;

   }

}
 {code}
 

 


