[ https://issues.apache.org/jira/browse/FLINK-25014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17454025#comment-17454025 ]
Timo Walther commented on FLINK-25014:
--------------------------------------
The issue here is that we perform a case-sensitive
{{targetFieldNames.containsAll(inputFieldNames)}} which fails with
{{facilityType}} != {{facilitytype}}. Maybe we can make this more lenient.
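For illustration, a minimal sketch of what a more lenient, case-insensitive check could look like (class and method names are made up for the example, this is not the actual Flink code):
{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;

public class LenientFieldMatching {

    // Simplified version of today's behaviour: a case-sensitive containsAll,
    // so "facilityType" from the POJO never matches "facilitytype" from the table.
    static boolean strictMatch(List<String> targetFieldNames, List<String> inputFieldNames) {
        return targetFieldNames.containsAll(inputFieldNames);
    }

    // Hypothetical lenient variant: compare field names ignoring case.
    static boolean lenientMatch(List<String> targetFieldNames, List<String> inputFieldNames) {
        Set<String> targets = targetFieldNames.stream()
                .map(n -> n.toLowerCase(Locale.ROOT))
                .collect(Collectors.toSet());
        return inputFieldNames.stream()
                .map(n -> n.toLowerCase(Locale.ROOT))
                .allMatch(targets::contains);
    }

    public static void main(String[] args) {
        List<String> pojoFields = Arrays.asList("facilityType", "hubHeight");
        List<String> tableFields = Arrays.asList("facilitytype", "hubheight");
        System.out.println(strictMatch(pojoFields, tableFields));  // false -> fields are not reordered
        System.out.println(lenientMatch(pojoFields, tableFields)); // true  -> reordering could kick in
    }
}
{code}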
> Table to DataStream conversion, wrong field order
> -------------------------------------------------
>
> Key: FLINK-25014
> URL: https://issues.apache.org/jira/browse/FLINK-25014
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.14.0
> Reporter: Oliver Moser
> Priority: Minor
>
> It seems that in some cases the field reordering described in the relevant
> [part of the docs|https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/data_stream_api/#examples-for-todatastream]
> does not work properly. Given the following DDL:
> {code:sql}
> create table if not exists masterdata
> (
>     facility     text,
>     shortcode    text,
>     sks          text,
>     sksnumber    integer,
>     rdspp        text,
>     manufacturer text,
>     facilitytype text,
>     controls     text,
>     serial       integer,
>     powerkw      double precision,
>     hubheight    double precision,
>     rotorheight  integer,
>     latitude     double precision,
>     longitude    double precision,
>     elevation    double precision
> );
> {code}
> which should map to this POJO:
> {code:java}
> public static class MasterData {
>     public String controls;
>     public Double elevation;
>     public String facility;
>     public String facilityType;
>     public Double hubHeight;
>     public Double latitude;
>     public Double longitude;
>     public String manufacturer;
>     public Double powerKw;
>     public String rdsPp;
>     public Long rotorHeight;
>     public Long serial;
>     public String shortcode;
>     public String sks;
>     public Long sksNumber;
> }
> {code}
> I register the database as a catalog using JdbcCatalog like this:
> {code:java}
> JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
> tableEnv.registerCatalog("cat", catalog);
> tableEnv.useCatalog("cat");
> {code}
> and if I try to create a table with either "SELECT * FROM masterdata" or via
> {code:java}
> tableEnv.from("masterdata");
> {code}
> it will bail out with an exception similar to:
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Column types of query result and sink for registered table 'cat.postgres.Unregistered_DataStream_Sink_1' do not match.
> Cause: Incompatible types for sink column 'elevation' at position 1.
>
> Query schema: [facility: STRING, shortcode: STRING, sks: STRING, sksnumber: INT, rdspp: STRING, manufacturer: STRING, facilitytype: STRING, controls: STRING, serial: INT, powerkw: DOUBLE, hubheight: DOUBLE, rotorheight: INT, latitude: DOUBLE, longitude: DOUBLE, elevation: DOUBLE]
> Sink schema:  [controls: STRING, elevation: DOUBLE, facility: STRING, facilityType: STRING, hubHeight: DOUBLE, latitude: DOUBLE, longitude: DOUBLE, manufacturer: STRING, powerKw: DOUBLE, rdsPp: STRING, rotorHeight: BIGINT, serial: BIGINT, shortcode: STRING, sks: STRING, sksNumber: BIGINT]
> {code}
> If I explicitly set the order of the columns in the SELECT like this:
> {code:java}
> tableEnv.sqlQuery("SELECT elevation,facility,latitude,longitude,manufacturer,serial from masterdata");
> {code}
> it works. In the debugger I can see that "queryFields" and "sinkField" in the call to
> DynamicSinkUtils.validateSchemaAndApplyImplicitCast() are not aligned, i.e. the order of the
> fields in those two lists is not the same, hence the exception.
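>
> As a side note, one way to sidestep the reordering entirely is to convert to a generic Row
> stream and build the POJO manually. This is only a sketch (remaining field assignments omitted):
> {code:java}
> // Sketch: Row-based conversion that does not rely on POJO field reordering.
> Table table = tableEnv.from("masterdata");
> DataStream<Row> rows = tableEnv.toDataStream(table);
> DataStream<MasterData> data = rows
>         .map(row -> {
>             MasterData md = new MasterData();
>             md.facility = row.getFieldAs("facility");
>             md.facilityType = row.getFieldAs("facilitytype");
>             md.elevation = row.getFieldAs("elevation");
>             // ... assign the remaining columns the same way
>             return md;
>         })
>         .returns(MasterData.class);
> {code}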
> Here is the full code for reproducing:
> {code:java}
> import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.junit.Test;
>
> public class FieldReorderTest {
>
>     @Test
>     public void testFieldReordering() throws Exception {
>         String name = "cat";
>         String defaultDatabase = "postgres";
>         String username = "cat";
>         String password = "1234";
>         String baseUrl = "jdbc:postgresql://cat.postgres.database.azure.com:5432";
>
>         var env = StreamExecutionEnvironment.getExecutionEnvironment();
>         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>
>         JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
>         tableEnv.registerCatalog("cat", catalog);
>         tableEnv.useCatalog("cat");
>
>         var table = tableEnv.from("masterdata");
>         table.printSchema();
>
>         tableEnv.toDataStream(table, MasterData.class).print();
>         env.execute();
>
>         // this works:
>         // tableEnv.sqlQuery("SELECT controls,elevation,facility,facilitytype,hubheight,latitude,longitude," +
>         //         "manufacturer,powerkw,rdspp,rotorheight,serial,shortcode,sks,sksnumber from masterdata");
>     }
>
>     public static class MasterData {
>         public String controls;
>         public Double elevation;
>         public String facility;
>         public String facilityType;
>         public Double hubHeight;
>         public Double latitude;
>         public Double longitude;
>         public String manufacturer;
>         public Double powerKw;
>         public String rdsPp;
>         public Long rotorHeight;
>         public Long serial;
>         public String shortcode;
>         public String sks;
>         public Long sksNumber;
>     }
> }
> {code}
>
>