Oliver Moser created FLINK-25014:
------------------------------------
Summary: Table to DataStream conversion, wrong field order
Key: FLINK-25014
URL: https://issues.apache.org/jira/browse/FLINK-25014
Project: Flink
Issue Type: Bug
Components: API / DataStream
Affects Versions: 1.14.0
Reporter: Oliver Moser
In some cases, the field reordering described in the relevant
[part of the
docs|https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/data_stream_api/#examples-for-todatastream]
does not seem to work properly. Given the following DDL:
{code:sql}
create table if not exists masterdata
(
    facility     text,
    shortcode    text,
    sks          text,
    sksnumber    integer,
    rdspp        text,
    manufacturer text,
    facilitytype text,
    controls     text,
    serial       integer,
    powerkw      double precision,
    hubheight    double precision,
    rotorheight  integer,
    latitude     double precision,
    longitude    double precision,
    elevation    double precision
);
{code}
which should map to this POJO:
{code:java}
public static class MasterData {
    public String controls;
    public Double elevation;
    public String facility;
    public String facilityType;
    public Double hubHeight;
    public Double latitude;
    public Double longitude;
    public String manufacturer;
    public Double powerKw;
    public String rdsPp;
    public Long rotorHeight;
    public Long serial;
    public String shortcode;
    public String sks;
    public Long sksNumber;
}
{code}
I register the database using JdbcCatalog like this:
{code:java}
JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username,
password, baseUrl);
tableEnv.registerCatalog("cat", catalog);
tableEnv.useCatalog("cat"); {code}
If I then create a table, either with "SELECT * FROM masterdata" or via
{code:java}
tableEnv.from("masterdata"); {code}
and convert it with {{toDataStream(table, MasterData.class)}}, the conversion fails with an exception similar to:
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException: Column types of query result and sink for registered table 'cat.postgres.Unregistered_DataStream_Sink_1' do not match.
Cause: Incompatible types for sink column 'elevation' at position 1.

Query schema: [facility: STRING, shortcode: STRING, sks: STRING, sksnumber: INT, rdspp: STRING, manufacturer: STRING, facilitytype: STRING, controls: STRING, serial: INT, powerkw: DOUBLE, hubheight: DOUBLE, rotorheight: INT, latitude: DOUBLE, longitude: DOUBLE, elevation: DOUBLE]
Sink schema: [controls: STRING, elevation: DOUBLE, facility: STRING, facilityType: STRING, hubHeight: DOUBLE, latitude: DOUBLE, longitude: DOUBLE, manufacturer: STRING, powerKw: DOUBLE, rdsPp: STRING, rotorHeight: BIGINT, serial: BIGINT, shortcode: STRING, sks: STRING, sksNumber: BIGINT]
{code}
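To see why the message points at position 1, it helps to compare the two schemas from the exception column by column. The plain-Java sketch below is a hypothetical illustration, not Flink's validation code: the class name {{PositionalSchemaCheck}} and its methods are made up for this example. It pairs columns strictly by position and reports the first position whose types differ, treating any type difference as incompatible (Flink's real check is more lenient and accepts some implicit casts).

{code:java}
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical illustration, not Flink's validation code: pairs the columns of
 * two schemas strictly by position and reports where their types first differ.
 */
final class PositionalSchemaCheck {

    // Schemas copied from the exception message
    static final String QUERY_SCHEMA = "facility: STRING, shortcode: STRING, sks: STRING, "
            + "sksnumber: INT, rdspp: STRING, manufacturer: STRING, facilitytype: STRING, "
            + "controls: STRING, serial: INT, powerkw: DOUBLE, hubheight: DOUBLE, "
            + "rotorheight: INT, latitude: DOUBLE, longitude: DOUBLE, elevation: DOUBLE";
    static final String SINK_SCHEMA = "controls: STRING, elevation: DOUBLE, facility: STRING, "
            + "facilityType: STRING, hubHeight: DOUBLE, latitude: DOUBLE, longitude: DOUBLE, "
            + "manufacturer: STRING, powerKw: DOUBLE, rdsPp: STRING, rotorHeight: BIGINT, "
            + "serial: BIGINT, shortcode: STRING, sks: STRING, sksNumber: BIGINT";

    /**
     * Splits "name: TYPE, name: TYPE" into {name, TYPE} pairs. Assumes no
     * parameterized types such as DECIMAL(10, 2), whose comma would break the split.
     */
    static List<String[]> parse(String schema) {
        List<String[]> columns = new ArrayList<>();
        for (String column : schema.split(",")) {
            columns.add(column.trim().split(":\\s*", 2));
        }
        return columns;
    }

    /** Index of the first position whose types differ, or -1 if all compared positions agree. */
    static int firstTypeMismatch(String querySchema, String sinkSchema) {
        List<String[]> query = parse(querySchema);
        List<String[]> sink = parse(sinkSchema);
        int n = Math.min(query.size(), sink.size());
        for (int i = 0; i < n; i++) {
            // Simplification: any type difference counts as incompatible here,
            // whereas Flink also accepts some implicit casts (e.g. INT to BIGINT).
            if (!query.get(i)[1].equals(sink.get(i)[1])) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int pos = firstTypeMismatch(QUERY_SCHEMA, SINK_SCHEMA);
        if (pos < 0) {
            System.out.println("no type mismatch by position");
        } else {
            String sinkColumn = parse(SINK_SCHEMA).get(pos)[0];
            System.out.println("first type mismatch at position " + pos
                    + ", sink column '" + sinkColumn + "'");
        }
    }
}
{code}

Position 0 pairs {{facility: STRING}} with {{controls: STRING}}, so the first conflict is at position 1 ({{shortcode: STRING}} vs {{elevation: DOUBLE}}), which matches the column and position named in the exception.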
If I explicitly set the order of the columns in the SELECT, like this:
{code:java}
tableEnv.sqlQuery(
        "SELECT elevation, facility, latitude, longitude, manufacturer, serial FROM masterdata");
{code}
it works. In the debugger I can see that "queryFields" and "sinkFields" in the
call to {{DynamicSinkUtils.validateSchemaAndApplyImplicitCast()}} are not aligned,
i.e. the fields in those two lists are not in the same order, hence the exception.
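The schemas in the exception also differ in letter case for several names (for example {{facilitytype}} in the query vs {{facilityType}} in the sink). The plain-Java sketch below is a hypothetical illustration, not Flink's implementation, and the class {{NameAlignmentCheck}} is made up for this example. It aligns the query columns to the sink fields by name instead of by position and lists the sink fields that find no column. Whether Flink's own name-based reordering is case-sensitive, and whether that is what breaks it here, is an assumption that this sketch does not establish.

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical sketch, not Flink's implementation: aligns query columns to sink
 * (POJO) fields by name and reports sink fields that find no matching column.
 */
final class NameAlignmentCheck {

    // Column names in query order (from the DDL / query schema)
    static final List<String> QUERY_COLUMNS = Arrays.asList(
            "facility", "shortcode", "sks", "sksnumber", "rdspp", "manufacturer",
            "facilitytype", "controls", "serial", "powerkw", "hubheight",
            "rotorheight", "latitude", "longitude", "elevation");

    // Field names in sink order (from the MasterData POJO / sink schema)
    static final List<String> SINK_FIELDS = Arrays.asList(
            "controls", "elevation", "facility", "facilityType", "hubHeight",
            "latitude", "longitude", "manufacturer", "powerKw", "rdsPp",
            "rotorHeight", "serial", "shortcode", "sks", "sksNumber");

    /** For each sink field, the index of the query column with the same name, or -1 if none. */
    static int[] alignByName(List<String> queryColumns, List<String> sinkFields, boolean ignoreCase) {
        int[] mapping = new int[sinkFields.size()];
        for (int i = 0; i < sinkFields.size(); i++) {
            mapping[i] = -1;
            for (int j = 0; j < queryColumns.size(); j++) {
                String column = queryColumns.get(j);
                String field = sinkFields.get(i);
                boolean same = ignoreCase ? column.equalsIgnoreCase(field) : column.equals(field);
                if (same) {
                    mapping[i] = j;
                    break;
                }
            }
        }
        return mapping;
    }

    /** Sink fields that found no query column under the chosen matching rule. */
    static List<String> unmatched(List<String> queryColumns, List<String> sinkFields, boolean ignoreCase) {
        int[] mapping = alignByName(queryColumns, sinkFields, ignoreCase);
        List<String> missing = new ArrayList<>();
        for (int i = 0; i < mapping.length; i++) {
            if (mapping[i] < 0) {
                missing.add(sinkFields.get(i));
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        System.out.println("exact case:       " + unmatched(QUERY_COLUMNS, SINK_FIELDS, false));
        System.out.println("case-insensitive: " + unmatched(QUERY_COLUMNS, SINK_FIELDS, true));
    }
}
{code}

With exact-case matching, {{facilityType}}, {{hubHeight}}, {{powerKw}}, {{rdsPp}}, {{rotorHeight}} and {{sksNumber}} find no column; ignoring case, every field matches.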
Here is the full code to reproduce the issue:
{code:java}
import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.junit.Test;

public class FieldReorderTest {

    @Test
    public void testFieldReordering() throws Exception {
        // Connection settings for the Postgres database behind the JdbcCatalog
        String name = "cat";
        String defaultDatabase = "postgres";
        String username = "cat";
        String password = "1234";
        String baseUrl = "jdbc:postgresql://cat.postgres.database.azure.com:5432";

        var env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register the JDBC catalog and make it the current catalog
        JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
        tableEnv.registerCatalog("cat", catalog);
        tableEnv.useCatalog("cat");

        // Fails with the ValidationException shown above
        var table = tableEnv.from("masterdata");
        table.printSchema();
        tableEnv.toDataStream(table, MasterData.class).print();
        env.execute();

        // This works: use it instead of tableEnv.from("masterdata") above,
        // selecting the columns in the same order as the POJO fields
        // var table = tableEnv.sqlQuery(
        //         "SELECT controls, elevation, facility, facilitytype, hubheight, latitude, longitude, "
        //                 + "manufacturer, powerkw, rdspp, rotorheight, serial, shortcode, sks, sksnumber "
        //                 + "FROM masterdata");
    }

    // Target POJO for toDataStream(): public fields and an implicit no-arg constructor
    public static class MasterData {
        public String controls;
        public Double elevation;
        public String facility;
        public String facilityType;
        public Double hubHeight;
        public Double latitude;
        public Double longitude;
        public String manufacturer;
        public Double powerKw;
        public String rdsPp;
        public Long rotorHeight;
        public Long serial;
        public String shortcode;
        public String sks;
        public Long sksNumber;
    }
}
{code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)