CheneyYin opened a new issue, #5187:
URL: https://github.com/apache/seatunnel/issues/5187

   ### Search before asking
   
   - [X] I had searched in the 
[feature](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22Feature%22)
 and found no similar feature requirement.
   
   
   ### Description
   
   At present, SeaTunnel when used Spark engine can't process any data with 
Time type. It caused by that Spark SQL type system doesn't support Time type. 
The solution is as follows
   
   #### 1. Data Type Mapping
   
   I suggest mapping `LocalTimeType.LOCAL_TIME_TYPE` fields with 
`DataTypes.LongType` fields between SeaTunnel Internal and Spark. And adding 
flags (`{logical_time_type : True}`) to metadata of these Spark fields. These 
flags will help convertors to distinguish normal Long type fields from implicit 
TIME fields. For detailed mapping relationship, see the following two tables.
   
   |                  | SeaTunnelRow (SeaTunnel Internal) | InternalRow (Spark 
Internal)                                |
   | ---------------- | --------------------------------- | 
----------------------------------------------------------- |
   | Field Data Type  | `LocalTimeType.LOCAL_TIME_TYPE`   | 
`DataTypes.LongType`(Metadata{ 'logical_time_type' : True}) |
   | Field Value Type | `java.time.LocalTime`             | `java.lang.Long`    
                                        |
   
   |      | SeaTunnelRow (SeaTunnel Internal) | GenericRowWithSchema (Spark 
Internal) |
   | ---- | -------------- | -------------------- |
   |Field Data Type | `LocalTimeType.LOCAL_TIME_TYPE` | 
`DataTypes.LongType`(Metadata{ 'logical_time_type' : True}) |
   |Field Value Type | `java.time.LocalTime` | `java.lang.Long`|
   
   #### 2. Resolve Dataset Metadata erasure
   
   ```java
   // The example code refer 
org.apache.seatunnel.core.starter.spark.execution.TransformExecuteProcessor#sparkTransform
   
   ExpressionEncoder<Row> encoder = RowEncoder.apply(schema);
    Dataset<Row> newDataset = dataset.mapPartitions(
        (MapPartitionsFunction<Row, Row>) func,
        encoder
    );
   ```
   
   In the above example code, some fields in `schema` contain metadatas and 
`encoder` has the same schema like `schema` . But metadatas of `newDataset` is 
empty, `newDataset` object is inferred from `map/mapPartition` function. This 
problem will disable data type mapping solution.
   
   To solve this problem, result dataset should be reconciled to the specified 
schema. The function `org.apache.spark.sql.Dataset#to` provided in `spark 
v3.4.0` can solve this problem. For compatibility, I suggest to transplant this 
function and rewrite in Java.
   
   ### Usage Scenario
   
   _No response_
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to