CheneyYin opened a new issue, #6277:
URL: https://github.com/apache/seatunnel/issues/6277
### Search before asking

- [X] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

Related: #4345

The Flink engine cannot work properly when the SeaTunnel data type is `BasicType.VOID_TYPE`. I encountered the same problem as #4345. I do not know Flink well, but I tried to find the reason, and it appears to be caused by an internal limitation in Flink. The details of the direct cause follow.

```java
public <T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInfo) {
    OutputConversionModifyOperation modifyOperation =
            new OutputConversionModifyOperation(
                    table.getQueryOperation(),
                    TypeConversions.fromLegacyInfoToDataType(typeInfo),
                    OutputConversionModifyOperation.UpdateMode.APPEND);
    return toStreamInternal(table, modifyOperation);
}
```
https://github.com/apache/flink/blob/e78e3cd29aa3f0dc14eeaa9b5a5d912efcbea863/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java#L326-L333

`BasicTypeInfo.VOID_TYPE_INFO` is converted to a `LegacyTypeInformation` whose typeRoot is `LogicalTypeRoot.RAW`. `DynamicSinkUtils` then calls the `supportsImplicitCast` function, which returns false for `LogicalTypeRoot.RAW`: Flink concludes that **the two raw types are not equal**.
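The rejection can be modeled without Flink on the classpath. The following is a minimal sketch with simplified, hypothetical stand-ins (not the real Flink API) for a RAW logical type and the cast check quoted from `LogicalTypeCasts` below: two RAW types carrying different serializer snapshots are not equal, and the cast rule rejects any cast from or to RAW.

```java
// Flink-free model of the failing check (simplified, hypothetical types --
// not the real Flink classes). BasicTypeInfo.VOID_TYPE_INFO becomes a legacy
// RAW type; the query side and the sink side end up with two RAW instances
// that are not equal, so the implicit-cast check rejects the sink.
import java.util.Objects;

public class RawCastModel {
    enum TypeRoot { RAW, CHARACTER_STRING, BOOLEAN }

    /** A RAW type is identified by its origin class plus a serializer snapshot. */
    static final class RawType {
        final TypeRoot root;
        final String originClass;
        final String serializerSnapshot;

        RawType(TypeRoot root, String originClass, String serializerSnapshot) {
            this.root = root;
            this.originClass = originClass;
            this.serializerSnapshot = serializerSnapshot;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof RawType)) {
                return false;
            }
            RawType other = (RawType) o;
            return root == other.root
                    && originClass.equals(other.originClass)
                    && serializerSnapshot.equals(other.serializerSnapshot);
        }

        @Override
        public int hashCode() {
            return Objects.hash(root, originClass, serializerSnapshot);
        }
    }

    /** Mirrors the shape of the cited branch: casting from or to RAW fails. */
    static boolean supportsImplicitCast(RawType source, RawType target) {
        if (source.equals(target)) {
            return true; // identical types, no cast needed
        }
        // "the two raw types are not equal (from initial invariant),
        //  casting is not possible"
        if (source.root == TypeRoot.RAW || target.root == TypeRoot.RAW) {
            return false;
        }
        return true; // remaining rules elided
    }

    public static void main(String[] args) {
        // Query schema: RAW('java.lang.Void', '...'); sink: RAW('java.lang.Void', ?)
        RawType query = new RawType(TypeRoot.RAW, "java.lang.Void", "'...'");
        RawType sink = new RawType(TypeRoot.RAW, "java.lang.Void", "?");
        System.out.println(supportsImplicitCast(query, sink)); // prints false
    }
}
```

This matches the error below: the two `RAW('java.lang.Void', ...)` columns differ only in their serializer snapshots, yet that is enough to make them unequal and the cast impossible.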
```java
} else if (sourceRoot == RAW
                && !targetType.is(BINARY_STRING)
                && !targetType.is(CHARACTER_STRING)
        || targetRoot == RAW) {
    // the two raw types are not equal (from initial invariant), casting is not possible
    return false;
} else if (sourceRoot == SYMBOL || targetRoot == SYMBOL) {
```
https://github.com/apache/flink/blob/c41c8e5cfab683da8135d6c822693ef851d6e2b7/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java#L293-L355

### SeaTunnel Version

dev

### SeaTunnel Config

```conf
env {
  parallelism = 1
  job.mode = BATCH
  # checkpoint.interval = 10000
}

source {
  FakeSource {
    row.num = 1
    schema = {
      fields {
        c_null = "null"
        c_string = string
        c_boolean = boolean
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [ null, "AAA", false ]
      }
    ]
    result_table_name = "fake"
  }
}

sink {
  Assert {
    source_table_name = "fake"
    rules = {
      row_rules = [
        {
          rule_type = MAX_ROW
          rule_value = 1
        },
        {
          rule_type = MIN_ROW
          rule_value = 1
        }
      ],
      field_rules = [
        {
          field_name = c_string
          field_type = string
          field_value = [
            {
              rule_type = NOT_NULL
              equals_to = "AAA"
            }
          ]
        },
        {
          field_name = c_boolean
          field_type = boolean
          field_value = [
            {
              rule_type = NOT_NULL
              equals_to = false
            }
          ]
        }
      ]
    }
  }
}
```

### Running Command

```shell
no
```

### Error Exception

```log
2024-01-24 21:21:56,107 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: Flink job executed failed
	at org.apache.seatunnel.core.starter.flink.command.FlinkTaskExecuteCommand.execute(FlinkTaskExecuteCommand.java:63)
	at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
	at org.apache.seatunnel.example.flink.v2.SeaTunnelApiExample.main(SeaTunnelApiExample.java:39)
Caused by: org.apache.flink.table.api.ValidationException: Column types of query result and sink for 'null' do not match.
Cause: Incompatible types for sink column 'c_null' at position 0.
Query schema: [c_null: RAW('java.lang.Void', '...'), c_string: STRING, c_boolean: BOOLEAN, c_tinyint: TINYINT, c_smallint: SMALLINT, c_int: INT, c_bigint: BIGINT, c_float: FLOAT, c_double: DOUBLE, c_decimal: STRING, c_date: DATE, c_timestamp: TIMESTAMP(9), c_time: TIME(0), c_bytes: BYTES, c_array: ARRAY<INT>, c_map: MAP<STRING, STRING>, c_map_nest: MAP<STRING, ROW<`c_int` INT, `c_string` STRING>>, c_row: ROW<`c_null` RAW('java.lang.Void', '...'), `c_string` STRING, `c_boolean` BOOLEAN, `c_tinyint` TINYINT, `c_smallint` SMALLINT, `c_int` INT, `c_bigint` BIGINT, `c_float` FLOAT, `c_double` DOUBLE, `c_decimal` STRING, `c_date` DATE, `c_timestamp` TIMESTAMP(9), `c_time` TIME(0), `c_bytes` BYTES, `c_array` ARRAY<INT>, `c_map` MAP<STRING, STRING>>]
Sink schema:  [c_null: RAW('java.lang.Void', ?), c_string: STRING, c_boolean: BOOLEAN, c_tinyint: TINYINT, c_smallint: SMALLINT, c_int: INT, c_bigint: BIGINT, c_float: FLOAT, c_double: DOUBLE, c_decimal: STRING, c_date: DATE, c_timestamp: TIMESTAMP(3), c_time: TIME(0), c_bytes: BYTES, c_array: ARRAY<INT>, c_map: MAP<STRING, STRING>, c_map_nest: MAP<STRING, ROW<`c_int` INT, `c_string` STRING>>, c_row: ROW<`c_null` RAW('java.lang.Void', ?), `c_string` STRING, `c_boolean` BOOLEAN, `c_tinyint` TINYINT, `c_smallint` SMALLINT, `c_int` INT, `c_bigint` BIGINT, `c_float` FLOAT, `c_double` DOUBLE, `c_decimal` STRING, `c_date` DATE, `c_timestamp` TIMESTAMP(3), `c_time` TIME(0), `c_bytes` BYTES, `c_array` ARRAY<INT>, `c_map` MAP<STRING, STRING>>]
	at org.apache.flink.table.planner.connectors.DynamicSinkUtils.createSchemaMismatchException(DynamicSinkUtils.java:453)
	at org.apache.flink.table.planner.connectors.DynamicSinkUtils.validateSchemaAndApplyImplicitCast(DynamicSinkUtils.java:265)
	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:287)
	at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:185)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
	at scala.collection.Iterator.foreach(Iterator.scala:937)
	at scala.collection.Iterator.foreach$(Iterator.scala:937)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
	at scala.collection.IterableLike.foreach(IterableLike.scala:70)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike.map(TraversableLike.scala:233)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
	at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:223)
	at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:332)
	at org.apache.seatunnel.core.starter.flink.utils.TableUtil.tableToDataStream(TableUtil.java:38)
	at org.apache.seatunnel.core.starter.flink.execution.FlinkAbstractPluginExecuteProcessor.fromSourceTable(FlinkAbstractPluginExecuteProcessor.java:103)
	at org.apache.seatunnel.core.starter.flink.execution.SinkExecuteProcessor.execute(SinkExecuteProcessor.java:96)
	at org.apache.seatunnel.core.starter.flink.execution.FlinkExecution.execute(FlinkExecution.java:116)
	at org.apache.seatunnel.core.starter.flink.command.FlinkTaskExecuteCommand.execute(FlinkTaskExecuteCommand.java:61)
	... 2 more
```

### Zeta or Flink or Spark Version

_No response_

### Java or Scala Version

_No response_

### Screenshots

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
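For anyone who picks this up: one possible direction for a fix, which is only my assumption (the helper name and mapping below are hypothetical, not SeaTunnel's actual code), is to stop translating SeaTunnel's VOID type into a Flink RAW type at the table/stream boundary and instead map it to a type that the implicit-cast check accepts, such as a nullable STRING:

```java
// Hypothetical type-mapping helper (assumed names, not SeaTunnel's real API):
// translate SeaTunnel basic type names to Flink SQL type names, sending VOID
// to STRING instead of RAW('java.lang.Void', ...), so the sink-side
// implicit-cast validation no longer trips over RAW.
import java.util.Map;

public class VoidTypeMapping {
    private static final Map<String, String> FLINK_SQL_TYPES = Map.of(
            "VOID", "STRING",     // workaround: avoid RAW at the sink boundary
            "STRING", "STRING",
            "BOOLEAN", "BOOLEAN",
            "INT", "INT");

    /** Returns a Flink-castable SQL type name for a SeaTunnel basic type. */
    public static String toFlinkSqlType(String seaTunnelType) {
        String mapped = FLINK_SQL_TYPES.get(seaTunnelType);
        if (mapped == null) {
            throw new IllegalArgumentException("Unmapped type: " + seaTunnelType);
        }
        return mapped;
    }

    public static void main(String[] args) {
        System.out.println(toFlinkSqlType("VOID")); // prints STRING
    }
}
```

A VOID column only ever carries nulls, so any nullable target type would preserve the data; whether this belongs in the translation layer or needs a fix on the Flink side is an open question.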
