CheneyYin opened a new issue, #6277:
URL: https://github.com/apache/seatunnel/issues/6277

   ### Search before asking
   
   - [X] I had searched in the 
[issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22)
 and found no similar issues.
   
   
   ### What happened
   
   related:
   - #4345 
   
   The Flink engine cannot work properly when the seatunnel data type is 
`BasicType.VOID_TYPE`. I encountered this problem like #4345. 
   
   I don't know much about flink either. I tried to find the reason. This 
should be caused by an internal defect in flink. The following are details of 
the direct cause of the problem.
   
   ```java
   public <T> DataStream<T> toAppendStream(Table table, TypeInformation<T> 
typeInfo) {
           OutputConversionModifyOperation modifyOperation =
                   new OutputConversionModifyOperation(
                           table.getQueryOperation(),
                           TypeConversions.fromLegacyInfoToDataType(typeInfo),
                           OutputConversionModifyOperation.UpdateMode.APPEND);
           return toStreamInternal(table, modifyOperation);
       }
   ```
   
https://github.com/apache/flink/blob/e78e3cd29aa3f0dc14eeaa9b5a5d912efcbea863/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java#L326-L333
   
   `BasicTypeInfo.VOID_TYPE_INTO` was converted to one `LegacyTypeInformation`, 
and typeRoot of the`LegacyTypeInformation` is `LogicalTypeRoot.RAW`.
   
   
   Finally, DynamicSinkUtils called `supportsImplicitCast` function, and this 
function will return false because of `LogicalTypeRoot.RAW`. Flink took that 
**the two raw types are not equal**.
   
   ```java
   } else if (sourceRoot == RAW
                           && !targetType.is(BINARY_STRING)
                           && !targetType.is(CHARACTER_STRING)
                   || targetRoot == RAW) {
               // the two raw types are not equal (from initial invariant), 
casting is not possible
               return false;
   } else if (sourceRoot == SYMBOL || targetRoot == SYMBOL) {
   ```
   
https://github.com/apache/flink/blob/c41c8e5cfab683da8135d6c822693ef851d6e2b7/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java#L293-L355
   
   ### SeaTunnel Version
   
   dev
   
   ### SeaTunnel Config
   
   ```conf
   env {
     parallelism = 1
     job.mode = BATCH
     # checkpoint.interval = 10000
   }
   
   source {
     FakeSource {
       row.num = 1
       schema = {
         fields {
           c_null = "null"
           c_string = string
           c_boolean = boolean
         }
       }
       rows = [
         {
           kind = INSERT
           fields = [
             null, "AAA", false
           ]
         }
       ]
       result_table_name = "fake"
     }
   }
   
   sink{
     Assert {
       source_table_name = "fake"
       rules =
         {
           row_rules = [
             {
               rule_type = MAX_ROW
               rule_value = 1
             },
             {
               rule_type = MIN_ROW
               rule_value = 1
             }
           ],
           field_rules = [
               {
                   field_name = c_string
                   field_type = string
                   field_value = [
                       {
                           rule_type = NOT_NULL
                           equals_to = "AAA"
                       }
                   ]
               },
               {
                   field_name = c_boolean
                   field_type = boolean
                   field_value = [
                       {
                           rule_type = NOT_NULL
                           equals_to = false
                       }
                   ]
               }
           ]
       }
     }
   }
   ```
   
   
   ### Running Command
   
   ```shell
   no
   ```
   
   
   ### Error Exception
   
   ```log
   024-01-24 21:21:56,107 ERROR org.apache.seatunnel.core.starter.SeaTunnel - 
Exception 
StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: 
Flink job executed failed
        at 
org.apache.seatunnel.core.starter.flink.command.FlinkTaskExecuteCommand.execute(FlinkTaskExecuteCommand.java:63)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at 
org.apache.seatunnel.example.flink.v2.SeaTunnelApiExample.main(SeaTunnelApiExample.java:39)
   Caused by: org.apache.flink.table.api.ValidationException: Column types of 
query result and sink for 'null' do not match.
   Cause: Incompatible types for sink column 'c_null' at position 0.
   
   Query schema: [c_null: RAW('java.lang.Void', '...'), c_string: STRING, 
c_boolean: BOOLEAN, c_tinyint: TINYINT, c_smallint: SMALLINT, c_int: INT, 
c_bigint: BIGINT, c_float: FLOAT, c_double: DOUBLE, c_decimal: STRING, c_date: 
DATE, c_timestamp: TIMESTAMP(9), c_time: TIME(0), c_bytes: BYTES, c_array: 
ARRAY<INT>, c_map: MAP<STRING, STRING>, c_map_nest: MAP<STRING, ROW<`c_int` 
INT, `c_string` STRING>>, c_row: ROW<`c_null` RAW('java.lang.Void', '...'), 
`c_string` STRING, `c_boolean` BOOLEAN, `c_tinyint` TINYINT, `c_smallint` 
SMALLINT, `c_int` INT, `c_bigint` BIGINT, `c_float` FLOAT, `c_double` DOUBLE, 
`c_decimal` STRING, `c_date` DATE, `c_timestamp` TIMESTAMP(9), `c_time` 
TIME(0), `c_bytes` BYTES, `c_array` ARRAY<INT>, `c_map` MAP<STRING, STRING>>]
   Sink schema:  [c_null: RAW('java.lang.Void', ?), c_string: STRING, 
c_boolean: BOOLEAN, c_tinyint: TINYINT, c_smallint: SMALLINT, c_int: INT, 
c_bigint: BIGINT, c_float: FLOAT, c_double: DOUBLE, c_decimal: STRING, c_date: 
DATE, c_timestamp: TIMESTAMP(3), c_time: TIME(0), c_bytes: BYTES, c_array: 
ARRAY<INT>, c_map: MAP<STRING, STRING>, c_map_nest: MAP<STRING, ROW<`c_int` 
INT, `c_string` STRING>>, c_row: ROW<`c_null` RAW('java.lang.Void', ?), 
`c_string` STRING, `c_boolean` BOOLEAN, `c_tinyint` TINYINT, `c_smallint` 
SMALLINT, `c_int` INT, `c_bigint` BIGINT, `c_float` FLOAT, `c_double` DOUBLE, 
`c_decimal` STRING, `c_date` DATE, `c_timestamp` TIMESTAMP(3), `c_time` 
TIME(0), `c_bytes` BYTES, `c_array` ARRAY<INT>, `c_map` MAP<STRING, STRING>>]
        at 
org.apache.flink.table.planner.connectors.DynamicSinkUtils.createSchemaMismatchException(DynamicSinkUtils.java:453)
        at 
org.apache.flink.table.planner.connectors.DynamicSinkUtils.validateSchemaAndApplyImplicitCast(DynamicSinkUtils.java:265)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:287)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:185)
        at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
        at scala.collection.Iterator.foreach(Iterator.scala:937)
        at scala.collection.Iterator.foreach$(Iterator.scala:937)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
        at scala.collection.IterableLike.foreach(IterableLike.scala:70)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike.map(TraversableLike.scala:233)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
        at 
org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:223)
        at 
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:332)
        at 
org.apache.seatunnel.core.starter.flink.utils.TableUtil.tableToDataStream(TableUtil.java:38)
        at 
org.apache.seatunnel.core.starter.flink.execution.FlinkAbstractPluginExecuteProcessor.fromSourceTable(FlinkAbstractPluginExecuteProcessor.java:103)
        at 
org.apache.seatunnel.core.starter.flink.execution.SinkExecuteProcessor.execute(SinkExecuteProcessor.java:96)
        at 
org.apache.seatunnel.core.starter.flink.execution.FlinkExecution.execute(FlinkExecution.java:116)
        at 
org.apache.seatunnel.core.starter.flink.command.FlinkTaskExecuteCommand.execute(FlinkTaskExecuteCommand.java:61)
        ... 2 more
   ```
   
   
   ### Zeta or Flink or Spark Version
   
   _No response_
   
   ### Java or Scala Version
   
   _No response_
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to