CheneyYin opened a new issue, #5187: URL: https://github.com/apache/seatunnel/issues/5187
### Search before asking

- [X] I had searched in the [feature](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22Feature%22) issues and found no similar feature requirement.

### Description

At present, SeaTunnel cannot process any data with a Time type when using the Spark engine, because the Spark SQL type system does not support a Time type. The proposed solution is as follows.

#### 1. Data Type Mapping

I suggest mapping `LocalTimeType.LOCAL_TIME_TYPE` fields to `DataTypes.LongType` fields between the SeaTunnel internal representation and Spark, and adding a flag (`{logical_time_type : True}`) to the metadata of these Spark fields. This flag lets converters distinguish ordinary Long fields from implicit TIME fields. The detailed mapping relationship is shown in the following two tables.

| | SeaTunnelRow (SeaTunnel Internal) | InternalRow (Spark Internal) |
| ---------------- | --------------------------------- | ----------------------------------------------------------- |
| Field Data Type | `LocalTimeType.LOCAL_TIME_TYPE` | `DataTypes.LongType` (Metadata `{ 'logical_time_type' : True }`) |
| Field Value Type | `java.time.LocalTime` | `java.lang.Long` |

| | SeaTunnelRow (SeaTunnel Internal) | GenericRowWithSchema (Spark Internal) |
| ---- | -------------- | -------------------- |
| Field Data Type | `LocalTimeType.LOCAL_TIME_TYPE` | `DataTypes.LongType` (Metadata `{ 'logical_time_type' : True }`) |
| Field Value Type | `java.time.LocalTime` | `java.lang.Long` |

#### 2. Resolve Dataset Metadata Erasure

```java
// This example refers to
// org.apache.seatunnel.core.starter.spark.execution.TransformExecuteProcessor#sparkTransform
ExpressionEncoder<Row> encoder = RowEncoder.apply(schema);
Dataset<Row> newDataset = dataset.mapPartitions(
        (MapPartitionsFunction<Row, Row>) func,
        encoder);
```

In the example above, some fields in `schema` carry metadata, and `encoder` has the same schema as `schema`.
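To make the value-level mapping from the tables in section 1 concrete, here is a minimal sketch of converting between the SeaTunnel field value type (`java.time.LocalTime`) and the proposed Spark field value type (`java.lang.Long`). The class and method names are hypothetical, and the nanos-of-day encoding is my assumption for illustration; the proposal does not pin down a concrete Long encoding.

```java
import java.time.LocalTime;

// Hypothetical converter for fields flagged with 'logical_time_type'.
// Assumes a nanos-of-day encoding, which round-trips without loss.
public class TimeFieldMapping {

    /** SeaTunnel -> Spark: encode a LocalTime field value as a Long. */
    public static long toSparkValue(LocalTime time) {
        return time.toNanoOfDay();
    }

    /** Spark -> SeaTunnel: decode a Long field value back into a LocalTime. */
    public static LocalTime toSeaTunnelValue(long value) {
        return LocalTime.ofNanoOfDay(value);
    }

    public static void main(String[] args) {
        LocalTime original = LocalTime.of(12, 34, 56);
        long encoded = toSparkValue(original);
        LocalTime decoded = toSeaTunnelValue(encoded);
        System.out.println(original + " -> " + encoded + " -> " + decoded);
    }
}
```

A converter reading an `InternalRow` would apply `toSeaTunnelValue` only to Long fields whose metadata carries the `logical_time_type` flag, leaving ordinary Long fields untouched.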
However, the metadata of `newDataset` is empty, because the schema of `newDataset` is inferred from the `map`/`mapPartitions` function. This would break the data type mapping solution above. To solve the problem, the result dataset should be reconciled to the specified schema. The function `org.apache.spark.sql.Dataset#to`, introduced in Spark v3.4.0, does exactly this. For compatibility with older Spark versions, I suggest transplanting this function and rewriting it in Java.

### Usage Scenario

_No response_

### Related issues

_No response_

### Are you willing to submit a PR?

- [X] Yes, I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
