roland wang created FLINK-15283:
-----------------------------------

Summary: Scala version of TableSinkUtils has a problem when validating sinks.
Key: FLINK-15283
URL: https://issues.apache.org/jira/browse/FLINK-15283
Project: Flink
Issue Type: Bug
Components: API / Scala
Affects Versions: 1.9.0
Environment: All environments of flink 1.9.0
Reporter: roland wang

*1. Phenomenon*

I created a Kafka sink with the following schema:
{code:java}
[BAK_NO: String, TRANS_AMT: Double, ORDER_NO: String]
{code}
When I tried to insert data into this sink, the following error occurred:
{code:java}
Caused by: org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink [TEST_SINK] do not match.
Query result schema: [ORDER_NO: String, BAK_NO: String, TRANS_AMT: Double]
TableSink schema: [BAK_NO: String, TRANS_AMT: Double, ORDER_NO: String]
{code}
As a result, I have to keep the query's fields in exactly the same order as the sink's schema, which causes a lot of trouble.

*2. Cause*

I checked the code and found these lines:
{code:java}
// validate schema of source table and table sink
val srcFieldTypes = query.getTableSchema.getFieldDataTypes
val sinkFieldTypes = sink.getTableSchema.getFieldDataTypes

if (srcFieldTypes.length != sinkFieldTypes.length ||
  srcFieldTypes.zip(sinkFieldTypes).exists { case (srcF, snkF) =>
    !PlannerTypeUtils.isInteroperable(
      fromDataTypeToLogicalType(srcF), fromDataTypeToLogicalType(snkF))
  }) {
...{code}
I think the problem is in the comparison of the sink's schema with the query's schema: the zip pairs the fields by position, and neither schema is sorted first, so the check fails whenever the same fields appear in a different order.

I truly hope this bug can be fixed soon. Thanks for all your hard work.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
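
The positional comparison described under "2. Cause" can be illustrated with a small, self-contained sketch. It does not use Flink classes: `Field`, `SinkSchemaCheck`, `matchesByPosition`, and `matchesByName` are hypothetical names introduced here, and type names are compared as plain strings instead of going through `PlannerTypeUtils.isInteroperable`. The sketch also assumes field names are unique within a schema. It only shows why the two schemas from the report fail a zip-based check while a name-based lookup would accept them; whether name-based matching is the right behaviour for Flink is a design question this sketch does not settle.

```scala
// Hypothetical stand-in for a schema field; not a Flink class.
final case class Field(name: String, dataType: String)

object SinkSchemaCheck {
  // Positional check, mirroring the zip-based comparison quoted in the report:
  // the i-th query field is compared with the i-th sink field.
  def matchesByPosition(query: Seq[Field], sink: Seq[Field]): Boolean =
    query.length == sink.length &&
      query.zip(sink).forall { case (q, s) => q.dataType == s.dataType }

  // Name-based check (an assumption, not Flink's behaviour): each sink field
  // is looked up in the query schema by name before the types are compared.
  def matchesByName(query: Seq[Field], sink: Seq[Field]): Boolean = {
    val queryTypes = query.map(f => f.name -> f.dataType).toMap
    query.length == sink.length &&
      sink.forall(s => queryTypes.get(s.name).contains(s.dataType))
  }

  // The two schemas from the error message in the report.
  val querySchema: Seq[Field] = Seq(
    Field("ORDER_NO", "String"),
    Field("BAK_NO", "String"),
    Field("TRANS_AMT", "Double"))

  val sinkSchema: Seq[Field] = Seq(
    Field("BAK_NO", "String"),
    Field("TRANS_AMT", "Double"),
    Field("ORDER_NO", "String"))
}
```

With these inputs the positional check rejects the pair, because the second position compares `BAK_NO: String` against `TRANS_AMT: Double`, while the name-based check accepts it.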